как грипп sh пакетных данных утонуть в apache мигать - PullRequest
0 голосов
/ 10 апреля 2020

Я использую apache flink (v1.10.0) для вычисления сообщения RabbitMQ, получаю результат до MySQL, теперь я вычисляю так:

   consumeRecord.keyBy("gameType")
                .timeWindowAll(Time.seconds(5))
                .reduce((d1, d2) -> {
                    d1.setRealPumpAmount(d1.getRealPumpAmount() + d2.getRealPumpAmount());
                    d1.setPumpAmount(d1.getPumpAmount() + d2.getPumpAmount());
                    return d1;
                })
                .addSink(new SinkFunction<ReportPump>() {
                    @Override
                    public void invoke(ReportPump value, Context context) throws Exception {
                        // save to mysql
                    }
                });

Но теперь метод приема получить только одну строку в каждом вызове, если одна из строк в этом пакете завершилась неудачно, я не смог откатить пакетную операцию. Теперь я хочу получить пакет одного окна и выполнить прием в базу данных один раз, в случае неудачи я откат вставки и Apache Контрольная точка Флинка. Это то, что я пытаюсь сделать сейчас:

consumeRecord.keyBy("gameType")
                .timeWindowAll(Time.seconds(5)).reduce(new ReduceFunction<ReportPump>() {
                    @Override
                    public ReportPump reduce(ReportPump d1, ReportPump d2) throws Exception {
                        d1.setRealPumpAmount(d1.getRealPumpAmount() + d2.getRealPumpAmount());
                        d1.setPumpAmount(d1.getPumpAmount() + d2.getPumpAmount());
                        return d1;
                    }
                })
                .apply(new AllWindowFunction<ReportPump, List<ReportPump>, TimeWindow>() {
                    @Override
                    public void apply(TimeWindow window, Iterable<ReportPump> values, Collector<List<ReportPump>> out) throws Exception {
                        ArrayList<ReportPump> employees = Lists.newArrayList(values);
                        if (employees.size() > 0) {
                            out.collect(employees);
                        }
                    }
                })
                .addSink(new SinkFunction<List<ReportPump>>() {
                    @Override
                    public void invoke(List<ReportPump> value, Context context) throws Exception {
                        PumpRealtimeHandler.invoke(value);
                    }
                });

, но функция применения дает подсказки: Cannot resolve method 'apply' in 'SingleOutputStreamOperator'. Как уменьшить его и получить список данных партии и гриппа sh в базу данных только один раз?

1 Ответ

1 голос
/ 12 апреля 2020

SingleOutputStreamOperator не имеет метода apply, потому что apply можно генерировать только после создания окон. Что вы пропустите здесь:

.windowAll(GlobalWindows.create())

между сокращением и применением, оно объединит все уменьшенные результаты в одно глобальное окно, содержащее список всех результатов уменьшения, которое вы сможете собрать. для одного списка вместо нескольких пакетов для базы данных.


Я не знаю, является ли ваш результат хорошей практикой, потому что вы потеряете параллелизм apache flink.

You следует прочитать о TableApi и приемнике JDB C, возможно, это вам поможет. (подробнее об этом здесь: https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connect.html#jdbc -подключатель ).

...