Я хотел бы иметь параллельный источник Flink, который потребляет из очереди блокировки в памяти.Моя идея состоит в том, чтобы приложение помещало элементы в эту очередь, а конвейер Flink потребляет и обрабатывает их.
Каков наилучший шаблон для этого?Я рассмотрел некоторые реализации исходных кодов Flink (например, Kafka, RabbitMQ и т. Д.), И все они инициализируют необходимые соединения из исходного экземпляра.Я не могу этого сделать (то есть инициализировать очередь из каждого экземпляра источника), поскольку
- каждый экземпляр экземпляра источника создает свою собственную очередь.
- нужна ссылка на очередь изза пределами Flink для добавления элементов к нему.
В настоящее время я пришел к следующему, но использование статических очередей мне не подходит.
1.Очередь, из которой каждый экземпляр источника Flink получает свои элементы.
public class TheQueue implements Serializable {
private static final Logger LOGGER = LoggerFactory.getLogger(TheQueue.class);
private transient static final BlockingQueue<Object> OBJECT_QUEUE = new LinkedBlockingQueue<>();
public static SerializableSupplier<Object> getObjectConsumer() {
return () -> {
return OBJECT_QUEUE.take();
}
};
}
2.Мой отрывок из трубопровода Флинк.
final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
env.setParallelism(10);
env.addSource(TestParallelSourceFunction.getInstance(TheQueue.getObjectConsumer()))
3.Функция источника Flink.
public class TestParallelSourceFunction<T> extends RichParallelSourceFunction<T>{
private static final Logger LOGGER = LoggerFactory.getLogger(TestParallelSourceFunction.class);
private SerializableSupplier<T> supplier;
// initialisation code
@Override
public void run(final SourceContext<T> ctx) throws Exception {
LOGGER.info("Starting Flink source.");
isRunning = true;
while (isRunning) {
final T t = supplier.get();
if (t != null) {
ctx.collect(t);
}
}
LOGGER.info("Stopped Flink source.");
}