FlinkKafkaConsumer010 не работает, если установлено с помощью setStartFromTimestamp - PullRequest
0 голосов
/ 15 апреля 2020

Я использую потоковую передачу flink и flink-connector-kafka для обработки данных из kafka. когда я настраиваю FlinkKafkaConsumer010 с setStartFromTimestamp (1586852770000L), в это время все данные в kafka topi c A предшествуют 1586852770000L, тогда я отправляю некоторое сообщение в section-0 и section-4 Topi c A (Topi c A имеет 6 разделов, текущее системное время уже после 1586852770000L). но моя программа flink не потребляет никаких данных из Topi c A. Так это проблема?

Если я остановлю свою программу flink и перезапущу ее, она может потреблять данные из section-0 и partition- 4 из Topi c A, но все равно не будет потреблять данные из других 4 разделов, если я отправлю данные в другие 4 раздела, если только я не перезапущу свою программу flink.

журнал kafka выглядит следующим образом :

2020-04-15 11:48:46,447 TRACE org.apache.kafka.clients.consumer.internals.Fetcher           - Sending ListOffsetRequest (type=ListOffsetRequest, replicaId=-1, partitionTimestamps={TopicA-4=1586836800000}, minVersion=1) to broker server1:9092 (id: 185 rack: null)
2020-04-15 11:48:46,463 TRACE org.apache.kafka.clients.NetworkClient                        - Sending {replica_id=-1,topics=[{topic=TopicA,partitions=[{partition=0,timestamp=1586836800000}]}]} to node 184.
2020-04-15 11:48:46,466 TRACE org.apache.kafka.clients.NetworkClient                        - Completed receive from node 185, for key 2, received {responses=[{topic=TopicA,partition_responses=[{partition=4,error_code=0,timestamp=1586852770000,offset=4}]}]}
2020-04-15 11:48:46,467 TRACE org.apache.kafka.clients.consumer.internals.Fetcher           - Received ListOffsetResponse {responses=[{topic=TopicA,partition_responses=[{partition=4,error_code=0,timestamp=1586852770000,offset=4}]}]} from broker server1:9092 (id: 185 rack: null)
2020-04-15 11:48:46,467 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher           - Handling ListOffsetResponse response for TopicA-4. Fetched offset 4, timestamp 1586852770000


2020-04-15 11:48:46,448 TRACE org.apache.kafka.clients.consumer.internals.Fetcher           - Sending ListOffsetRequest (type=ListOffsetRequest, replicaId=-1, partitionTimestamps={TopicA-0=1586836800000}, minVersion=1) to broker server2:9092 (id: 184 rack: null)
2020-04-15 11:48:46,463 TRACE org.apache.kafka.clients.NetworkClient                        - Sending {replica_id=-1,topics=[{topic=TopicA,partitions=[{partition=0,timestamp=1586836800000}]}]} to node 184.
2020-04-15 11:48:46,467 TRACE org.apache.kafka.clients.NetworkClient                        - Completed receive from node 184, for key 2, received {responses=[{topic=TopicA,partition_responses=[{partition=0,error_code=0,timestamp=1586863210000,offset=47}]}]}
2020-04-15 11:48:46,467 TRACE org.apache.kafka.clients.consumer.internals.Fetcher           - Received ListOffsetResponse {responses=[{topic=TopicA,partition_responses=[{partition=0,error_code=0,timestamp=1586863210000,offset=47}]}]} from broker server2:9092 (id: 184 rack: null)
2020-04-15 11:48:46,467 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher           - Handling ListOffsetResponse response for TopicA-0. Fetched offset 47, timestamp 1586863210000


2020-04-15 11:48:46,448 TRACE org.apache.kafka.clients.consumer.internals.Fetcher           - Sending ListOffsetRequest (type=ListOffsetRequest, replicaId=-1, partitionTimestamps={TopicA-2=1586836800000}, minVersion=1) to broker server3:9092 (id: 183 rack: null)
2020-04-15 11:48:46,465 TRACE org.apache.kafka.clients.NetworkClient                        - Sending {replica_id=-1,topics=[{topic=TopicA,partitions=[{partition=2,timestamp=1586836800000}]}]} to node 183.
2020-04-15 11:48:46,468 TRACE org.apache.kafka.clients.NetworkClient                        - Completed receive from node 183, for key 2, received {responses=[{topic=TopicA,partition_responses=[{partition=2,error_code=0,timestamp=-1,offset=-1}]}]}
2020-04-15 11:48:46,468 TRACE org.apache.kafka.clients.consumer.internals.Fetcher           - Received ListOffsetResponse {responses=[{topic=TopicA,partition_responses=[{partition=2,error_code=
0,timestamp=-1,offset=-1}]}]} from broker server3:9092 (id: 183 rack: null)
2020-04-15 11:48:46,468 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher           - Handling ListOffsetResponse response for TopicA-2. Fetched offset -1, timestamp -1

