Неправильный поиск смещения от Кафки - PullRequest
1 голос
/ 17 мая 2019

Я пытаюсь записать некоторые данные Avro в Kafka и использовать их в качестве смещения для чтения записей из базы данных. Поэтому я не хочу читать все записи после каждого этапа обработки данных, я сохраняю смещение в Kafka и затем начинаю с него процесс считывания данных. Я определил метод чтения данных и последние методы поиска смещения для этого. Проблема в том, что когда я добавляю новые данные в первый раз и сохраняю смещение, они не извлекаются должным образом или что-то в этом роде. Когда я добавляю данные во второй раз, данные извлекаются, начиная с предыдущего смещения, а не с текущего. Ожидаемый результат:

Первая партия данных включает первичные ключи 1-20, смещение данных записи для первичного ключа 20, данные извлекаются, начиная с 1. Второй пакет данных включает в себя первичные ключи 21-30, смещение данных записи для первичного ключа 30, данные извлекаются, начиная с 20 (предыдущее смещение). Третья партия данных включает в себя первичные ключи 31-40, смещенные данные записи для первичного ключа 40, данные извлекаются, начиная с 30 (предыдущее смещение). и т. д.

Фактическое поведение выглядит так:

Первая партия данных включает первичные ключи 1-20, смещение данных записи для первичного ключа 20, данные извлекаются, начиная с 1. Вторая партия данных включает в себя первичные ключи 20-30, смещенные данные записи для первичного ключа 30, данные не извлекаются. Третья группа данных включает первичные ключи 31-40, смещенные данные записи для первичного ключа 40, данные извлекаются, начиная с 20 (???).

Так в чем же причина? Есть ли проблема с некоторой конфигурацией Kafka или общей логикой кода?

Так я получаю последнее смещение от Кафки

А вот код для чтения состояния, которое я сохранил для Кафки:

def getLatestOffset() = {
    val kafkaProperties = new Properties()
    kafkaProperties.putAll(kafkaParams)

    val topicAndPartition = new TopicPartition(kafkaTopic, 0)
    val consumer = new KafkaConsumer[String,GenericRecord](kafkaProperties)
    consumer.subscribe(java.util.Arrays.asList(kafkaTopic))

    val consumerRecords = consumer
      .poll(10000)
      .records(kafkaTopic)
      .asScala
      .toList

    val partitionsAssigned = consumer.assignment()
    val endOffset = consumer.endOffsets(partitionsAssigned)
      .get(topicAndPartition)

    endOffset
  }

  def readComplexStateFromKafka(sparkSession: SparkSession, dayColumn: String, endingOffset: Long) = {
    logger.debug(s"Reading from Kafka topic: $kafkaTopic")
    val offsetRanges = Array(
      OffsetRange(kafkaTopic, 0, endingOffset-1, endingOffset)
    )

    val dataRDD = KafkaUtils.createRDD(
      sparkSession.sparkContext,
      sparkAppConfig.kafkaParams.asJava,
      offsetRanges,
      LocationStrategies.PreferConsistent
    )

    val genericRecordsValues = dataRDD
      .map(record =>
        record
          .value()
          .asInstanceOf[GenericRecord]
      )

    val genericRecordsFields = genericRecordsValues
      .map(record =>
          (record.get("table_name").toString,
          record.get("code").toString,
          new Timestamp(record.get(dayColumn).asInstanceOf[Long]).toString)
      )

    genericRecordsFields.first()
  }

Параметры Кафки выглядят так:

  bootstrapServers = "kafka-headless.default.svc.cluster.local:9092"
  schemaRegistryUrl = "http://cp-schema-registry:8081"
  topic = "tableName_offsets"
  keyDeserializer = "org.apache.kafka.common.serialization.StringDeserializer"
  valueDeserializer = "io.confluent.kafka.serializers.KafkaAvroDeserializer"
  keySerializer = "org.apache.kafka.common.serialization.StringSerializer"
  valueSerializer = "io.confluent.kafka.serializers.KafkaAvroSerializer"
  clientId = "table_state"
  groupId = "avro_data"
  autoOffsetReset = "latest"
  enableAutoCommit = true
  auto.commit.interval.ms = 10000
...