Flink и RabbitMQ, не тянет старые сообщения - PullRequest
0 голосов
/ 12 января 2020

Я использую Flink с RMQ для сопоставления сообщений, полученных через нашего агента, код очень прост:

final RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder().setHost("source.source.com")
                .setPort(5672).setUserName("user").setPassword("password").setVirtualHost("vhost").build();

        final DataStream<String> stream = env.addSource(new RMQSource<String>(connectionConfig, // config for the
                                                                                                // RabbitMQ connection
                "NWIN", // name of the RabbitMQ queue to consume
                true, // use correlation ids; can be false if only at-least-once is required
                new SimpleStringSchema())) // deserialization schema to turn messages into Java objects
                .setParallelism(1);

Теперь проблема в том, что если в очереди уже есть сообщения, которых нет в очереди, их нет вытащил, Flink вытягивает только новые сообщения, я не уверен, что это что-то по дизайну или что-то, что мне нужно настроить, но это проблема.

если соединение между Flink и RMQ разорвано, агенты отправляют данные в RMQ и когда соединение восстановлено, flink находит около 2-3 М сообщений для обработки и не обрабатывает их, что мне делать?!

...