2020-04-15 11:48:46,481 INFO  org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  - Consumer subtask 0 will start reading the following 2 partitions from timestamp 1586836800000: [KafkaTopicPartition{topic='TopicA', partition=4}, KafkaTopicPartition{topic='TopicA', partition=0}]

из журнала, кроме раздела 0 и раздела 4, смещение другого 4 раздела равно -1. почему смещение возврата равно -1 вместо самого последнего смещения?

в коде клиента Kafka (сборщик. java, строка: 674-680)

// Handle v1 and later response
log.debug("Handling ListOffsetResponse response for {}. Fetched offset {}, timestamp {}",topicPartition, partitionData.offset, partitionData.timestamp);
if (partitionData.offset != ListOffsetResponse.UNKNOWN_OFFSET) {
   OffsetData offsetData = new OffsetData(partitionData.offset, partitionData.timestamp);
   timestampOffsetMap.put(topicPartition, offsetData);
}

значение ListOffsetResponse. UNKNOWN_OFFSET равен -1. Таким образом, остальные 4 раздела фильтруются, и потребитель kafka не будет использовать данные из других 4 разделов.

Моя версия Flink - 1.9.2, а соединитель flink kafka -

<dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
   <version>1.9.2</version>
</dependency>

do c разъема flink kafka выглядит следующим образом:

setStartFromTimestamp (long) : запуск с указанной отметки времени. Для каждого раздела в качестве начальной позиции будет использоваться запись, отметка времени которой больше или равна указанной отметке времени. Если последняя запись раздела раньше, чем отметка времени, раздел будет просто считан из последней записи.

код тестовой программы:

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
import org.junit.Test

class TestFlinkKafka {

  @Test
  def testFlinkKafkaDemo: Unit ={
    //1. set up the streaming execution environment.
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic( TimeCharacteristic.ProcessingTime)
    // To use fault tolerant Kafka Consumers, checkpointing needs to be enabled at the execution environment
    env.enableCheckpointing(60000)
    //2. kafka source
    val topic = "message"
    val schema = new SimpleStringSchema()
    //server1:9092,server2:9092,server3:9092
    val props = getKafkaConsumerProperties("localhost:9092","flink-streaming-client", "latest")
    val  consumer = new FlinkKafkaConsumer010(topic, schema, props)
    //consume data from special timestamp's offset
    //2020/4/14 20:0:0
    //consumer.setStartFromTimestamp(1586865600000L)
    //2020/4/15 20:0:0
    consumer.setStartFromTimestamp(1586952000000L)
    consumer.setCommitOffsetsOnCheckpoints(true)

    //3. transform
    val stream = env.addSource(consumer)
      .map(x => x)

    //4. sink
    stream.print()

    //5. execute
    env.execute("testFlinkKafkaConsumer")

  }

  def getKafkaConsumerProperties(brokerList:String, groupId:String, offsetReset:String): Properties ={
    val props = new Properties()
    props.setProperty("bootstrap.servers", brokerList)
    props.setProperty("group.id", groupId)
    props.setProperty("auto.offset.reset", offsetReset)
    props.setProperty("flink.partition-discovery.interval-millis", "30000")
    props
  }

}

установить уровень журнала для kafka:

log4j.logger.org.apache.kafka=TRACE

создать kafka topi c:

kafka-topics --zookeeper localhost:2181/kafka --create --topic message --partitions 6 --replication-factor 1

отправить сообщение kafka topi c

kafka-console-producer --broker-list localhost:9092 --topic message

{"name":"tom"}
{"name":"michael"}

1 Ответ

0 голосов
/ 16 апреля 2020

Эта проблема была решена путем обновления разъема Flink / Kafka до более нового универсального разъема - FlinkKafkaConsumer - доступен с flink-connector-kafka_2.11. Эта версия коннектора рекомендуется для всех версий Kafka начиная с 1.0.0. С Kafka 0.10.x или 0.11.x лучше использовать указатели версии c flink-connector-kafka-0.10_2.11 или flink-connector-kafka-0.11_2.11. (И во всех случаях замените 2.12 на 2.11, если вы используете Scala 2.12.)

См. Документацию Flink для получения дополнительной информации о разъеме Flink Kafka.

...