Java Flink Внешний источник - PullRequest
       6

Java Flink Внешний источник

0 голосов
/ 09 февраля 2019

Я хотел бы иметь параллельный источник Flink, который потребляет из очереди блокировки в памяти.Моя идея состоит в том, чтобы приложение помещало элементы в эту очередь, а конвейер Flink потребляет и обрабатывает их.

Каков наилучший шаблон для этого?Я рассмотрел некоторые реализации исходных кодов Flink (например, Kafka, RabbitMQ и т. Д.), И все они инициализируют необходимые соединения из исходного экземпляра.Я не могу этого сделать (то есть инициализировать очередь из каждого экземпляра источника), поскольку

  • каждый экземпляр экземпляра источника создает свою собственную очередь.
  • нужна ссылка на очередь изза пределами Flink для добавления элементов к нему.

В настоящее время я пришел к следующему, но использование статических очередей мне не подходит.

1.Очередь, из которой каждый экземпляр источника Flink получает свои элементы.

public class TheQueue implements Serializable {

    private static final Logger LOGGER = LoggerFactory.getLogger(TheQueue.class);

    private transient static final BlockingQueue<Object> OBJECT_QUEUE = new LinkedBlockingQueue<>();

    public static SerializableSupplier<Object> getObjectConsumer() {
        return () -> {
            return OBJECT_QUEUE.take();
        }
    };
}

2.Мой отрывок из трубопровода Флинк.

final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
env.setParallelism(10);
env.addSource(TestParallelSourceFunction.getInstance(TheQueue.getObjectConsumer()))

3.Функция источника Flink.

public class TestParallelSourceFunction<T> extends RichParallelSourceFunction<T>{

    private static final Logger LOGGER = LoggerFactory.getLogger(TestParallelSourceFunction.class);

    private SerializableSupplier<T> supplier;

    // initialisation code

    @Override
    public void run(final SourceContext<T> ctx) throws Exception {

        LOGGER.info("Starting Flink source.");
        isRunning = true;

        while (isRunning) {
            final T t = supplier.get();
            if (t != null) {
                ctx.collect(t);
            }
        }

        LOGGER.info("Stopped Flink source.");
    }

1 Ответ

0 голосов
/ 18 июня 2019

Я думаю, что ваше понимание систем очередей сообщений, таких как Kafka и RabbitMQ, и их роли в потоковых приложениях неверно.Это автономные сервисы, которые существуют за пределами Flink.Flink не запускает и не настраивает их, он просто открывает соединения для чтения из них.

Таким образом, идея заключается в том, что вы запускаете кластер Kafka и даете необходимые сведения о подключении и названия тем как для своей работы Flink, так и для любого приложения, которое помещает элементы в Kafka.Приложение, помещающее элементы в очередь, обращается к кластеру Kafka через tcpip, как и Flink.

...