Как объединить поток и набор данных? - PullRequest
0 голосов
/ 11 марта 2020

Как объединить поток и набор данных? У меня есть поток, и у меня есть данные stati c в файле. Я хочу обогатить данные потока, используя данные в файле.

Пример: в потоке я получаю код аэропорта, а в файле у меня есть имя аэропорта и коды в файле. Теперь я хочу объединить данные потока в файл, чтобы сформировать новый поток с именами аэропортов. Пожалуйста, опишите, как этого добиться.

1 Ответ

0 голосов
/ 11 марта 2020

Существует множество способов приблизиться к обогащению потока с помощью Flink, в зависимости от конкретных требований. https://www.youtube.com/watch?v=cJS18iKLUIY - хороший доклад Константина Кнауфа, который охватывает множество различных подходов и компромиссы между ними.

В простом случае, когда данные обогащения неизменны и достаточно малы, я бы просто используйте RichFlatMap и загрузите весь файл методом open(). Это будет выглядеть примерно так:

public class EnrichmentWithPreloading extends RichFlatMapFunction<Event, EnrichedEvent> {

    private Map<Long, SensorReferenceData> referenceData;

    @Override
    public void open(final Configuration parameters) throws Exception {
      super.open(parameters);
      referenceData = loadReferenceData();
    }

    @Override
    public void flatMap(
        final Event event,
        final Collector<EnrichedEvent> collector) throws Exception {

      SensorReferenceData sensorReferenceData = 
        referenceData.get(sensorMeasurement.getSensorId());
      collector.collect(new EnrichedEvent(event, sensorReferenceData));
    }

}

Вы найдете больше примеров кода для других подходов в https://github.com/knaufk/enrichments-with-flink.

ОБНОВЛЕНИЕ:

Если вы предпочитаете предварительно загружать большие разделенные справочные данные, чтобы объединить их с потоком, есть несколько способов достичь этого, некоторые из которых описаны в видео и репо, которые я опубликовал выше. Для этих конкретных c требований я предлагаю использовать пользовательский разделитель; в этом же репозитории github есть пример здесь . Идея состоит в том, что данные обогащения отбрасываются, и каждое потоковое событие направляется к экземпляру с соответствующими справочными данными.

На мой взгляд, это проще, чем пытаться заставить API таблицы выполнить это конкретное обогащение. как объединение.

...