Головоломки ExpiringStateSolution от Ververica Flink Учебные материалы - PullRequest
0 голосов
/ 23 апреля 2020

К сожалению, изначально обучение ververica было изменено и перенаправлено на другую страницу, что привело к тому, что я не смог снова просмотреть вступление к этому примеру, я нашел некоторые другие примеры, но для этого конкретного c, я не смог найти это то, что вокруг меня есть что-то, с чем я недавно боролся,

для фрагмента кода основной части, это следующее: Первый метод для работы с потоком езды

        @Override
        public void processElement1(TaxiRide ride, Context context, Collector<Tuple2<TaxiRide, TaxiFare>> out) throws Exception {
            TaxiFare fare = fareState.value();
            TimerService service = context.timerService();
            System.out.println("ride time service current watermark ===> " + service.currentWatermark() + "; timestamp ===>" + context.timestamp());
            System.out.println("ride state ===> " + fare);
            if (fare != null) {
                System.out.println("fare is not null ===>" + fare.rideId);
                fareState.clear();
                context.timerService().deleteEventTimeTimer(fare.getEventTime());
                out.collect(new Tuple2(ride, fare));
            } else {
                System.out.println("update ride state ===> " + ride.rideId + "===>" + context.timestamp());
                rideState.update(ride);
                System.out.println(rideState.value());
                // as soon as the watermark arrives, we can stop waiting for the corresponding fare
                context.timerService().registerEventTimeTimer(ride.getEventTime());
            }
        }

второй метод для работы с потоком тарифов

        @Override
        public void processElement2(TaxiFare fare, Context context, Collector<Tuple2<TaxiRide, TaxiFare>> out) throws Exception {
            TimerService service = context.timerService();
            System.out.println("fare time service current watermark ===> " + service.currentWatermark() + "; timestamp ===>" + context.timestamp());
            TaxiRide ride = rideState.value();
            System.out.println("fare state ===> " + ride);
            if (ride != null) {
                System.out.println("ride is not null ===> " + ride.rideId);
                rideState.clear();
                context.timerService().deleteEventTimeTimer(ride.getEventTime());
                out.collect(new Tuple2(ride, fare));
            } else {
                System.out.println("update fare state ===> " + fare.rideId + "===>" + context.timestamp());
                fareState.update(fare);
                System.out.println(fareState.value() + "===>" + fareState.value().getEventTime());
                // as soon as the watermark arrives, we can stop waiting for the corresponding ride
                context.timerService().registerEventTimeTimer(fare.getEventTime());
            }
        }

processElement1, очевидно, предназначен для потока TaxiRide, а 2 - для TaxiFare. Во-первых, он некоторое время будет запускать processElement2 перед выполнением processElement1, я не нашел причина до сих пор, вот часть печати

fare time service current watermark ===> -9223372036854775808; timestamp ===>1356998400000
fare time service current watermark ===> -9223372036854775808; timestamp ===>1356998400000
fare state ===> null
fare state ===> null
update fare state ===> 26===>1356998400000
update fare state ===> 58===>1356998400000
58,2013000058,2013000058,2013-01-01 00:00:00,CRD,2.0,0.0,27.0===>1356998400000
26,2013000026,2013000026,2013-01-01 00:00:00,CRD,2.0,0.0,12.5===>1356998400000
fare time service current watermark ===> -9223372036854775808; timestamp ===>1356998400000
fare state ===> null
update fare state ===> 9===>1356998400000
fare time service current watermark ===> -9223372036854775808; timestamp ===>1356998400000
fare state ===> null
update fare state ===> 47===>1356998400000
9,2013000009,2013000009,2013-01-01 00:00:00,CRD,1.0,0.0,6.0===>1356998400000
47,2013000047,2013000047,2013-01-01 00:00:00,CRD,0.9,0.0,5.9===>1356998400000
fare time service current watermark ===> -9223372036854775808; timestamp ===>1356998400000
fare state ===> null
update fare state ===> 54===>1356998400000
fare time service current watermark ===> -9223372036854775808; timestamp ===>1356998400000
54,2013000054,2013000054,2013-01-01 00:00:00,CSH,0.0,0.0,31.0===>1356998400000

Вторая причина в том, что ValueState - это примерно одно значение, а не список, содержащий много значений, для каждого вызова processElemnt2, если null, он будет go в противном случае, после вызова TarState.update (), он изменит значение ValueState, на мой взгляд, означает, что предыдущее значение ValueState совпадает, верно? ----- Самая большая загадка, спасибо за ответ, я ценю вашу помощь!

1 Ответ

0 голосов
/ 23 апреля 2020

