Я использую apache flink (v1.10.0) для вычисления сообщения RabbitMQ, получаю результат до MySQL, теперь я вычисляю так:
consumeRecord.keyBy("gameType")
.timeWindowAll(Time.seconds(5))
.reduce((d1, d2) -> {
d1.setRealPumpAmount(d1.getRealPumpAmount() + d2.getRealPumpAmount());
d1.setPumpAmount(d1.getPumpAmount() + d2.getPumpAmount());
return d1;
})
.addSink(new SinkFunction<ReportPump>() {
@Override
public void invoke(ReportPump value, Context context) throws Exception {
// save to mysql
}
});
Но теперь метод приема получить только одну строку в каждом вызове, если одна из строк в этом пакете завершилась неудачно, я не смог откатить пакетную операцию. Теперь я хочу получить пакет одного окна и выполнить прием в базу данных один раз, в случае неудачи я откат вставки и Apache Контрольная точка Флинка. Это то, что я пытаюсь сделать сейчас:
consumeRecord.keyBy("gameType")
.timeWindowAll(Time.seconds(5)).reduce(new ReduceFunction<ReportPump>() {
@Override
public ReportPump reduce(ReportPump d1, ReportPump d2) throws Exception {
d1.setRealPumpAmount(d1.getRealPumpAmount() + d2.getRealPumpAmount());
d1.setPumpAmount(d1.getPumpAmount() + d2.getPumpAmount());
return d1;
}
})
.apply(new AllWindowFunction<ReportPump, List<ReportPump>, TimeWindow>() {
@Override
public void apply(TimeWindow window, Iterable<ReportPump> values, Collector<List<ReportPump>> out) throws Exception {
ArrayList<ReportPump> employees = Lists.newArrayList(values);
if (employees.size() > 0) {
out.collect(employees);
}
}
})
.addSink(new SinkFunction<List<ReportPump>>() {
@Override
public void invoke(List<ReportPump> value, Context context) throws Exception {
PumpRealtimeHandler.invoke(value);
}
});
, но функция применения дает подсказки: Cannot resolve method 'apply' in 'SingleOutputStreamOperator'
. Как уменьшить его и получить список данных партии и гриппа sh в базу данных только один раз?