Я читаю из темы Кафки data
, которая разделена на основе поля equipmentId
. Всего 15 разделов, по одному на каждый инвентарь.
Данные в теме выглядят так:
{
"timeStamp": "2018-05-03T14:32:04.910Z",
"series": "production-output",
"equipmentId": "5454-07",
"value": 1
}
В том же разделе под equipmentId
может быть одна из двух записей: production-output
или production-input
.
Моя цель - подвести итоги производства за каждую минуту на основе eventTime.
Так выглядит мой код
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(15);
// Add kafka consumer to DataStream
DataStream<String> stream = env.addSource(kafkaConsumer);
DataStream keyedStream = stream
.map(new SeriesMap())
// Filter "production-output" seriesType
.filter(new FilterFunction<Tuple4<Long, String, String, Double>>() {
@Override
public boolean filter(Tuple4<Long, String, String, Double> data) throws Exception {
if (data.f1.equals("production-output")) {
return true;
}
return false;
}
})
// Key on "equipmentId"
.keyBy(2);
DataStreamSink sink = keyedStream
.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple4<Long, String, String, Double>>() {
@Override
public long extractAscendingTimestamp(Tuple4<Long, String, String, Double> data) {
return data.f0;
}
})
// Key on "equipmentId"
.keyBy(2)
.timeWindow(Time.seconds(1))
.sum(3)
.print();
Так что я считаю, что проблема в том, что keyedStream
не создает отдельный поток для каждого ключа.
если бы я должен был выполнить это:
DataStreamSink sink = keyedStream.print();
вывод выглядит так:
15> (1525358087756,production-output,5454-07,1.0)
2> (1525358080269,production-output,5454-05,1.0)
2> (1525358085361,production-output,5454-05,1.0)
2> (1525358088469,production-output,5454-05,1.0)
2> (1525358097630,production-output,5454-05,1.0)
13> (1525358222081,production-output,5454-06,1.0)
13> (1525358223162,production-output,5454-06,1.0)
...
13> (1525358230305,production-output,5454-06,1.0)
13> (1525358234453,production-output,5454-06,1.0)
15> (1525358231998,production-output,5454-01,1.0)
15> (1525358231783,production-output,5454-10,1.0)
15> (1525358232803,production-output,5454-01,1.0)
15> (1525358233811,production-output,5454-01,1.0)
...
15> (1525358238878,production-output,5454-10,1.0)
Таким образом, поток 15 принимает данные для оборудования 5454-10, 01 и 07
в то время как потоки 4,5,6,7,8,10,11,12 и 14 отсутствуют в выводе.
Не на каждой машине будут данные, поэтому я подумал, что может столкнуться с этой проблемой
Однако я думаю, что потоку назначено более 1 ключа , найденных в этом вопросе
любая помощь очень ценится!
примечание: Я могу гарантировать, что порядок отметок времени будет последовательным для каждого раздела.
ОБНОВЛЕНИЕ: Я сделал, как предложил Джошуа ДеВальд, и позвонил assignTimestampsAndWatermarks
на источнике . Я больше не вижу исходную проблему с Timestamp monotony violated
, но теперь сталкиваюсь с FLINK-5479 .
Спасибо!