Я хочу знать, можно ли создать DataStream типа
DataStream<Tuple4<String, Charge, List<Charge>, Table>>
с типом Table внутри кортежа, таблица взята из Table Api из Flink, i ' m пытается передать переменную AccumulatorTable , которая является таблицей флинков внутри функции процесса, чтобы вернуть кортеж следующим образом:
DataStream<Tuple4<String, Charge, List<Charge>, Table>> joinStream =
currentStreamByKeys
.connect(historicStreamByKeys)
.flatMap(new LeftJoin())
.process(new ProcessFunction<Tuple2<Charge, List<Charge>>, Tuple4<String, Charge, List<Charge>, Table>>() {
@Override
public void processElement(Tuple2<Charge, List<Charge>> value, Context ctx, Collector<Tuple4<String, Charge, List<Charge>, Table>> out) throws Exception {
out.collect(
new Tuple4<>(
KeysExtractor.getKey(keys,value.f0),
value.f0,
value.f1,
accumulatorTable
)
);
}
})
.keyBy(0);
Но я получаю эту ошибку:
Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: The implementation of the StreamExecutionEnvironment is not serializable. The object probably contains or references non serializable fields.
Можно ли добиться этого с помощью Flink, возможно, используя TypeHints? Заранее спасибо!