I have a SingleOutputStreamOperator on which I perform some processing that requires several keyBy() calls.
Here is a code sample:
    public SingleOutputStreamOperator<Map<String, Object>> process(DataStreamSource<Map<String, Object>> stream) {
        BroadcastStream<Map<String,Object>> broadcastedStream = ...;
        return stream
            .assignTimestampsAndWatermarks(...)
            .keyBy(new MyKeySelector("fieldAAA"))
            .window(SlidingEventTimeWindows.of(Time.seconds(30), Time.seconds(10)))
            .aggregate(new MyAggregationFunction1())
            .keyBy(new MapKeySelector("fieldAAA"))
            .connect(broadcastedStream)
            .process(new MyEvaluator()) // <-- 'fieldBBB' is built here
            .keyBy(new MyKeySelector("fieldBBB"))
            .window(SlidingEventTimeWindows.of(Time.seconds(30), Time.seconds(10)))
            .aggregate(new MyAggregationFunction2());
    }
But I get the following error:
2019-10-10 10:40:07.664 INFO org.apache.flink.runtime.taskmanager.Task - Co-Process-Broadcast-Keyed (6/12) (50ed3ab36b8f4078d865b7026cab08e5) switched from RUNNING to FAILED.
java.lang.RuntimeException: null
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
at my.package.processElement(MyEvaluator.java:54)
at my.package.processElement(MyEvaluator.java:22)
at org.apache.flink.streaming.api.operators.co.CoBroadcastWithKeyedOperator.processElement1(CoBroadcastWithKeyedOperator.java:113)
at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:238)
at org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:117)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.NullPointerException: null
at org.apache.flink.runtime.state.KeyGroupRangeAssignment.assignToKeyGroup(KeyGroupRangeAssignment.java:59)
at org.apache.flink.runtime.state.KeyGroupRangeAssignment.assignKeyToParallelOperator(KeyGroupRangeAssignment.java:48)
at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannel(KeyGroupStreamPartitioner.java:58)
at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannel(KeyGroupStreamPartitioner.java:32)
at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:128)
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
... 13 common frames omitted
From the stack trace, the error appears to occur inside the processElement function of MyEvaluator (a KeyedBroadcastProcessFunction):
    @Override
    public void processElement(Map<String, Object> value, ReadOnlyContext ctx, Collector<Map<String, Object>> out) throws Exception {
        List<Map<String, Object>> newFieldsList = ... ; // Retrieve a list of new fields based on 'value' and the elements received via broadcastedStream
        for (Map<String, Object> newFields : newFieldsList) {
            value.putAll(newFields); // Add all newFields to the current value
            out.collect(value); // <-- NPE occurs here
        }
    }
However, if I remove the keyBy("fieldBBB") part from my processing, the code runs. Even more interesting, if I simply replace keyBy("fieldBBB") with keyBy("fieldAAA"), the code also runs.
How do you explain this behavior, and how can I do what I want?
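One possible reading of the trace, offered as a sketch rather than a confirmed diagnosis: the "Caused by" frames sit in KeyGroupRangeAssignment.assignToKeyGroup, reached from KeyGroupStreamPartitioner.selectChannel. That partitioner belongs to the keyBy that follows MyEvaluator, and it runs when out.collect() pushes the record downstream, which would explain why the NPE surfaces inside processElement. As far as I know, the key-group assignment starts from key.hashCode(), so a null key (for example, a record where 'fieldBBB' was never set) would fail in exactly this place, while 'fieldAAA', which is always present, would not. The plain-Java example below reproduces that symptom without Flink. NullKeyDemo, selectKey, channelFor, and hasKey are hypothetical names, and selectKey is only an assumed stand-in for MyKeySelector, whose implementation is not shown.

```java
import java.util.HashMap;
import java.util.Map;

public class NullKeyDemo {

    // Hypothetical stand-in for MyKeySelector (assumption: it does a plain map lookup).
    // Returns null when the record has no entry for `field`.
    public static Object selectKey(Map<String, Object> record, String field) {
        return record.get(field);
    }

    // Simplified stand-in for the partitioning step, not Flink's actual code.
    // It starts from key.hashCode(), so a null key throws NullPointerException.
    public static int channelFor(Object key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    // Defensive check: can this record be keyed by `field` at all?
    public static boolean hasKey(Map<String, Object> record, String field) {
        return record.get(field) != null;
    }

    public static void main(String[] args) {
        Map<String, Object> record = new HashMap<>();
        record.put("fieldAAA", "a-1"); // 'fieldBBB' was never added to this record

        // Keying by a field that exists works.
        System.out.println("channel for fieldAAA: " + channelFor(selectKey(record, "fieldAAA"), 12));

        // Keying by a missing field fails while computing the channel.
        try {
            channelFor(selectKey(record, "fieldBBB"), 12);
        } catch (NullPointerException e) {
            System.out.println("NPE while partitioning by missing fieldBBB");
        }

        // The guard detects the problem before partitioning is attempted.
        System.out.println("can key by fieldBBB: " + hasKey(record, "fieldBBB"));
    }
}
```

If this is indeed the cause, a check like hasKey could be applied before the last keyBy, for instance in a filter step or inside MyEvaluator before calling out.collect(), so that records without 'fieldBBB' are either dropped or given a default key.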