Новые учебники по state и связанных потоков должны помочь вам с вашими вопросами. Но вкратце:
Вы не можете контролировать порядок, в котором будут вызываться обратные вызовы processElement1
и processElement2
. Эти два входных потока конкурируют друг с другом, и среда выполнения Flink будет делать то, что хочет, в отношении потребления событий из одного потока или другого. В случаях, когда время и / или порядок имеют значение, вы можете счесть необходимым буферизовать события в управляемом состоянии Flink до тех пор, пока ваше приложение не будет готово их обработать.
Значение ValueState является разновидностью keyed state , что означает, что при каждом обращении к состоянию или его обновлении запись в бэкэнде состояния для ключа в контексте читается или записывается. «Ключ в контексте» - это ключ для обрабатываемого элемента потока (в случае обратного вызова processElement
) или для ключа, который создал таймер (в случае обратного вызова onTimer
).
Также имейте в виду, что в этом упражнении для каждого ключа предусмотрено не более одного TaxiRide и одного TaxiFare.
Эталонные решения для этого упражнения иллюстрируют один способ думать о том, как управлять состоянием, которое в противном случае могло бы просочиться, но это не та ситуация, когда есть один, очевидно правильный ответ. Цель этого упражнения состоит в том, чтобы стимулировать некоторые размышления о том, как работать с состояниями и таймерами, и вынести некоторые проблемы, связанные с этим, на поверхность.
Каковы могут быть наши цели для хорошего решения? Он должен
- давать правильные результаты
- не иметь состояния утечки
- быть простым для понимания
- иметь хорошую производительность
Теперь давайте рассмотрим предлагаемое решение с учетом этих целей. Мы находим этот код в processElement1
(и, кстати, processElement2
такой же, только с ролями, измененными между поездкой и стоимостью проезда):
public void processElement1(TaxiRide ride, Context context, Collector<Tuple2<TaxiRide, TaxiFare>> out) throws Exception {
TaxiFare fare = fareState.value();
if (fare != null) {
fareState.clear();
context.timerService().deleteEventTimeTimer(fare.getEventTime());
out.collect(new Tuple2(ride, fare));
} else {
rideState.update(ride);
// as soon as the watermark arrives, we can stop waiting for the corresponding fare
context.timerService().registerEventTimeTimer(ride.getEventTime());
}
}
Это означает, что
- всякий раз, когда приходит событие, которое не завершает пару, мы сохраняем его в состоянии и создаем таймер
- всякий раз, когда приходит событие, которое завершает пару, мы очищаем состояние и удаляем таймер для соответствующее событие (которое было сохранено ранее)
Таким образом, ясно, что ничто не может просочиться, если оба события прибывают. Но что, если один из них отсутствует?
В этом случае таймер сработает в какой-то момент и запустит этот код, который четко очистит любое состояние, которое может существовать:
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<TaxiRide, TaxiFare>> out) throws Exception {
if (fareState.value() != null) {
ctx.output(unmatchedFares, fareState.value());
fareState.clear();
}
if (rideState.value() != null) {
ctx.output(unmatchedRides, rideState.value());
rideState.clear();
}
}
Ok а как мы решили как долго ждать? Достаточно ли подождать до ride.getEventTime()
?
Эффект установки таймера времени события для ride.getEventTime()
состоит в том, чтобы дождаться устранения любых несогласованностей в потоках поездок и тарифов. Все ранние события, связанные с поездками и проездом, наступят к тому времени, когда уровень водяного знака достигнет ride.getEventTime()
, , если предположить, что водяные знаки идеально подходят .
В этих упражнениях водяные знаки на самом деле идеальны - не может быть никаких поздних событий. Но в реальных условиях вы должны ожидать некоторых поздних событий, и мы должны ожидать, что наша реализация ведет себя правильно в этой ситуации. Это эталонное решение будет делать следующее:
- одно из событий в соответствующей паре прибудет первым и создаст таймер для организации его возможного удаления
- , которое сработает таймер и событие будет очищено
- соответствующее событие прибывает поздно и создает другой таймер, в этом случае на время, которое уже прошло
- следующий поступающий водяной знак запускает этот таймер, и состояние очищается
Другими словами, когда событие задерживается, ни одно состояние не будет утечкой, но полученное соединение не будет создано. Таким образом, в тех случаях, когда вы хотите получать результаты, несмотря на опоздавшие данные, вы должны создать таймеры, которые будут учитывать некоторое запоздание, сохраняя необходимое состояние в течение некоторого дополнительного периода времени, например,
context.timerService().registerEventTimeTimer(ride.getEventTime() + ALLOWED_LATENESS);
Не стоит пытаться приспособить произвольно поздние события, потому что для этого необходимо сохранять некоторое состояние для каждого позднего события на неопределенный срок.
Как насчет использования таймеров времени обработки вместо этого?
Конечно, это будет работать, но это может быть более неудобно для тестирования.
Почему бы вместо этого не использовать State Time-To-Live?
Прекрасная идея. В целом вы можете подумать о терминах, например, State TTL для соответствия GDPR и использовать таймеры для реализации бизнес-логики c.