Installing KafkaOffsetMonitor

INSANEZINDOL · November 25, 2021

KafkaOffsetMonitor is no longer being updated, so on Kafka 2.1 and later it fails with the error below.

💡 The message was malformed and does not conform to a type of (BaseKey, OffsetAndMetadata). Ignoring this message. kafka.common.KafkaException: Unknown offset schema version 3
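The cause: every key and value in the __consumer_offsets topic begins with a 2-byte schema version, and the stock monitor only maps versions 0 through 2 to parsers, while brokers from Kafka 2.1 on write offset values with schema version 3. Here is a minimal sketch of that dispatch (the object and names are made up for illustration; the real logic is in Code (1) below):

import java.nio.ByteBuffer

// Minimal sketch (hypothetical names) of the failing dispatch: the stock
// monitor knows schema versions 0-2, so a version-3 record falls through
// to the "Unknown offset schema version" error quoted above.
object SchemaVersionSketch {
  val knownVersions = Set(0, 1, 2) // what the unpatched monitor supports

  def describe(record: ByteBuffer): String = {
    val version = record.getShort() // every key/value starts with a 2-byte version
    if (knownVersions(version.toInt)) s"parse with schema v$version"
    else throw new RuntimeException("Unknown offset schema version " + version)
  }

  def main(args: Array[String]): Unit = {
    val v3Record = ByteBuffer.allocate(2).putShort(3.toShort)
    v3Record.flip()
    describe(v3Record) // throws, mirroring the KafkaException above
  }
}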
1. Clone KafkaOffsetMonitor
git clone https://github.com/quantifind/KafkaOffsetMonitor.git
2. Edit KafkaOffsetGetter
cd KafkaOffsetMonitor/src/main/scala/com/quantifind/kafka/core
mv KafkaOffsetGetter.scala KafkaOffsetGetter_backup
vi KafkaOffsetGetter.scala

Fill the file with Code (1) below.

3. Edit the build definition
cd KafkaOffsetMonitor/project
mv Build.scala Build.backup
vi Build.scala

Fill the file with Code (2) below.

4. Build
cd KafkaOffsetMonitor
sbt assembly
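If the build succeeds, sbt-assembly normally writes the fat jar under target/scala-2.10/ (matching the Scala version pinned in Build.scala), e.g. target/scala-2.10/KafkaOffsetMonitor-assembly-0.3.0-SNAPSHOT.jar; adjust the jar path in step 6 to wherever your build put it.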
5. If sbt errors out, run the commands below
echo "deb [https://dl.bintray.com/sbt/debian](https://dl.bintray.com/sbt/debian) /" | sudo tee -a /etc/apt/sources.list.d/sbt.list
curl -sL "[https://keyserver.ubuntu.com/pks/lookup?op=get&search=0x2EE0EA64E40A89B84B2DF73499E82A75642AC823](https://keyserver.ubuntu.com/pks/lookup?op=get&search=0x2EE0EA64E40A89B84B2DF73499E82A75642AC823)" | sudo apt-key add
sudo apt-get update
sudo apt-get install sbt
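Note: the Bintray repository above has since been retired. If the deb line 404s, the sbt project now serves its Debian packages from https://repo.scala-sbt.org/scalasbt/debian; see the current sbt installation docs for the up-to-date lines.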
6. Run
java -cp KafkaOffsetMonitor-assembly-0.3.0-SNAPSHOT.jar com.quantifind.kafka.offsetapp.OffsetGetterWeb --offsetStorage kafka --zk 192.168.0.134:2181 --port 8081 --refresh 10.seconds --retain 2.days
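For reference: --offsetStorage kafka makes the monitor read commits from the __consumer_offsets topic (what the patched KafkaOffsetGetter listens on), --zk points at your ZooKeeper ensemble, --port is the web UI port, and --refresh and --retain control the polling interval and how long history is kept in the local database.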
7. Check in a browser
💡 http://localhost:8081 (use the server's hostname, e.g. http://ubuntu-server:8081, when browsing from another machine)

Code (1): KafkaOffsetGetter.scala

package com.quantifind.kafka.core

import java.nio.ByteBuffer

import com.quantifind.kafka.OffsetGetter
import com.quantifind.kafka.OffsetGetter.OffsetInfo
import com.quantifind.utils.ZkUtilsWrapper
import com.twitter.util.Time
import kafka.api.{OffsetRequest, PartitionOffsetRequestInfo}
import kafka.common.{OffsetAndMetadata, TopicAndPartition}
import kafka.consumer.{ConsumerConnector, KafkaStream}
import kafka.message.MessageAndMetadata
import kafka.utils.Logging
import org.I0Itec.zkclient.ZkClient

import scala.collection._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import scala.util.control.NonFatal

class KafkaOffsetGetter(theZkClient: ZkClient, zkUtils: ZkUtilsWrapper = new ZkUtilsWrapper) extends OffsetGetter {

  import KafkaOffsetGetter._

  override val zkClient = theZkClient

  override def processPartition(group: String, topic: String, pid: Int): Option[OffsetInfo] = {
    try {
      zkUtils.getLeaderForPartition(zkClient, topic, pid) match {
        case Some(bid) =>
          val consumerOpt = consumerMap.getOrElseUpdate(bid, getConsumer(bid))
          consumerOpt flatMap { consumer =>
            val topicAndPartition = TopicAndPartition(topic, pid)
            offsetMap.get(GroupTopicPartition(group, topicAndPartition)) map { offsetMetaData =>
              val request = OffsetRequest(immutable.Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1)))
              val logSize = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head
              OffsetInfo(group = group,
                topic = topic,
                partition = pid,
                offset = offsetMetaData.offset,
                logSize = logSize,
                owner = Some("NA"),
                creation = Time.fromMilliseconds(offsetMetaData.timestamp),
                modified = Time.fromMilliseconds(offsetMetaData.timestamp))
            }
          }
        case None =>
          error("No broker for partition %s - %s".format(topic, pid))
          None
      }
    } catch {
      case NonFatal(t) =>
        error(s"Could not parse partition info. group: [$group] topic: [$topic]", t)
        None
    }
  }

  override def getGroups: Seq[String] = {
    topicAndGroups.groupBy(_.group).keySet.toSeq
  }

  override def getTopicList(group: String): List[String] = {
    topicAndGroups.filter(_.group == group).groupBy(_.topic).keySet.toList
  }

  override def getTopicMap: Map[String, scala.Seq[String]] = {
    topicAndGroups.groupBy(_.topic).mapValues(_.map(_.group).toSeq)
  }

  override def getActiveTopicMap: Map[String, Seq[String]] = {
    getTopicMap
  }
}

object KafkaOffsetGetter extends Logging {
  val ConsumerOffsetTopic = "__consumer_offsets"
  val offsetMap: mutable.Map[GroupTopicPartition, OffsetAndMetadata] = mutable.HashMap()
  val topicAndGroups: mutable.Set[TopicAndGroup] = mutable.HashSet()

  def startOffsetListener(consumerConnector: ConsumerConnector) = {
    Future {
      try {
        logger.info("Staring Kafka offset topic listener")
        val offsetMsgStream: KafkaStream[Array[Byte], Array[Byte]] = consumerConnector
          .createMessageStreams(Map(ConsumerOffsetTopic -> 1))
          .get(ConsumerOffsetTopic).map(_.head) match {
          case Some(s) => s
          case None => throw new IllegalStateException("Cannot create a consumer stream on offset topic ")
        }
        val it = offsetMsgStream.iterator()
        while (true) {
          try {
            val offsetMsg: MessageAndMetadata[Array[Byte], Array[Byte]] = it.next()
            val commitKey: GroupTopicPartition = readMessageKey(ByteBuffer.wrap(offsetMsg.key()))
            val commitValue: OffsetAndMetadata = readMessageValue(ByteBuffer.wrap(offsetMsg.message()))
            info("Processed commit message: " + commitKey + " => " + commitValue)
            offsetMap += (commitKey -> commitValue)
            topicAndGroups += TopicAndGroup(commitKey.topicPartition.topic, commitKey.group)
            info(s"topicAndGroups = $topicAndGroups")
          } catch {
            case e: RuntimeException =>
              // sometimes offsetMsg.key() || offsetMsg.message() throws NPE
              warn("Failed to process one of the commit message due to exception. The 'bad' message will be skipped", e)
          }
        }
      } catch {
        case e: Throwable =>
          fatal("Offset topic listener aborted dur to unexpected exception", e)
          System.exit(1)
      }
    }
  }

  // largely adapted from kafka.server.OffsetManager

  import java.nio.ByteBuffer

  import org.apache.kafka.common.protocol.types.Type.{INT32, INT64, STRING}
  import org.apache.kafka.common.protocol.types.{Field, Schema, Struct}

  private case class KeyAndValueSchemas(keySchema: Schema, valueSchema: Schema)

  private val OFFSET_COMMIT_KEY_SCHEMA_V0 = new Schema(new Field("group", STRING),
    new Field("topic", STRING),
    new Field("partition", INT32))
  private val KEY_GROUP_FIELD = OFFSET_COMMIT_KEY_SCHEMA_V0.get("group")
  private val KEY_TOPIC_FIELD = OFFSET_COMMIT_KEY_SCHEMA_V0.get("topic")
  private val KEY_PARTITION_FIELD = OFFSET_COMMIT_KEY_SCHEMA_V0.get("partition")
  private val OFFSET_COMMIT_VALUE_SCHEMA_V0 = new Schema(new Field("offset", INT64),
    new Field("metadata", STRING, "Associated metadata.", ""),
    new Field("timestamp", INT64))
  private val OFFSET_COMMIT_VALUE_SCHEMA_V1 = new Schema(new Field("offset", INT64),
    new Field("metadata", STRING, "Associated metadata.", ""),
    new Field("commit_timestamp", INT64),
    new Field("expire_timestamp", INT64))
  private val OFFSET_COMMIT_VALUE_SCHEMA_V2 = new Schema(new Field("offset", INT64),
    new Field("metadata", STRING, "Associated metadata.", ""),
    new Field("commit_timestamp", INT64))
  private val OFFSET_COMMIT_VALUE_SCHEMA_V3 = new Schema(new Field("offset", INT64),
    new Field("leader_epoch", INT32),
    new Field("metadata", STRING, "Associated metadata.", ""),
    new Field("commit_timestamp", INT64))
  private val VALUE_OFFSET_FIELD_V0 = OFFSET_COMMIT_VALUE_SCHEMA_V0.get("offset")
  private val VALUE_METADATA_FIELD_V0 = OFFSET_COMMIT_VALUE_SCHEMA_V0.get("metadata")
  private val VALUE_TIMESTAMP_FIELD_V0 = OFFSET_COMMIT_VALUE_SCHEMA_V0.get("timestamp")
  private val VALUE_OFFSET_FIELD_V1 = OFFSET_COMMIT_VALUE_SCHEMA_V1.get("offset")
  private val VALUE_METADATA_FIELD_V1 = OFFSET_COMMIT_VALUE_SCHEMA_V1.get("metadata")
  private val VALUE_COMMIT_TIMESTAMP_FIELD_V1 = OFFSET_COMMIT_VALUE_SCHEMA_V1.get("commit_timestamp")
  private val VALUE_OFFSET_FIELD_V2 = OFFSET_COMMIT_VALUE_SCHEMA_V2.get("offset")
  private val VALUE_METADATA_FIELD_V2 = OFFSET_COMMIT_VALUE_SCHEMA_V2.get("metadata")
  private val VALUE_COMMIT_TIMESTAMP_FIELD_V2 = OFFSET_COMMIT_VALUE_SCHEMA_V2.get("commit_timestamp")
  private val VALUE_OFFSET_FIELD_V3 = OFFSET_COMMIT_VALUE_SCHEMA_V3.get("offset")
  private val VALUE_LEADER_EPOCH_FIELD_V3 = OFFSET_COMMIT_VALUE_SCHEMA_V3.get("leader_epoch")
  private val VALUE_METADATA_FIELD_V3 = OFFSET_COMMIT_VALUE_SCHEMA_V3.get("metadata")
  private val VALUE_COMMIT_TIMESTAMP_FIELD_V3 = OFFSET_COMMIT_VALUE_SCHEMA_V3.get("commit_timestamp")
  // private val VALUE_EXPIRE_TIMESTAMP_FIELD_V1 = OFFSET_COMMIT_VALUE_SCHEMA_V1.get("expire_timestamp")
  // map of versions to schemas
  private val OFFSET_SCHEMAS = Map(0 -> KeyAndValueSchemas(OFFSET_COMMIT_KEY_SCHEMA_V0, OFFSET_COMMIT_VALUE_SCHEMA_V0),
    1 -> KeyAndValueSchemas(OFFSET_COMMIT_KEY_SCHEMA_V0, OFFSET_COMMIT_VALUE_SCHEMA_V1),
    2 -> KeyAndValueSchemas(OFFSET_COMMIT_KEY_SCHEMA_V0, OFFSET_COMMIT_VALUE_SCHEMA_V2),
    3 -> KeyAndValueSchemas(OFFSET_COMMIT_KEY_SCHEMA_V0, OFFSET_COMMIT_VALUE_SCHEMA_V3))

  private def schemaFor(version: Int) = {
    val schemaOpt = OFFSET_SCHEMAS.get(version)
    schemaOpt match {
      case Some(schema) => schema
      case _ => throw new RuntimeException("Unknown offset schema version " + version)
    }
  }

  case class MessageValueStructAndVersion(value: Struct, version: Short)

  case class TopicAndGroup(topic: String, group: String)

  case class GroupTopicPartition(group: String, topicPartition: TopicAndPartition) {

    def this(group: String, topic: String, partition: Int) =
      this(group, new TopicAndPartition(topic, partition))

    override def toString =
      "[%s,%s,%d]".format(group, topicPartition.topic, topicPartition.partition)
  }

  /**
   * Decodes the offset messages' key
   *
   * @param buffer input byte-buffer
   * @return a GroupTopicPartition object
   */
  private def readMessageKey(buffer: ByteBuffer): GroupTopicPartition = {
    val version = buffer.getShort()
    val keySchema = schemaFor(version).keySchema
    val key = keySchema.read(buffer).asInstanceOf[Struct]

    val group = key.get(KEY_GROUP_FIELD).asInstanceOf[String]
    val topic = key.get(KEY_TOPIC_FIELD).asInstanceOf[String]
    val partition = key.get(KEY_PARTITION_FIELD).asInstanceOf[Int]

    GroupTopicPartition(group, TopicAndPartition(topic, partition))
  }

  /**
   * Decodes the offset messages' payload and retrieves offset and metadata from it
   *
   * @param buffer input byte-buffer
   * @return an offset-metadata object from the message
   */
  private def readMessageValue(buffer: ByteBuffer): OffsetAndMetadata = {
    val structAndVersion = readMessageValueStruct(buffer)

    if (structAndVersion.value == null) { // tombstone
      null
    } else {
      if (structAndVersion.version == 0) {
        val offset = structAndVersion.value.get(VALUE_OFFSET_FIELD_V0).asInstanceOf[Long]
        val metadata = structAndVersion.value.get(VALUE_METADATA_FIELD_V0).asInstanceOf[String]
        val timestamp = structAndVersion.value.get(VALUE_TIMESTAMP_FIELD_V0).asInstanceOf[Long]

        OffsetAndMetadata(offset, metadata, timestamp)
      } else if (structAndVersion.version == 1) {
        val offset = structAndVersion.value.get(VALUE_OFFSET_FIELD_V1).asInstanceOf[Long]
        val metadata = structAndVersion.value.get(VALUE_METADATA_FIELD_V1).asInstanceOf[String]
        val commitTimestamp = structAndVersion.value.get(VALUE_COMMIT_TIMESTAMP_FIELD_V1).asInstanceOf[Long]
        // not supported in 0.8.2
        // val expireTimestamp = structAndVersion.value.get(VALUE_EXPIRE_TIMESTAMP_FIELD_V1).asInstanceOf[Long]
        OffsetAndMetadata(offset, metadata, commitTimestamp)
      } else if (structAndVersion.version == 2) {
        val offset = structAndVersion.value.get(VALUE_OFFSET_FIELD_V2).asInstanceOf[Long]
        val metadata = structAndVersion.value.get(VALUE_METADATA_FIELD_V2).asInstanceOf[String]
        val commitTimestamp = structAndVersion.value.get(VALUE_COMMIT_TIMESTAMP_FIELD_V2).asInstanceOf[Long]
        // not supported in 0.8.2
        // val expireTimestamp = structAndVersion.value.get(VALUE_EXPIRE_TIMESTAMP_FIELD_V1).asInstanceOf[Long]
        OffsetAndMetadata(offset, metadata, commitTimestamp)
      } else if (structAndVersion.version == 3) {
        val offset = structAndVersion.value.get(VALUE_OFFSET_FIELD_V3).asInstanceOf[Long]
        val metadata = structAndVersion.value.get(VALUE_METADATA_FIELD_V3).asInstanceOf[String]
        val commitTimestamp = structAndVersion.value.get(VALUE_COMMIT_TIMESTAMP_FIELD_V3).asInstanceOf[Long]
        // not supported in 0.8.2
        // val expireTimestamp = structAndVersion.value.get(VALUE_EXPIRE_TIMESTAMP_FIELD_V1).asInstanceOf[Long]
        OffsetAndMetadata(offset, metadata, commitTimestamp)
      } else {
        throw new IllegalStateException("Unknown offset message version: " + structAndVersion.version)
      }
    }
  }

  private def readMessageValueStruct(buffer: ByteBuffer): MessageValueStructAndVersion = {
    if (buffer == null) { // tombstone
      MessageValueStructAndVersion(null, -1)
    } else {
      val version = buffer.getShort()
      val valueSchema = schemaFor(version).valueSchema
      val value = valueSchema.read(buffer).asInstanceOf[Struct]

      MessageValueStructAndVersion(value, version)
    }
  }
}
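Relative to the stock source, the essential additions are OFFSET_COMMIT_VALUE_SCHEMA_V3 (offset, leader_epoch, metadata, commit_timestamp) and its entry in OFFSET_SCHEMAS, which is what lets readMessageValue accept commits from Kafka 2.1+ brokers. As a quick sanity check of the new schema, here is a round-trip sketch using the same kafka-clients protocol types the file already imports (V3SchemaRoundTrip and the literal values are made up for illustration):

import java.nio.ByteBuffer
import org.apache.kafka.common.protocol.types.Type.{INT32, INT64, STRING}
import org.apache.kafka.common.protocol.types.{Field, Schema, Struct}

// Hypothetical round-trip check for the V3 value schema added above:
// build a struct, serialize it behind a 2-byte version prefix, read it back.
object V3SchemaRoundTrip {
  val valueSchemaV3 = new Schema(new Field("offset", INT64),
    new Field("leader_epoch", INT32),
    new Field("metadata", STRING, "Associated metadata.", ""),
    new Field("commit_timestamp", INT64))

  def main(args: Array[String]): Unit = {
    val struct = new Struct(valueSchemaV3)
      .set("offset", 42L)
      .set("leader_epoch", 0)
      .set("metadata", "")
      .set("commit_timestamp", System.currentTimeMillis())

    val buffer = ByteBuffer.allocate(2 + struct.sizeOf())
    buffer.putShort(3.toShort) // real records carry the schema version first
    struct.writeTo(buffer)
    buffer.flip()

    val version = buffer.getShort()                              // 3
    val parsed = valueSchemaV3.read(buffer).asInstanceOf[Struct] // same cast as readMessageValueStruct
    println(s"v$version offset=${parsed.get("offset")}")
  }
}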

Code (2): project/Build.scala

import sbt._
import Keys._
import sbtassembly.Plugin._
import AssemblyKeys._

object KafkaUtilsBuild extends Build {

  def sharedSettings = Defaults.defaultSettings ++ assemblySettings ++ Seq(
    version := "0.3.0-SNAPSHOT",
    scalaVersion := "2.10.3",
    organization := "com.quantifind",
    scalacOptions := Seq("-deprecation", "-unchecked", "-optimize"),
    unmanagedJars in Compile <<= baseDirectory map { base => (base / "lib" ** "*.jar").classpath },
    retrieveManaged := true,
    transitiveClassifiers in Scope.GlobalScope := Seq("sources"),
    resolvers ++= Seq(
      "sonatype-snapshots" at "http://oss.sonatype.org/content/repositories/snapshots",
      "sonatype-releases" at "http://oss.sonatype.org/content/repositories/releases",
      "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/",
      "JBoss Repository" at "http://repository.jboss.org/nexus/content/repositories/releases/"),
    libraryDependencies ++= Seq(
      "log4j" % "log4j" % "1.2.17",
      "org.scalatest" %% "scalatest" % "2.2.4" % "test",
      "org.mockito" % "mockito-all" % "1.10.19" % "test",
      "org.apache.kafka" %% "kafka" % "0.8.2.1"))

  val slf4jVersion = "1.6.1"

  //offsetmonitor project

  lazy val offsetmonitor = Project("offsetmonitor", file("."), settings = offsetmonSettings)

  def offsetmonSettings = sharedSettings ++ Seq(
    mergeStrategy in assembly := {
      case "about.html" => MergeStrategy.discard
      case x =>
        val oldStrategy = (mergeStrategy in assembly).value
        oldStrategy(x)
    },
    name := "KafkaOffsetMonitor",
    libraryDependencies ++= Seq(
      "net.databinder" %% "unfiltered-filter" % "0.8.4",
      "net.databinder" %% "unfiltered-jetty" % "0.8.4",
      "net.databinder" %% "unfiltered-json4s" % "0.8.4",
      "com.quantifind" %% "sumac" % "0.3.0",
      "com.typesafe.slick" %% "slick" % "2.0.0",
      "org.xerial" % "sqlite-jdbc" % "3.7.2",
      "com.twitter" % "util-core_2.10" % "6.1.0",
      "org.reflections" % "reflections" % "0.9.10"),
    resolvers ++= Seq(
      "java m2" at "http://download.java.net/maven/2",
      "twitter repo" at "http://maven.twttr.com"))
}
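A note on the choices above: Scala 2.10.3 and the sbt 0.13-era Build trait are what the rest of the project expects, and the Kafka dependency stays at the old 0.8.2.1, since the fix only teaches the offset parser about the newer on-disk schema rather than upgrading the client. The about.html merge rule discards the duplicate copies that several dependency jars ship, which would otherwise make sbt assembly fail with a deduplication error.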