Новые учебники по state и связанных потоков должны помочь вам с вашими вопросами. Но вкратце:

  1. Вы не можете контролировать порядок, в котором будут вызываться обратные вызовы processElement1 и processElement2. Эти два входных потока конкурируют друг с другом, и среда выполнения Flink будет делать то, что хочет, в отношении потребления событий из одного потока или другого. В случаях, когда время и / или порядок имеют значение, вы можете счесть необходимым буферизовать события в управляемом состоянии Flink до тех пор, пока ваше приложение не будет готово их обработать.

  2. Значение ValueState является разновидностью keyed state , что означает, что при каждом обращении к состоянию или его обновлении запись в бэкэнде состояния для ключа в контексте читается или записывается. «Ключ в контексте» - это ключ для обрабатываемого элемента потока (в случае обратного вызова processElement) или для ключа, который создал таймер (в случае обратного вызова onTimer).

Также имейте в виду, что в этом упражнении для каждого ключа предусмотрено не более одного TaxiRide и одного TaxiFare.

Эталонные решения для этого упражнения иллюстрируют один способ думать о том, как управлять состоянием, которое в противном случае могло бы просочиться, но это не та ситуация, когда есть один, очевидно правильный ответ. Цель этого упражнения состоит в том, чтобы стимулировать некоторые размышления о том, как работать с состояниями и таймерами, и вынести некоторые проблемы, связанные с этим, на поверхность.

Каковы могут быть наши цели для хорошего решения? Он должен

  • давать правильные результаты
  • не иметь состояния утечки
  • быть простым для понимания
  • иметь хорошую производительность

Теперь давайте рассмотрим предлагаемое решение с учетом этих целей. Мы находим этот код в processElement1 (и, кстати, processElement2 такой же, только с ролями, измененными между поездкой и стоимостью проезда):

public void processElement1(TaxiRide ride, Context context, Collector<Tuple2<TaxiRide, TaxiFare>> out) throws Exception {
    TaxiFare fare = fareState.value();
    if (fare != null) {
        fareState.clear();
        context.timerService().deleteEventTimeTimer(fare.getEventTime());
        out.collect(new Tuple2(ride, fare));
    } else {
        rideState.update(ride);
        // as soon as the watermark arrives, we can stop waiting for the corresponding fare
        context.timerService().registerEventTimeTimer(ride.getEventTime());
    }
}

Это означает, что

  • всякий раз, когда приходит событие, которое не завершает пару, мы сохраняем его в состоянии и создаем таймер
  • всякий раз, когда приходит событие, которое завершает пару, мы очищаем состояние и удаляем таймер для соответствующее событие (которое было сохранено ранее)

Таким образом, ясно, что ничто не может просочиться, если оба события прибывают. Но что, если один из них отсутствует?

В этом случае таймер сработает в какой-то момент и запустит этот код, который четко очистит любое состояние, которое может существовать:

public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<TaxiRide, TaxiFare>> out) throws Exception {
    if (fareState.value() != null) {
        ctx.output(unmatchedFares, fareState.value());
        fareState.clear();
    }
    if (rideState.value() != null) {
        ctx.output(unmatchedRides, rideState.value());
        rideState.clear();
    }
}

Ok а как мы решили как долго ждать? Достаточно ли подождать до ride.getEventTime()?

Эффект установки таймера времени события для ride.getEventTime() состоит в том, чтобы дождаться устранения любых несогласованностей в потоках поездок и тарифов. Все ранние события, связанные с поездками и проездом, наступят к тому времени, когда уровень водяного знака достигнет ride.getEventTime(), , если предположить, что водяные знаки идеально подходят .

В этих упражнениях водяные знаки на самом деле идеальны - не может быть никаких поздних событий. Но в реальных условиях вы должны ожидать некоторых поздних событий, и мы должны ожидать, что наша реализация ведет себя правильно в этой ситуации. Это эталонное решение будет делать следующее:

  • одно из событий в соответствующей паре прибудет первым и создаст таймер для организации его возможного удаления
  • , которое сработает таймер и событие будет очищено
  • соответствующее событие прибывает поздно и создает другой таймер, в этом случае на время, которое уже прошло
  • следующий поступающий водяной знак запускает этот таймер, и состояние очищается

Другими словами, когда событие задерживается, ни одно состояние не будет утечкой, но полученное соединение не будет создано. Таким образом, в тех случаях, когда вы хотите получать результаты, несмотря на опоздавшие данные, вы должны создать таймеры, которые будут учитывать некоторое запоздание, сохраняя необходимое состояние в течение некоторого дополнительного периода времени, например,

context.timerService().registerEventTimeTimer(ride.getEventTime() + ALLOWED_LATENESS);

Не стоит пытаться приспособить произвольно поздние события, потому что для этого необходимо сохранять некоторое состояние для каждого позднего события на неопределенный срок.

Как насчет использования таймеров времени обработки вместо этого?

Конечно, это будет работать, но это может быть более неудобно для тестирования.

Почему бы вместо этого не использовать State Time-To-Live?

Прекрасная идея. В целом вы можете подумать о терминах, например, State TTL для соответствия GDPR и использовать таймеры для реализации бизнес-логики c.

...