Я работаю над проектом в реальном времени с Flink, и мне нужно дополнить состояние каждой карты предыдущими транзакциями для функций вычислительных транзакций, как показано ниже:
Для каждой карты у меня есть функция, которая подсчитывает количество транзакций за последние 24 часа. С другой стороны, у меня есть 2 источника данных:
Во-первых, таблица базы данных, в которой хранятся транзакции карт до конца вчерашнего дня.
Во-вторых, поток сегодняшних транзакций.
Итак, первый шаг - извлечь вчерашние транзакции каждой карты из базы данных и сохранить их в состоянии карты. Затем второй шаг - обновить это состояние сегодняшними транзакциями, которые поступают в поток, и рассчитать для них количество транзакций за последние 24 часа.
Я попытался прочитать данные базы данных в виде потока и подключить их к сегодняшним транзакциям. Итак, чтобы достичь поставленной цели, я использовал функцию RichFlatMap. Однако, поскольку данные базы данных не были потоковыми по своей природе, выходные данные были неверными. Функция RichFlatMap имеет следующий вид:
transactionsHistory.connect(transactionsStream).flatMap(new
RichCoFlatMapFunction<History, Tuple2<String, Transaction>,
ExtractedFeatures>() {
private ValueState<History> history;
@Override
public void open(Configuration config) throws Exception {
this.history = getRuntimeContext().getState(new
ValueStateDescriptor<>("card history", History.class));
}
//historical data
@Override
public void flatMap1(History history,
Collector<ExtractedFeatures> collector) throws Exception {
this.history.update(history);
}
//new transactions from stream
@Override
public void flatMap2(Tuple2<String, Transaction>
transactionTuple, Collector<ExtractedFeatures> collector) throws
Exception {
History history = this.history.value();
Transaction transaction = transactionTuple.f1;
ArrayList<History> prevDayHistoryList =
history.prevDayTransactions;
// This function returns transactions which are in 24 hours
//window of the current transaction and their count.
Tuple2<ArrayList<History>, Integer> prevDayHistoryTuple =
findHistoricalDate(prevDayHistoryList,
transaction.transactionLocalDate);
prevDayHistoryList = prevDayHistoryTuple.f0;
history.prevDayTransactions = prevDayHistoryList;
this.history.update(history);
ExtractedFeatures ef = new ExtractedFeatures();
ef.updateFeatures(transaction, prevDayHistoryTuple.f1);
collector.collect(ef);
}
});
Каков правильный шаблон проектирования для достижения вышеуказанного обогащающего требования в потоковой программе Flink?
Я нашел вопрос о переполнении стека, который похож на мой вопрос, но я не смог решить свою проблему, поэтому я решил обратиться за помощью:)
Обогащение DataStream с использованием статического DataSet при потоковой передаче Flink
Любая помощь будет очень признательна.