Spark Структурированная потоковая передача с источником RabbitMQ - PullRequest
0 голосов
/ 04 июня 2018

Я пытаюсь написать собственный получатель для Structured Streaming, который будет принимать сообщения от RabbitMQ.Spark недавно выпущенный API DataSource V2, который выглядит очень перспективным.Поскольку он абстрагирует многие детали, я хочу использовать этот API для простоты и производительности.Однако, поскольку он довольно новый, источников не так много.Мне нужны разъяснения от опытных Spark ребят, так как они легче поймут ключевые моменты.Вот и мы:

Моя отправная точка - серия постов в блоге, первая часть которой здесь .Он показывает, как реализовать источник данных, без возможности потоковой передачи.Чтобы создать потоковый источник, я немного изменил их, так как мне нужно реализовать MicroBatchReadSupport вместо (или в дополнение к) DataSourceV2 .

Чтобы быть эффективным, этоцелесообразно иметь несколько искровых исполнителей, потребляющих RabbitMQ одновременно, то есть из одной и той же очереди.Если меня не смущает, каждый раздел ввода - в терминологии Spark - соответствует потребителю из очереди - в терминологии RabbitMQ.Таким образом, нам нужно иметь несколько разделов для входного потока, верно?

Подобно части 4 серии , я реализовал MicroBatchReader следующим образом:

@Override
public List<DataReaderFactory<Row>> createDataReaderFactories() {
    int partition = options.getInt(RMQ.PARTITICN, 5);
    List<DataReaderFactory<Row>> factories = new LinkedList<>();
    for (int i = 0; i < partition; i++) {
        factories.add(new RMQDataReaderFactory(options));
    }
    return factories;
}

Я возвращаю список фабрик и надеюсь, что каждый экземпляр в списке будет использован для создания считывателя, который также будет потребителем.Правильный ли это подход?

Я хочу, чтобы мой приемник был надежным, т.е. после каждого обработанного сообщения (или, по крайней мере, записи в каталог chekpoint для дальнейшей обработки) мне нужно вернуть его обратно к RabbitMQ.Проблема начинается после того, как здесь: эти фабрики создаются в драйвере, и фактический процесс чтения происходит у исполнителей через DataReader s.Однако метод commit является частью MicroBatchReader, а не DataReader.Так как у меня много DataReader с на MicroBatchReader, как мне вернуть эти сообщения обратно к RabbitMQ?Или я должен подтверждать, когда метод next вызывается на DataReader?Это безопасно?Если да, то для чего тогда функция commit?

УТОЧНЕНИЕ: ОБФУСКАЦИЯ: Ссылка, приведенная в ответе опереименование некоторых классов / функций (в дополнение к объяснениям там) сделало все намного более понятным хуже, чем когда-либо .Цитирование с там :

Переименования:

  • DataReaderFactory до InputPartition

  • *От 1073 *DataReader до InputPartitionReader

...

InputPartition Цель состоит в том, чтобы управлять жизненным циклом связанного считывателя, который теперь называетсяInputPartitionReader, с явной операцией создания, чтобы отразить операцию закрытия.Это больше не было понятно из API, потому что DataReaderFactory оказалось более общим, чем есть, и неясно, почему их набор создается для чтения.

EDIT: Тем не менее, документы ясно говорят, что «фабрика считывателей будет сериализована и отправлена ​​исполнителям, тогда считыватель данных будет создан для исполнителей и будет выполнять фактическое чтение».

Чтобы сделать потребителя надежным, я должен ACK для конкретного сообщения только после того, как оно будет передано на стороне Spark. Обратите внимание, что сообщения должны быть подтверждены тем же соединением, через которое они были доставлены, но функция фиксации вызывается на узле драйвера.Как я могу зафиксировать на узле рабочий / исполнитель?

1 Ответ

0 голосов
/ 04 июня 2018
> Я возвращаю список фабрик и надеюсь, что каждый экземпляр в списке будет использован для создания ридера, который также будет потребителем.Это правильный подход?В реализации источника source [socket] [1] один поток помещает сообщения во внутренний ListBuffer.Другими словами, есть один потребитель (поток), заполняющий внутренний ListBuffer, который ** затем ** разделяется на разделы с помощью `planInputPartitions` (` createDataReaderFactories` получил [переименованный] [2] в `planInputPartitions`).Кроме того, в соответствии с Javadoc [MicroBatchReadSupport] [3]> Механизм выполнения создаст средство чтения микропакетов в начале потокового запроса, альтернативные вызовы для setOffsetRange и createDataReaderFactories для каждого пакета для обработки, а затем вызовет stop ()когда выполнение будет завершено.Обратите внимание, что один запрос может иметь несколько выполнений из-за перезапуска или восстановления после сбоя.Другими словами, `createDataReaderFactories` следует вызывать ** многократно **, что, насколько я понимаю, предполагает, что каждый DataReader отвечает за раздел статического ввода, что означает, что DataReader не должен быть потребителем.----------> Тем не менее, метод commit является частью MicroBatchReader, а не DataReader ... Если да, то для чего тогда функция коммитов?Возможно, часть логического обоснования для функции фиксации состоит в том, чтобы предотвратить увеличение внутреннего буфера MicroBatchReader.Зафиксировав смещение, вы можете эффективно удалить из буфера элементы, меньшие, чем смещение, поскольку вы обязуетесь больше не обрабатывать их.Вы можете увидеть это в исходном коде сокета с помощью `batches.trimStart (offsetDiff)`
Я не уверен насчет реализации надежного приемника, поэтому я надеюсь, что более опытный парень из Spark придет и схватиттвой вопрос мне тоже интересен!Надеюсь это поможет!

РЕДАКТИРОВАТЬ

Я изучил только источники socket и wiki-edit .Эти источники не готовы к производству, чего не хотел искать вопрос.Вместо этого источник kafka является лучшей отправной точкой, которая, в отличие от вышеупомянутых источников, искала множество потребителей, таких как автор.

Однако, возможно, если вы ищете ненадежные источники, рассмотренные выше источники для сокетов и wikiedit предоставляют менее сложное решение.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...