Как использовать исторический набор данных для обогащения Flink DataStream - PullRequest
0 голосов
/ 28 мая 2019

Я работаю над проектом в реальном времени с Flink, и мне нужно дополнить состояние каждой карты предыдущими транзакциями для функций вычислительных транзакций, как показано ниже:

Для каждой карты у меня есть функция, которая подсчитывает количество транзакций за последние 24 часа. С другой стороны, у меня есть 2 источника данных:

Во-первых, таблица базы данных, в которой хранятся транзакции карт до конца вчерашнего дня.

Во-вторых, поток сегодняшних транзакций.

Итак, первый шаг - извлечь вчерашние транзакции каждой карты из базы данных и сохранить их в состоянии карты. Затем второй шаг - обновить это состояние сегодняшними транзакциями, которые поступают в поток, и рассчитать для них количество транзакций за последние 24 часа. Я попытался прочитать данные базы данных в виде потока и подключить их к сегодняшним транзакциям. Итак, чтобы достичь поставленной цели, я использовал функцию RichFlatMap. Однако, поскольку данные базы данных не были потоковыми по своей природе, выходные данные были неверными. Функция RichFlatMap имеет следующий вид:

transactionsHistory.connect(transactionsStream).flatMap(new         
RichCoFlatMapFunction<History, Tuple2<String, Transaction>,         
ExtractedFeatures>() {
    private ValueState<History> history;
    @Override
    public void open(Configuration config) throws Exception {
        this.history = getRuntimeContext().getState(new 
    ValueStateDescriptor<>("card history", History.class));
    }
    //historical data 
    @Override
    public void flatMap1(History history, 
    Collector<ExtractedFeatures> collector) throws Exception {
        this.history.update(history);
    }
    //new transactions from stream 
    @Override
    public void flatMap2(Tuple2<String, Transaction> 
    transactionTuple, Collector<ExtractedFeatures> collector) throws 
    Exception {
        History history = this.history.value();
        Transaction transaction = transactionTuple.f1;
        ArrayList<History> prevDayHistoryList = 
        history.prevDayTransactions;

        // This function returns transactions which are in 24 hours 
        //window of the current transaction and their count.
        Tuple2<ArrayList<History>, Integer> prevDayHistoryTuple = 
        findHistoricalDate(prevDayHistoryList,
                transaction.transactionLocalDate);
        prevDayHistoryList = prevDayHistoryTuple.f0;
        history.prevDayTransactions = prevDayHistoryList;
        this.history.update(history);
        ExtractedFeatures ef = new ExtractedFeatures();
        ef.updateFeatures(transaction, prevDayHistoryTuple.f1);
        collector.collect(ef);
    }
});

Каков правильный шаблон проектирования для достижения вышеуказанного обогащающего требования в потоковой программе Flink? Я нашел вопрос о переполнении стека, который похож на мой вопрос, но я не смог решить свою проблему, поэтому я решил обратиться за помощью:)

Обогащение DataStream с использованием статического DataSet при потоковой передаче Flink

Любая помощь будет очень признательна.

1 Ответ

1 голос
/ 30 мая 2019

Однако, поскольку данные базы данных не были потоковыми по своей природе, выходные данные были неверными.

Конечно, можно обогатить потоковые данные информацией, поступающей из реляционной базы данных.Однако может быть сложно как-то гарантировать, что данные по обогащению поступают до того, как они понадобятся.В общем случае вам может понадобиться буферизовать поток для обогащения до тех пор, пока данные обогащения не будут загружены / загружены.Один из подходов, который иногда используется, например, состоит в том, чтобы

  1. запустить приложение с отключенным потоком для обогащения
  2. взять точку сохранения после того, как данные обогащения будут полностьюпринятый и сохраненный в состоянии Flink
  3. перезапустите приложение из точки сохранения с включенным потоком для обогащения

В описанном вами случае, однако, это выглядит как более простойподход будет работать.Если вам нужны только 24 часа исторических данных, то почему бы не игнорировать базу данных исторических транзакций?Просто запустите ваше приложение, пока оно не увидит 24 часа потоковой передачи данных, после чего историческая база данных в любом случае станет неактуальной.

Но если вам нужно принять исторические данные, и вам не нравится описанный подход на основе точек сохранениявыше, есть несколько других возможностей:

  • буферизировать необогащенные события в состоянии flink (например, ListState или MapState), пока исторический поток не будет принят
  • написать пользовательскийSourceFunction, которая блокирует первичный поток до тех пор, пока не будут загружены исторические данные

Для более подробного изучения этой темы см. Состояние начальной загрузки в Apache Flink .

Лучшая поддержка этого варианта использования запланирована на будущий выпуск, кстати.

...