Я работаю под конвейером лучей, который читает из локальной Kafka INPUT_TOPI C и записывает в другую локальную Kafka OUTPUT_TOPI C. Я создал издателя для подачи INPUT_TOPI C (вручную) и потребителя, чтобы проверить, что я получаю по OUTPUT_TOPI C, но меня интересует, является ли это правильной настройкой для проверки семантики с однократной проверкой.
Относительно новичок в Beam и Kafka, поэтому ищите предложения о том, как лучше протестировать этот конвейер и подтвердить, что семантика с точным единожданием работает в локальной среде.
Примечание: Я установил Apache Spark на моей машине и запускаю конвейер с опцией -Pspark-runner
.
Пример конвейера луча
p.apply(KafkaIO.<Long, String>read()
.withBootstrapServers("localhost:9092")
.withTopic(INPUT_TOPIC)
.withKeyDeserializer(LongDeserializer.class)
.withValueDeserializer(StringDeserializer.class)
.withConsumerConfigUpdates(ImmutableMap.of(ConsumerConfig.GROUP_ID_CONFIG, "test.group"))
.withConsumerConfigUpdates(ImmutableMap.of(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false))
.withReadCommitted()
.commitOffsetsInFinalize()
.withoutMetadata())
.apply(Values.<String>create())
.apply(KafkaIO.<Void, String>write()
.withBootstrapServers("localhost:9092")
.withTopic(OUTPUT_TOPIC)c
.withValueSerializer(StringSerializer.class)
.withEOS(1, "eos-sink-group-id")
.values()
);
p.run();
Спасибо