Точное однократное тестирование с Apache Beam + SparkRunner в локальной среде - PullRequest
1 голос
/ 01 мая 2020

Я работаю под конвейером лучей, который читает из локальной Kafka INPUT_TOPI C и записывает в другую локальную Kafka OUTPUT_TOPI C. Я создал издателя для подачи INPUT_TOPI C (вручную) и потребителя, чтобы проверить, что я получаю по OUTPUT_TOPI C, но меня интересует, является ли это правильной настройкой для проверки семантики с однократной проверкой.

Относительно новичок в Beam и Kafka, поэтому ищите предложения о том, как лучше протестировать этот конвейер и подтвердить, что семантика с точным единожданием работает в локальной среде.

Примечание: Я установил Apache Spark на моей машине и запускаю конвейер с опцией -Pspark-runner.

Пример конвейера луча

p.apply(KafkaIO.<Long, String>read()
.withBootstrapServers("localhost:9092")
.withTopic(INPUT_TOPIC)
.withKeyDeserializer(LongDeserializer.class)
.withValueDeserializer(StringDeserializer.class)
.withConsumerConfigUpdates(ImmutableMap.of(ConsumerConfig.GROUP_ID_CONFIG, "test.group"))
.withConsumerConfigUpdates(ImmutableMap.of(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false))
.withReadCommitted()
.commitOffsetsInFinalize()
.withoutMetadata())
.apply(Values.<String>create())
.apply(KafkaIO.<Void, String>write()
  .withBootstrapServers("localhost:9092")
  .withTopic(OUTPUT_TOPIC)c
  .withValueSerializer(StringSerializer.class)
  .withEOS(1, "eos-sink-group-id")
  .values()
);

p.run();

Спасибо

...