Я использую Flink с RMQ для сопоставления сообщений, полученных через нашего агента, код очень прост:
final RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder().setHost("source.source.com")
.setPort(5672).setUserName("user").setPassword("password").setVirtualHost("vhost").build();
final DataStream<String> stream = env.addSource(new RMQSource<String>(connectionConfig, // config for the
// RabbitMQ connection
"NWIN", // name of the RabbitMQ queue to consume
true, // use correlation ids; can be false if only at-least-once is required
new SimpleStringSchema())) // deserialization schema to turn messages into Java objects
.setParallelism(1);
Теперь проблема в том, что если в очереди уже есть сообщения, которых нет в очереди, их нет вытащил, Flink вытягивает только новые сообщения, я не уверен, что это что-то по дизайну или что-то, что мне нужно настроить, но это проблема.
если соединение между Flink и RMQ разорвано, агенты отправляют данные в RMQ и когда соединение восстановлено, flink находит около 2-3 М сообщений для обработки и не обрабатывает их, что мне делать?!