Достаточно просто использовать что-то вроде KeyedProcessFunction
для постоянного обновления некоторого состояния Flink, которое агрегирует statement_amount
для каждого statement_id
, когда поступают новые транзакции. Но вопрос, насколько я понимаю, состоит в том, как узнать, когда завершится это агрегирование, или, другими словами, когда Flink обработал все транзакции для данного statement_id
.
, с которыми приложения обработки потоков всегда сталкиваются Эта проблема. В отличие от пакетной обработки, где можно просто обработать все данные и затем получить результат, при потоковой обработке мы обрабатываем одну запись за раз, не зная, что может произойти в будущем, или каким образом большая задержка.
Это приводит нас к компромиссу между задержкой и полнотой . В общем, всегда можно немного подождать, чтобы увидеть, какие дополнительные данные поступают, тем самым увеличивая свои шансы на получение результата на основе (более) полной информации. Водяные знаки являются техническим проявлением этого компромисса. Любое потоковое приложение, которое использует время события, должно создавать водяные знаки, каждый из которых помечает точку в потоке временной меткой и объявляет, что в этот момент поток , вероятно, завершен до этой временной метки.
Для некоторых приложений быстрое получение результата, который, вероятно, является правильным, - это хорошо, и на самом деле может быть лучше, чем ждать дольше, чтобы получить результат, который с большей вероятностью будет правильным. Но в других приложениях необходимо быть абсолютно точным (что бы это ни значило, точно).
Именно то, что вы должны сделать, это не технический вопрос, а вопрос о бизнес-процессе. В конечном итоге это зависит именно от того, что выверенное заявление означает для вашего бизнеса. Возможно, вам следует попытаться воспроизвести семантику того процесса, который в данный момент выполняется.
Сказав это, Flink предоставляет набор инструментов, которые вы можете комбинировать для решения этого варианта использования различными способами, в зависимости от деталей. о том, как вы хотите, чтобы это работало. Вот как части могут совмещаться:
Каждое утверждение имеет end_time
. Когда водяной знак для потока транзакций достигает значения end_time
, это первый момент, когда можно считать, что агрегация транзакций для этого оператора завершена.
Этот водяной знак (как правило) будет сделан на основе указания ограничения на сумму, на которую поток транзакций может быть не в порядке. Но вы должны ожидать, что независимо от того, насколько вы пессимистичны c, несколько аномальных транзакций нарушат это предположение и будут поздними относительно водяных знаков.
Чтобы учесть это, вы можете либо увеличьте задержку с водяными знаками, чтобы попытаться покрыть все мыслимые задержки (которые, как можно утверждать, вообще невозможно), либо решите, что в какой-то момент вы просто должны go опередить и создать утверждение, которое утверждает, что согласовано, но может потребовать обновления или дополнения в будущем. Является ли эта проблема произвольной задержки реальной проблемой (как это может быть в банковском деле, где некоторые международные транзакции могут испытывать очень длительные задержки), или просто теоретической, зависит от вашего фактического варианта использования.
Возможность для учета поздних транзакций потребуется, чтобы вы (1) сохранили данные выписки в управляемом состоянии Флинка, чтобы добавить позднюю транзакцию (и), которую затем можно использовать для обновления выписки, или (2) обработать поздние события в особым образом, читая ранее полученный результат из БД и затем обновляя эту запись в БД (что должно было бы быть выполнено транзакционно). Подход № 2 может быть реализован в отдельном задании, которое использует поток поздних транзакций, созданных первым заданием.
Возможно, вы сможете определить выход из этой проблемы, включив метку времени в оператор, который указывает что заявление включает в себя именно те транзакции, которые были обработаны до этого момента.