I'm trying to read a Kafka topic using Apache Beam on Google Dataflow. Here is the code:
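import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class readkafka3 {
    // The logger must reference this class (it previously referenced Readkafka.class).
    private static final Logger LOG = LoggerFactory.getLogger(readkafka3.class);
    public static void main(String[] args) {
        Pipeline p = Pipeline.create(
            PipelineOptionsFactory.fromArgs(args).withValidation().create());
        // Read String key/value pairs; withMaxNumRecords(5) makes this a bounded
        // read that completes once 5 records have been read.
        PTransform<PBegin, PCollection<KV<String, String>>> kafka =
            KafkaIO.<String, String>read()
                .withBootstrapServers("masked")
                .withTopic("simple-avro-kafka-golang")
                .withKeyDeserializer(StringDeserializer.class)
                .withValueDeserializer(StringDeserializer.class)
                .withMaxNumRecords(5)
                .withoutMetadata();
        // Keep only the values and write them to GCS as .csv files.
        p.apply(kafka)
            .apply(Values.<String>create())
            .apply(TextIO.write().to("gs://sg-dataflow").withSuffix(".csv"));
        p.run().waitUntilFinish();
    }
}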
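For context on what `withMaxNumRecords(5)` does: it turns the unbounded Kafka read into a bounded one, which completes only after 5 records have been read (KafkaIO also offers `withMaxReadTime` as a time-based bound). The toy model below is plain Java, not Beam's implementation; the class `BoundedReadModel` and its `maxPolls` budget are hypothetical, with `maxPolls` standing in for a read-time limit. It illustrates why such a read never returns if fewer than 5 records ever arrive and no time bound is set.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;

// Toy model (NOT Beam's implementation) of a bounded read over an unbounded source:
// keep polling until maxNumRecords records are collected or the poll budget runs out.
// maxPolls is a hypothetical stand-in for a read-time limit; a negative value means "no limit".
class BoundedReadModel {
    // poll returns the next record, or null if nothing new has arrived yet.
    static List<String> read(Supplier<String> poll, int maxNumRecords, int maxPolls) {
        List<String> out = new ArrayList<>();
        int polls = 0;
        while (out.size() < maxNumRecords) {
            if (maxPolls >= 0 && polls >= maxPolls) {
                break; // budget exhausted: finish with whatever was collected
            }
            polls++;
            String record = poll.get();
            if (record != null) {
                out.add(record);
            }
            // else: nothing new yet; a real reader would sleep and poll again
        }
        return out;
    }

    public static void main(String[] args) {
        // Hypothetical source holding only 3 records, fewer than the 5 requested.
        Iterator<String> it = Arrays.asList("a", "b", "c").iterator();
        Supplier<String> poll = () -> it.hasNext() ? it.next() : null;
        // With maxPolls = -1 this call would spin forever; with a budget it
        // returns the 3 records it found.
        System.out.println(read(poll, 5, 100)); // [a, b, c]
    }
}
```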
It's a fairly simple string topic. However, the Google Dataflow logs show the following:
2019-10-09T11:20:32.287Z [Consumer clientId=consumer-3, groupId=Reader-0_offset_consumer_1300992802_my_beam_app_1] Resetting offset for partition simple-avro-kafka-golang-4 to offset 0.
2019-10-09T11:20:32.302Z [Consumer clientId=consumer-3, groupId=Reader-0_offset_consumer_1300992802_my_beam_app_1] Resetting offset for partition simple-avro-kafka-golang-5 to offset 0.
2019-10-09T11:20:32.316Z [Consumer clientId=consumer-3, groupId=Reader-0_offset_consumer_1300992802_my_beam_app_1] Resetting offset for partition simple-avro-kafka-golang-6 to offset 0.
2019-10-09T11:20:32.388Z [Consumer clientId=consumer-3, groupId=Reader-0_offset_consumer_1300992802_my_beam_app_1] Resetting offset for partition simple-avro-kafka-golang-7 to offset 0.
2019-10-09T11:20:33.111Z [Consumer clientId=consumer-3, groupId=Reader-0_offset_consumer_1300992802_my_beam_app_1] Resetting offset for partition simple-avro-kafka-golang-0 to offset 0.
2019-10-09T11:20:33.184Z [Consumer clientId=consumer-3, groupId=Reader-0_offset_consumer_1300992802_my_beam_app_1] Resetting offset for partition simple-avro-kafka-golang-1 to offset 2.
2019-10-09T11:20:33.198Z [Consumer clientId=consumer-3, groupId=Reader-0_offset_consumer_1300992802_my_beam_app_1] Resetting offset for partition simple-avro-kafka-golang-2 to offset 1.
2019-10-09T11:20:33.212Z [Consumer clientId=consumer-3, groupId=Reader-0_offset_consumer_1300992802_my_beam_app_1] Resetting offset for partition simple-avro-kafka-golang-3 to offset 0.
2019-10-09T11:20:33.284Z [Consumer clientId=consumer-3, groupId=Reader-0_offset_consumer_1300992802_my_beam_app_1] Resetting offset for partition simple-avro-kafka-golang-4 to offset 0.
2019-10-09T11:20:33.301Z [Consumer clientId=consumer-3, groupId=Reader-0_offset_consumer_1300992802_my_beam_app_1] Resetting offset for partition simple-avro-kafka-golang-5 to offset 0.
2019-10-09T11:20:33.315Z [Consumer clientId=consumer-3, groupId=Reader-0_offset_consumer_1300992802_my_beam_app_1] Resetting offset for partition simple-avro-kafka-golang-6 to offset 0.
2019-10-09T11:20:33.388Z [Consumer clientId=consumer-3, groupId=Reader-0_offset_consumer_1300992802_my_beam_app_1] Resetting offset for partition simple-avro-kafka-golang-7 to offset 0.
2019-10-09T11:20:34.111Z [Consumer clientId=consumer-3, groupId=Reader-0_offset_consumer_1300992802_my_beam_app_1] Resetting offset for partition simple-avro-kafka-golang-0 to offset 0.
2019-10-09T11:20:34.184Z [Consumer clientId=consumer-3, groupId=Reader-0_offset_consumer_1300992802_my_beam_app_1] Resetting offset for partition simple-avro-kafka-golang-1 to offset 2.
2019-10-09T11:20:34.198Z [Consumer clientId=consumer-3, groupId=Reader-0_offset_consumer_1300992802_my_beam_app_1] Resetting offset for partition simple-avro-kafka-golang-2 to offset 1.
2019-10-09T11:20:34.213Z [Consumer clientId=consumer-3, groupId=Reader-0_offset_consumer_1300992802_my_beam_app_1] Resetting offset for partition simple-avro-kafka-golang-3 to offset 0.
2019-10-09T11:20:34.285Z [Consumer clientId=consumer-3, groupId=Reader-0_offset_consumer_1300992802_my_beam_app_1] Resetting offset for partition simple-avro-kafka-golang-4 to offset 0.
2019-10-09T11:20:34.300Z [Consumer clientId=consumer-3, groupId=Reader-0_offset_consumer_1300992802_my_beam_app_1] Resetting offset for partition simple-avro-kafka-golang-5 to offset 0.
2019-10-09T11:20:34.314Z [Consumer clientId=consumer-3, groupId=Reader-0_offset_consumer_1300992802_my_beam_app_1] Resetting offset for partition simple-avro-kafka-golang-6 to offset 0.
2019-10-09T11:20:34.386Z [Consumer clientId=consumer-3, groupId=Reader-0_offset_consumer_1300992802_my_beam_app_1] Resetting offset for partition simple-avro-kafka-golang-7 to offset 0.
2019-10-09T11:20:35.111Z [Consumer clientId=consumer-3, groupId=Reader-0_offset_consumer_1300992802_my_beam_app_1] Resetting offset for partition simple-avro-kafka-golang-0 to offset 0.
2019-10-09T11:20:35.186Z [Consumer clientId=consumer-3, groupId=Reader-0_offset_consumer_1300992802_my_beam_app_1] Resetting offset for partition simple-avro-kafka-golang-1 to offset 2.
2019-10-09T11:20:35.200Z [Consumer clientId=consumer-3, groupId=Reader-0_offset_consumer_1300992802_my_beam_app_1] Resetting offset for partition simple-avro-kafka-golang-2 to offset 1.
2019-10-09T11:20:35.215Z [Consumer clientId=consumer-3, groupId=Reader-0_offset_consumer_1300992802_my_beam_app_1] Resetting offset for partition simple-avro-kafka-golang-3 to offset 0.
2019-10-09T11:20:35.287Z [Consumer clientId=consumer-3, groupId=Reader-0_offset_consumer_1300992802_my_beam_app_1] Resetting offset for partition simple-avro-kafka-golang-4 to offset 0.
2019-10-09T11:20:35.302Z [Consumer clientId=consumer-3, groupId=Reader-0_offset_consumer_1300992802_my_beam_app_1] Resetting offset for partition simple-avro-kafka-golang-5 to offset 0.
2019-10-09T11:20:35.316Z [Consumer clientId=consumer-3, groupId=Reader-0_offset_consumer_1300992802_my_beam_app_1] Resetting offset for partition simple-avro-kafka-golang-6 to offset 0.
2019-10-09T11:20:35.388Z [Consumer clientId=consumer-3, groupId=Reader-0_offset_consumer_1300992802_my_beam_app_1] Resetting offset for partition simple-avro-kafka-golang-7 to offset 0.
2019-10-09T11:20:36.082Z Proposing dynamic split of work unit mudah-analytics-222509;2019-10-09_04_16_05-6447123281517054755;6616295919164789357 at {"fractionConsumed":0.5}
2019-10-09T11:20:36.082Z Rejecting split request because custom reader returned null residual source.
And it goes on and on: it just keeps resetting the offsets for the partitions. Eventually, the logs report that processing is stuck in the step that reads from Kafka:
Processing stuck in step KafkaIO.Read/KafkaIO.Read/Read(KafkaUnboundedSource)/Read(AutoValue_BoundedReadFromUnboundedSource_UnboundedToBoundedSourceAdapter) for at least 05m00s without outputting or completing in state process
    at java.lang.Thread.sleep(Native Method)
    at java.lang.Thread.sleep(Thread.java:340)
    at java.util.concurrent.TimeUnit.sleep(TimeUnit.java:386)
    at
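One possible reading of the repeated log lines, offered as an assumption rather than something the log states: they come from KafkaIO's offset consumer (the `..._offset_consumer_...` group) periodically seeking to the end of each partition, so each logged value would be that partition's current end offset, and the roughly once-per-second repetition would be a polling cycle rather than a loop. Under that assumption, the hypothetical `OffsetLogSummary` sketch below tabulates the last offset logged per partition and sums them, which gives an upper bound on how many records the topic holds (exact only if every partition starts at offset 0).

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Tabulates "Resetting offset for partition <topic>-<n> to offset <m>" log lines.
// ASSUMPTION: each <m> is the end offset of partition <n> (offset consumer seeking to end).
class OffsetLogSummary {
    private static final Pattern RESET =
        Pattern.compile("Resetting offset for partition (.+)-(\\d+) to offset (\\d+)");

    // Returns partition -> most recently logged offset; later lines overwrite earlier ones.
    static Map<Integer, Long> latestOffsets(String[] lines) {
        Map<Integer, Long> offsets = new TreeMap<>();
        for (String line : lines) {
            Matcher m = RESET.matcher(line);
            if (m.find()) {
                offsets.put(Integer.parseInt(m.group(2)), Long.parseLong(m.group(3)));
            }
        }
        return offsets;
    }

    // Upper bound on the number of records, assuming each partition begins at offset 0.
    static long totalRecords(Map<Integer, Long> endOffsets) {
        long total = 0;
        for (long end : endOffsets.values()) {
            total += end;
        }
        return total;
    }

    public static void main(String[] args) {
        String prefix = "[Consumer clientId=consumer-3] Resetting offset for partition simple-avro-kafka-golang-";
        String[] lines = {
            prefix + "0 to offset 0", prefix + "1 to offset 2", prefix + "2 to offset 1",
            prefix + "3 to offset 0", prefix + "4 to offset 0", prefix + "5 to offset 0",
            prefix + "6 to offset 0", prefix + "7 to offset 0",
        };
        Map<Integer, Long> ends = latestOffsets(lines);
        System.out.println(ends);               // {0=0, 1=2, 2=1, 3=0, 4=0, 5=0, 6=0, 7=0}
        System.out.println(totalRecords(ends)); // 3
    }
}
```

With the offsets from this log (2 on partition 1, 1 on partition 2, 0 elsewhere) the total is at most 3, which, if the assumption holds, would be fewer than the 5 records requested by `withMaxNumRecords(5)`.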
Please note that I didn't create the Kafka topic; the infra team did. I'm a data engineer trying to read from it. Have I hit a deadlock or an infinite loop?
Thanks