Агрегирование тем с помощью Apache Beam Kafkaio (Dataflow) - PullRequest
0 голосов
/ 12 марта 2019

У меня медленно движущиеся данные в компактной теме кафки, а также быстро движущиеся данные в другой теме.

1) быстро движущиеся данные - это поступающие в реальном времени неограниченные события от Кафки.

2) медленно движущиеся данные - это метаданные, которые используются для обогащения быстро движущихся данных.Это сжатая тема, и данные обновляются нечасто (дни / месяцы).

3) Каждая полезная нагрузка быстродвижущихся данных должна иметь полезную нагрузку метаданных с тем же идентификатором customerId, с которым они могут быть агрегированы.

Я хочу объединить данные быстрого / медленного перемещения с идентификатором customerId(распространено в данных по обеим темам).Мне было интересно, как вы будете делать это?Пока что:

PTransform<PBegin, PCollection<KV<byte[], byte[]>>> kafka = KafkaIO.<byte[], byte[]>read()
    .withBootstrapServers(“url:port")
    .withTopics([“fast-moving-data”, “slow-moving-data"])
    .withKeyDeserializer(ByteArrayDeserializer.class)
    .withValueDeserializer(ByteArrayDeserializer.class)
    .updateConsumerProperties((Map) props)
    .withoutMetadata();

Я заметил, что могу использовать .withTopics и указать различные темы, которые хотел бы использовать, но после этого я не смог найти никаких примеров, которые бы помогли в терминахагрегации.Любая помощь будет оценена.

Ответы [ 2 ]

1 голос
/ 13 марта 2019

Следующий шаблон, который также обсуждается в этом SO Q & A , может быть хорошим для изучения в вашем случае использования.Одним из вопросов, который может быть проблемой, является размер вашего сжатого медленного потока.Надеюсь, что это полезно.

Для этого шаблона мы можем использовать исходное преобразование GenerateSequence, чтобы периодически выдавать значение, например, один раз в день.Передайте это значение в глобальное окно через управляемый данными триггер, который активируется для каждого элемента.В DoFn используйте этот процесс как триггер для извлечения данных из вашего ограниченного источника. Создайте свой SideInput для использования в нисходящих преобразованиях.

Важно отметить, что, поскольку в этом шаблоне используется глобальное окно SideInput, запускаемое во время обработкисоответствие элементам, обрабатываемым во время события, будет недетерминированным.Например, если у нас есть основной конвейер, который работает с окнами во время события, версия SideInput View, которую увидят эти окна, будет зависеть от последнего триггера, который сработал во время обработки, а не от времени события.

Также важно отметить, что в целом SideInput должен быть чем-то, что умещается в памяти.

Java (SDK 2.9.0):

В приведенном ниже примере боковой вход обновляется с очень короткими интервалами, это позволяет легко увидеть эффекты.Ожидается, что боковой ввод обновляется медленно, например, каждые несколько часов или один раз в день.

В приведенном ниже примере кода мы используем карту, которую мы создаем в DoFn, которая становится View.asSingleton, это рекомендуемый подход для этого шаблона.

Пример ниже иллюстрирует шаблон, обратите внимание, что View.asSingleton перестраивается при каждом обновлении счетчика.

Для вашего варианта использования вы можете заменить преобразования GenerateSequence преобразованиями PubSubIO.Это имеет смысл?

public static void main(String[] args) {

 // Create pipeline
 PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
     .as(PipelineOptions.class);

 // Using View.asSingleton, this pipeline uses a dummy external service as illustration.
 // Run in debug mode to see the output
 Pipeline p = Pipeline.create(options);

 // Create slowly updating sideinput

 PCollectionView<Map<String, String>> map = p
     .apply(GenerateSequence.from(0).withRate(1, Duration.standardSeconds(5L)))

     .apply(Window.<Long>into(new GlobalWindows())
         .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
         .discardingFiredPanes())

     .apply(ParDo.of(new DoFn<Long, Map<String, String>>() {
       @ProcessElement public void process(@Element Long input,
           OutputReceiver<Map<String, String>> o) {
         // Do any external reads needed here...
         // We will make use of our dummy external service.
         // Every time this triggers, the complete map will be replaced with that read from 
         // the service.
         o.output(DummyExternalService.readDummyData());
       }

     })).apply(View.asSingleton());

 // ---- Consume slowly updating sideinput

 // GenerateSequence is only used here to generate dummy data for this illustration.
 // You would use your real source for example PubSubIO, KafkaIO etc...
 p.apply(GenerateSequence.from(0).withRate(1, Duration.standardSeconds(1L)))
     .apply(Window.into(FixedWindows.of(Duration.standardSeconds(1))))
     .apply(Sum.longsGlobally().withoutDefaults())
     .apply(ParDo.of(new DoFn<Long, KV<Long, Long>>() {

       @ProcessElement public void process(ProcessContext c) {
         Map<String, String> keyMap = c.sideInput(map);
         c.outputWithTimestamp(KV.of(1L, c.element()), Instant.now());

  LOG.debug("Value is {} key A is {} and key B is {}"
, c.element(), keyMap.get("Key_A"),keyMap.get("Key_B"));

       }
     }).withSideInputs(map));

 p.run();
}

public static class DummyExternalService {

 public static Map<String, String> readDummyData() {

   Map<String, String> map = new HashMap<>();
   Instant now = Instant.now();

   DateTimeFormatter dtf = DateTimeFormat.forPattern("HH:MM:SS");

   map.put("Key_A", now.minus(Duration.standardSeconds(30)).toString(dtf));
   map.put("Key_B", now.minus(Duration.standardSeconds(30)).toString());

   return map;

 }
}
1 голос
/ 12 марта 2019

Я бы предложил читать эти темы отдельно, создавая два различных входа в конвейер.Вы можете скрестить / присоединиться к ним позже.И способ их пересечения состоит в том, чтобы обеспечить медленный поток в качестве побочного входа в горячую дорожку (преобразования быстро движущейся коллекции PC).

См. Здесь: https://beam.apache.org/documentation/programming-guide/#side-inputs

...