Flink: multiple keyBy() calls in one stream
October 10, 2019

I have a SingleOutputStreamOperator on which I perform some processing, and that processing requires several keyBy() calls.

Here is the sample code:

public SingleOutputStreamOperator<Map<String, Object>> process(DataStreamSource<Map<String, Object>> stream) {

    BroadcastStream<Map<String,Object>> broadcastedStream = ...;

    return stream
        .assignTimestampsAndWatermarks(...)

        // 1st keyBy: 30 s sliding event-time windows (slide 10 s), keyed by 'fieldAAA'
        .keyBy(new MyKeySelector("fieldAAA"))
        .window(SlidingEventTimeWindows.of(Time.seconds(30), Time.seconds(10)))
        .aggregate(new MyAggregationFunction1())

        // 2nd keyBy: keyed stream connected to the broadcast stream
        .keyBy(new MapKeySelector("fieldAAA"))
        .connect(broadcastedStream)
        .process(new MyEvaluator())             // <-- 'fieldBBB' is built here

        // 3rd keyBy: re-key by the field added in MyEvaluator
        .keyBy(new MyKeySelector("fieldBBB"))
        .window(SlidingEventTimeWindows.of(Time.seconds(30), Time.seconds(10)))
        .aggregate(new MyAggregationFunction2());
}

But I get the following error:

2019-10-10 10:40:07.664 INFO  org.apache.flink.runtime.taskmanager.Task  - Co-Process-Broadcast-Keyed (6/12) (50ed3ab36b8f4078d865b7026cab08e5) switched from RUNNING to FAILED.
java.lang.RuntimeException: null
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
    at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
    at my.package.processElement(MyEvaluator.java:54)
    at my.package.processElement(MyEvaluator.java:22)
    at org.apache.flink.streaming.api.operators.co.CoBroadcastWithKeyedOperator.processElement1(CoBroadcastWithKeyedOperator.java:113)
    at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:238)
    at org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:117)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.NullPointerException: null
    at org.apache.flink.runtime.state.KeyGroupRangeAssignment.assignToKeyGroup(KeyGroupRangeAssignment.java:59)
    at org.apache.flink.runtime.state.KeyGroupRangeAssignment.assignKeyToParallelOperator(KeyGroupRangeAssignment.java:48)
    at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannel(KeyGroupStreamPartitioner.java:58)
    at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannel(KeyGroupStreamPartitioner.java:32)
    at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:128)
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
    ... 13 common frames omitted
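The "Caused by" part shows that the NullPointerException is thrown in KeyGroupStreamPartitioner.selectChannel and KeyGroupRangeAssignment.assignToKeyGroup, i.e. while Flink decides which downstream subtask of the next keyBy should receive the record that out.collect(value) has just emitted. Because this routing happens on the sending side, the failure surfaces inside MyEvaluator even though the key belongs to the following operator. The plain-Java sketch below mimics that routing step without any Flink dependency. It assumes that MyKeySelector simply returns record.get(fieldName) (its source is not shown), and selectKey, assignToKeyGroup and selectChannel are hypothetical, simplified stand-ins: Flink also needs the key's hashCode(), but then applies a murmur hash, which is replaced here by Math.floorMod.

```java
import java.util.HashMap;
import java.util.Map;

public class KeyGroupRoutingSketch {

    // Hypothetical stand-in for MyKeySelector: returns the value of one map field.
    // Map.get returns null when the field is absent, so the key can be null.
    static Object selectKey(Map<String, Object> record, String field) {
        return record.get(field);
    }

    // Simplified stand-in for key-group assignment. Like Flink, it calls
    // key.hashCode(), so a null key throws a NullPointerException here.
    // (Flink scrambles the hash with murmur hashing; floorMod is a simplification.)
    static int assignToKeyGroup(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // Maps the key group to a downstream subtask index
    // using keyGroup * parallelism / maxParallelism.
    static int selectChannel(Object key, int maxParallelism, int parallelism) {
        int keyGroup = assignToKeyGroup(key, maxParallelism);
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // assumed value for this sketch
        int parallelism = 12;     // matches "(6/12)" in the log above

        Map<String, Object> withB = new HashMap<>();
        withB.put("fieldAAA", "a1");
        withB.put("fieldBBB", "b1");

        Map<String, Object> withoutB = new HashMap<>();
        withoutB.put("fieldAAA", "a1");

        // Both records can be routed by fieldAAA...
        System.out.println("withB by fieldAAA -> subtask "
                + selectChannel(selectKey(withB, "fieldAAA"), maxParallelism, parallelism));
        System.out.println("withoutB by fieldAAA -> subtask "
                + selectChannel(selectKey(withoutB, "fieldAAA"), maxParallelism, parallelism));

        // ...but only a record that actually carries fieldBBB can be routed by fieldBBB.
        System.out.println("withB by fieldBBB -> subtask "
                + selectChannel(selectKey(withB, "fieldBBB"), maxParallelism, parallelism));
        try {
            selectChannel(selectKey(withoutB, "fieldBBB"), maxParallelism, parallelism);
        } catch (NullPointerException e) {
            System.out.println("withoutB by fieldBBB -> NullPointerException (null key)");
        }
    }
}
```

If the assumption about MyKeySelector holds, the NPE would mean that at least one emitted map has no (or a null) fieldBBB.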

Judging by the stack trace, the error is raised inside the processElement function of MyEvaluator (a KeyedBroadcastProcessFunction):

@Override
public void processElement(Map<String, Object> value, ReadOnlyContext ctx, Collector<Map<String, Object>> out) throws Exception {
    List<Map<String, Object>> newFieldsList = ... ; // Retrieve a list of new fields based on 'value' and elements received using the broadcastedStream

    for(Map<String, Object> newFields : newFieldsList){
        value.putAll(newFields);     // Add all newFields to current value
        out.collect(value);          // <-- NPE occurs here
    }
}
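Two points in this loop are worth checking: the input map value is mutated and the same instance is emitted on every iteration (with fields accumulating across iterations), and nothing guarantees that an emitted map has a non-null fieldBBB, which the following keyBy needs. Below is a minimal plain-Java sketch of a more defensive loop body, with the Collector replaced by a returned List so it runs without Flink. The class and method names are hypothetical. It keeps the original accumulation, works on a copy of the input, emits a fresh snapshot per iteration, and skips records without fieldBBB. Skipping is only an assumed policy; sending such records to a side output or giving them a default key may suit the job better.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class EvaluatorLoopSketch {

    // Hypothetical helper mirroring the body of MyEvaluator.processElement;
    // the returned list plays the role of the Collector.
    static List<Map<String, Object>> buildOutputs(Map<String, Object> value,
                                                  List<Map<String, Object>> newFieldsList) {
        List<Map<String, Object>> out = new ArrayList<>();
        // Work on a copy so the input record itself is not mutated.
        Map<String, Object> current = new HashMap<>(value);
        for (Map<String, Object> newFields : newFieldsList) {
            current.putAll(newFields); // same accumulation as the original loop
            if (current.get("fieldBBB") == null) {
                // keyBy on fieldBBB would receive a null key: skip (assumed policy)
                continue;
            }
            out.add(new HashMap<>(current)); // emit a snapshot, not the shared map
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, Object> value = new HashMap<>();
        value.put("fieldAAA", "a1");

        List<Map<String, Object>> newFieldsList = List.of(
                Map.<String, Object>of("someField", 1),    // no fieldBBB yet
                Map.<String, Object>of("fieldBBB", "b1")); // provides the next key

        List<Map<String, Object>> emitted = buildOutputs(value, newFieldsList);
        System.out.println(emitted.size() + " record(s) emitted: " + emitted);
    }
}
```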

However, if I remove the keyBy("fieldBBB") part from my processing, the code runs. More interestingly, if I simply replace keyBy("fieldBBB") with keyBy("fieldAAA"), the code also runs.
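Keying by fieldAAA succeeding while keying by fieldBBB fails would be consistent with records that carry fieldAAA but no non-null fieldBBB. One way to check this is to make key extraction fail loudly instead of returning null. The plain-Java sketch below uses a hypothetical requireKey helper; the same check could be placed in MyKeySelector's getKey method (its real implementation is not shown), so that the exception names the missing field and lists the fields the record does have, instead of reporting "NullPointerException: null".

```java
import java.util.HashMap;
import java.util.Map;

public class RequireKeySketch {

    // Hypothetical helper: returns the key, or fails with a descriptive message
    // when the field is absent or mapped to null.
    static Object requireKey(Map<String, Object> record, String field) {
        Object key = record.get(field);
        if (key == null) {
            throw new IllegalStateException(
                    "Key field '" + field + "' is missing or null; record has fields " + record.keySet());
        }
        return key;
    }

    public static void main(String[] args) {
        Map<String, Object> record = new HashMap<>();
        record.put("fieldAAA", "a1");

        System.out.println(requireKey(record, "fieldAAA")); // key present
        try {
            requireKey(record, "fieldBBB");
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // names the missing field
        }
    }
}
```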

How can this behavior be explained, and how can I achieve what I want?

...