Окно EventTime из потока Kafka, вызывающее ошибку «Монотонность временной метки нарушена» - PullRequest
0 голосов
/ 03 мая 2018

Я читаю из темы Кафки data, которая разделена на основе поля equipmentId. Всего 15 разделов, по одному на каждый инвентарь.

Данные в теме выглядят так: { "timeStamp": "2018-05-03T14:32:04.910Z", "series": "production-output", "equipmentId": "5454-07", "value": 1 } В том же разделе под equipmentId может быть одна из двух записей: production-output или production-input. Моя цель - подвести итоги производства за каждую минуту на основе eventTime.

Так выглядит мой код

    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    env.setParallelism(15);

        // Add kafka consumer to DataStream
    DataStream<String> stream = env.addSource(kafkaConsumer);

    DataStream keyedStream = stream
            .map(new SeriesMap())
            // Filter "production-output" seriesType
            .filter(new FilterFunction<Tuple4<Long, String, String, Double>>() {
                @Override
                public boolean filter(Tuple4<Long, String, String, Double> data) throws Exception {
                    if (data.f1.equals("production-output")) {
                        return true;
                    }
                    return false;
                }
            })
            // Key on "equipmentId"
            .keyBy(2);

    DataStreamSink sink = keyedStream
            .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple4<Long, String, String, Double>>() {
                @Override
                public long extractAscendingTimestamp(Tuple4<Long, String, String, Double> data) {
                    return data.f0;
                }
            })
            // Key on "equipmentId"
            .keyBy(2)
            .timeWindow(Time.seconds(1))
            .sum(3)
            .print();

Так что я считаю, что проблема в том, что keyedStream не создает отдельный поток для каждого ключа.

если бы я должен был выполнить это:

        DataStreamSink sink = keyedStream.print();

вывод выглядит так:

15> (1525358087756,production-output,5454-07,1.0)
2> (1525358080269,production-output,5454-05,1.0)
2> (1525358085361,production-output,5454-05,1.0)
2> (1525358088469,production-output,5454-05,1.0)
2> (1525358097630,production-output,5454-05,1.0)
13> (1525358222081,production-output,5454-06,1.0)
13> (1525358223162,production-output,5454-06,1.0)
...
13> (1525358230305,production-output,5454-06,1.0)
13> (1525358234453,production-output,5454-06,1.0)
15> (1525358231998,production-output,5454-01,1.0)
15> (1525358231783,production-output,5454-10,1.0)
15> (1525358232803,production-output,5454-01,1.0)
15> (1525358233811,production-output,5454-01,1.0)
...
15> (1525358238878,production-output,5454-10,1.0)

Таким образом, поток 15 принимает данные для оборудования 5454-10, 01 и 07 в то время как потоки 4,5,6,7,8,10,11,12 и 14 отсутствуют в выводе.

Не на каждой машине будут данные, поэтому я подумал, что может столкнуться с этой проблемой

Однако я думаю, что потоку назначено более 1 ключа , найденных в этом вопросе

любая помощь очень ценится!

примечание: Я могу гарантировать, что порядок отметок времени будет последовательным для каждого раздела.

ОБНОВЛЕНИЕ: Я сделал, как предложил Джошуа ДеВальд, и позвонил assignTimestampsAndWatermarks на источнике . Я больше не вижу исходную проблему с Timestamp monotony violated, но теперь сталкиваюсь с FLINK-5479 .

Спасибо!

1 Ответ

0 голосов
/ 03 мая 2018

Я полагаю, что если вы не можете гарантировать прогресс в метках времени по всем разделам, поскольку вы извлекаете метки времени и водяные знаки за пределами вашего источника, вы получите эту ошибку.

Что вы потенциально можете сделать, это использовать свой класс SeriesMap в качестве схемы десериализации Kafka, а затем выполнить assignTimestampsAndWatermarks против вашего источника Kafka. В этом случае у Kafka не возникнет проблем с перемещением ваших временных меток по отдельности в каждом разделе, а глобальный водяной знак, который он испускает, будет минимальным водяным знаком, встречающимся во всех разделах.

Другими словами, при этом ваше глобальное время события будет двигаться вперед со скоростью вашего самого медленного раздела. Важным предупреждением здесь является то, что каждый из ваших разделов должен излучать хотя бы некоторые данные, иначе ваш прогресс во времени остановится.

Обратите внимание, что время в Flink является глобальным, а не для каждого ключа.

...