Apache Beam KafkaIO: processing gets stuck at readfromkafka
October 10, 2019

I'm trying to read a Kafka topic using Apache Beam on Google Dataflow. Here is the code:

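import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class readkafka3 {
    private static final Logger LOG = LoggerFactory.getLogger(readkafka3.class);

    public static void main(String[] args) {
        Pipeline p = Pipeline.create(
                PipelineOptionsFactory.fromArgs(args).withValidation().create());

        // Unbounded Kafka source; withMaxNumRecords(5) turns it into a bounded read
        // that (with no max read time set) completes only once 5 records have been read.
        PTransform<PBegin, PCollection<KV<String, String>>> kafka =
                KafkaIO.<String, String>read()
                    .withBootstrapServers("masked")
                    .withTopic("simple-avro-kafka-golang")
                    .withKeyDeserializer(StringDeserializer.class)
                    .withValueDeserializer(StringDeserializer.class)
                    .withMaxNumRecords(5)
                    .withoutMetadata();

        // Keep only the message values and write them as text files to GCS.
        p.apply(kafka)
            .apply(Values.<String>create())
            .apply(TextIO.write().to("gs://sg-dataflow").withSuffix(".csv"));

        p.run().waitUntilFinish();
    }
}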
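In the code above, `.withMaxNumRecords(5)` makes the otherwise unbounded Kafka read bounded. The stdlib-only Java sketch below is a simplified, hypothetical model of that stopping condition, not Beam's implementation: the reader keeps polling, sleeping between empty polls, until it has collected `maxNumRecords` elements or an optional deadline passes. `BoundedReadModel`, `readBounded`, and the in-memory queue standing in for the topic are made-up names for illustration; the deadline plays roughly the part that `KafkaIO.Read.withMaxReadTime(...)` would, assuming that option is available in the Beam version in use.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical, simplified model of a bounded read over an unbounded source.
// This is NOT Beam's implementation; it only illustrates the stopping condition.
public class BoundedReadModel {

    // Polls `source` until `maxNumRecords` records are collected or `maxReadMillis` elapses.
    // Between empty polls it sleeps for `pollSleepMillis`.
    public static List<String> readBounded(Queue<String> source, int maxNumRecords,
                                           long maxReadMillis, long pollSleepMillis) {
        List<String> out = new ArrayList<>();
        long deadline = System.currentTimeMillis() + maxReadMillis;
        while (out.size() < maxNumRecords) {
            String record = source.poll();
            if (record != null) {
                out.add(record);
                continue;
            }
            // Nothing available right now. Without a deadline, this loop would
            // keep sleeping and retrying for as long as no new records arrive.
            if (System.currentTimeMillis() >= deadline) {
                break;
            }
            try {
                Thread.sleep(pollSleepMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Hypothetical "topic" holding only 3 records while the read asks for 5.
        Queue<String> topic = new ArrayDeque<>(List.of("a", "b", "c"));
        List<String> got = readBounded(topic, 5, 200, 10);
        // prints "3 of 5 records read before the deadline"
        System.out.println(got.size() + " of 5 records read before the deadline");
    }
}
```

If fewer than five records ever become available, a model like this without a deadline would loop indefinitely, which is consistent with (though it does not prove) the `Thread.sleep` frames in the stack trace below.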

It's a fairly straightforward string topic. However, the Google Dataflow logs show:

2019-10-09T11:20:32.287Z [Consumer clientId=consumer-3, groupId=Reader-0_offset_consumer_1300992802_my_beam_app_1] Resetting offset for partition simple-avro-kafka-golang-4 to offset 0. 
2019-10-09T11:20:32.302Z [Consumer clientId=consumer-3, groupId=Reader-0_offset_consumer_1300992802_my_beam_app_1] Resetting offset for partition simple-avro-kafka-golang-5 to offset 0. 
2019-10-09T11:20:32.316Z [Consumer clientId=consumer-3, groupId=Reader-0_offset_consumer_1300992802_my_beam_app_1] Resetting offset for partition simple-avro-kafka-golang-6 to offset 0. 
2019-10-09T11:20:32.388Z [Consumer clientId=consumer-3, groupId=Reader-0_offset_consumer_1300992802_my_beam_app_1] Resetting offset for partition simple-avro-kafka-golang-7 to offset 0. 
2019-10-09T11:20:33.111Z [Consumer clientId=consumer-3, groupId=Reader-0_offset_consumer_1300992802_my_beam_app_1] Resetting offset for partition simple-avro-kafka-golang-0 to offset 0. 
2019-10-09T11:20:33.184Z [Consumer clientId=consumer-3, groupId=Reader-0_offset_consumer_1300992802_my_beam_app_1] Resetting offset for partition simple-avro-kafka-golang-1 to offset 2. 
2019-10-09T11:20:33.198Z [Consumer clientId=consumer-3, groupId=Reader-0_offset_consumer_1300992802_my_beam_app_1] Resetting offset for partition simple-avro-kafka-golang-2 to offset 1. 
2019-10-09T11:20:33.212Z [Consumer clientId=consumer-3, groupId=Reader-0_offset_consumer_1300992802_my_beam_app_1] Resetting offset for partition simple-avro-kafka-golang-3 to offset 0. 
2019-10-09T11:20:33.284Z [Consumer clientId=consumer-3, groupId=Reader-0_offset_consumer_1300992802_my_beam_app_1] Resetting offset for partition simple-avro-kafka-golang-4 to offset 0. 
2019-10-09T11:20:33.301Z [Consumer clientId=consumer-3, groupId=Reader-0_offset_consumer_1300992802_my_beam_app_1] Resetting offset for partition simple-avro-kafka-golang-5 to offset 0. 
2019-10-09T11:20:33.315Z [Consumer clientId=consumer-3, groupId=Reader-0_offset_consumer_1300992802_my_beam_app_1] Resetting offset for partition simple-avro-kafka-golang-6 to offset 0. 
2019-10-09T11:20:33.388Z [Consumer clientId=consumer-3, groupId=Reader-0_offset_consumer_1300992802_my_beam_app_1] Resetting offset for partition simple-avro-kafka-golang-7 to offset 0. 
2019-10-09T11:20:34.111Z [Consumer clientId=consumer-3, groupId=Reader-0_offset_consumer_1300992802_my_beam_app_1] Resetting offset for partition simple-avro-kafka-golang-0 to offset 0. 
2019-10-09T11:20:34.184Z [Consumer clientId=consumer-3, groupId=Reader-0_offset_consumer_1300992802_my_beam_app_1] Resetting offset for partition simple-avro-kafka-golang-1 to offset 2. 
2019-10-09T11:20:34.198Z [Consumer clientId=consumer-3, groupId=Reader-0_offset_consumer_1300992802_my_beam_app_1] Resetting offset for partition simple-avro-kafka-golang-2 to offset 1. 
2019-10-09T11:20:34.213Z [Consumer clientId=consumer-3, groupId=Reader-0_offset_consumer_1300992802_my_beam_app_1] Resetting offset for partition simple-avro-kafka-golang-3 to offset 0. 
2019-10-09T11:20:34.285Z [Consumer clientId=consumer-3, groupId=Reader-0_offset_consumer_1300992802_my_beam_app_1] Resetting offset for partition simple-avro-kafka-golang-4 to offset 0. 
2019-10-09T11:20:34.300Z [Consumer clientId=consumer-3, groupId=Reader-0_offset_consumer_1300992802_my_beam_app_1] Resetting offset for partition simple-avro-kafka-golang-5 to offset 0. 
2019-10-09T11:20:34.314Z [Consumer clientId=consumer-3, groupId=Reader-0_offset_consumer_1300992802_my_beam_app_1] Resetting offset for partition simple-avro-kafka-golang-6 to offset 0. 
2019-10-09T11:20:34.386Z [Consumer clientId=consumer-3, groupId=Reader-0_offset_consumer_1300992802_my_beam_app_1] Resetting offset for partition simple-avro-kafka-golang-7 to offset 0.
2019-10-09T11:20:35.111Z [Consumer clientId=consumer-3, groupId=Reader-0_offset_consumer_1300992802_my_beam_app_1] Resetting offset for partition simple-avro-kafka-golang-0 to offset 0.
2019-10-09T11:20:35.186Z [Consumer clientId=consumer-3, groupId=Reader-0_offset_consumer_1300992802_my_beam_app_1] Resetting offset for partition simple-avro-kafka-golang-1 to offset 2. 
2019-10-09T11:20:35.200Z [Consumer clientId=consumer-3, groupId=Reader-0_offset_consumer_1300992802_my_beam_app_1] Resetting offset for partition simple-avro-kafka-golang-2 to offset 1. 
2019-10-09T11:20:35.215Z [Consumer clientId=consumer-3, groupId=Reader-0_offset_consumer_1300992802_my_beam_app_1] Resetting offset for partition simple-avro-kafka-golang-3 to offset 0. 
2019-10-09T11:20:35.287Z [Consumer clientId=consumer-3, groupId=Reader-0_offset_consumer_1300992802_my_beam_app_1] Resetting offset for partition simple-avro-kafka-golang-4 to offset 0. 
2019-10-09T11:20:35.302Z [Consumer clientId=consumer-3, groupId=Reader-0_offset_consumer_1300992802_my_beam_app_1] Resetting offset for partition simple-avro-kafka-golang-5 to offset 0. 
2019-10-09T11:20:35.316Z [Consumer clientId=consumer-3, groupId=Reader-0_offset_consumer_1300992802_my_beam_app_1] Resetting offset for partition simple-avro-kafka-golang-6 to offset 0.
2019-10-09T11:20:35.388Z [Consumer clientId=consumer-3, groupId=Reader-0_offset_consumer_1300992802_my_beam_app_1] Resetting offset for partition simple-avro-kafka-golang-7 to offset 0.
2019-10-09T11:20:36.082Z Proposing dynamic split of work unit mudah-analytics-222509;2019-10-09_04_16_05-6447123281517054755;6616295919164789357 at {"fractionConsumed":0.5}
2019-10-09T11:20:36.082Z Rejecting split request because custom reader returned null residual source. 

And it goes on and on: it just keeps resetting the offset for each partition. Eventually, the logs report that processing is stuck in the step that reads from Kafka:

Processing stuck in step KafkaIO.Read/KafkaIO.Read/Read(KafkaUnboundedSource)/Read(AutoValue_BoundedReadFromUnboundedSource_UnboundedToBoundedSourceAdapter) for at least 05m00s without outputting or completing in state process at java.lang.Thread.sleep(Native Method) at java.lang.Thread.sleep(Thread.java:340) at java.util.concurrent.TimeUnit.sleep(TimeUnit.java:386) at 
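The repeated `Resetting offset` lines all come from the consumer whose group id contains `_offset_consumer_`. Assuming each line reports a partition's current end offset, and that every partition starts at offset 0 (nothing removed by retention), the per-partition values can be tallied to estimate how many records the topic holds. The sketch below is a hypothetical stdlib helper (`EndOffsets`, `latestOffsets`, and `total` are illustrative names, not part of Beam or Kafka), fed with excerpts of the log lines above.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: tallies the offsets reported by the offset-consumer
// log lines, keeping the most recent value seen for each partition.
public class EndOffsets {

    // Matches e.g. "Resetting offset for partition simple-avro-kafka-golang-1 to offset 2."
    // Group 1 = topic, group 2 = partition number, group 3 = offset.
    private static final Pattern RESET = Pattern.compile(
            "Resetting offset for partition (\\S+)-(\\d+) to offset (\\d+)");

    public static Map<Integer, Long> latestOffsets(String[] lines) {
        Map<Integer, Long> byPartition = new TreeMap<>();
        for (String line : lines) {
            Matcher m = RESET.matcher(line);
            if (m.find()) {
                // Later lines overwrite earlier ones for the same partition.
                byPartition.put(Integer.parseInt(m.group(2)), Long.parseLong(m.group(3)));
            }
        }
        return byPartition;
    }

    public static long total(Map<Integer, Long> offsets) {
        long sum = 0;
        for (long offset : offsets.values()) {
            sum += offset;
        }
        return sum;
    }

    public static void main(String[] args) {
        // Excerpts of one full cycle (partitions 0-7) from the Dataflow logs above.
        String[] lines = {
            "Resetting offset for partition simple-avro-kafka-golang-0 to offset 0.",
            "Resetting offset for partition simple-avro-kafka-golang-1 to offset 2.",
            "Resetting offset for partition simple-avro-kafka-golang-2 to offset 1.",
            "Resetting offset for partition simple-avro-kafka-golang-3 to offset 0.",
            "Resetting offset for partition simple-avro-kafka-golang-4 to offset 0.",
            "Resetting offset for partition simple-avro-kafka-golang-5 to offset 0.",
            "Resetting offset for partition simple-avro-kafka-golang-6 to offset 0.",
            "Resetting offset for partition simple-avro-kafka-golang-7 to offset 0.",
        };
        Map<Integer, Long> offsets = latestOffsets(lines);
        // prints {0=0, 1=2, 2=1, 3=0, 4=0, 5=0, 6=0, 7=0} total=3
        System.out.println(offsets + " total=" + total(offsets));
    }
}
```

Under the assumptions above, one full cycle of the logs gives 2 on partition 1, 1 on partition 2, and 0 elsewhere: 3 records in total, fewer than the 5 requested by `withMaxNumRecords(5)`.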

Please note that I did not create the Kafka topic; the infra team did. I'm a data engineer trying to read from it. Have I hit a deadlock or an infinite loop?

Thanks

...