Я пытаюсь написать собственный получатель для Structured Streaming
, который будет принимать сообщения от RabbitMQ
.Spark
недавно выпущенный API DataSource V2, который выглядит очень перспективным.Поскольку он абстрагирует многие детали, я хочу использовать этот API для простоты и производительности.Однако, поскольку он довольно новый, источников не так много.Мне нужны разъяснения от опытных Spark
ребят, так как они легче поймут ключевые моменты.Вот и мы:
Моя отправная точка - серия постов в блоге, первая часть которой здесь .Он показывает, как реализовать источник данных, без возможности потоковой передачи.Чтобы создать потоковый источник, я немного изменил их, так как мне нужно реализовать MicroBatchReadSupport вместо (или в дополнение к) DataSourceV2 .
Чтобы быть эффективным, этоцелесообразно иметь несколько искровых исполнителей, потребляющих RabbitMQ
одновременно, то есть из одной и той же очереди.Если меня не смущает, каждый раздел ввода - в терминологии Spark
- соответствует потребителю из очереди - в терминологии RabbitMQ
.Таким образом, нам нужно иметь несколько разделов для входного потока, верно?
Подобно части 4 серии , я реализовал MicroBatchReader следующим образом:
@Override
public List<DataReaderFactory<Row>> createDataReaderFactories() {
int partition = options.getInt(RMQ.PARTITICN, 5);
List<DataReaderFactory<Row>> factories = new LinkedList<>();
for (int i = 0; i < partition; i++) {
factories.add(new RMQDataReaderFactory(options));
}
return factories;
}
Я возвращаю список фабрик и надеюсь, что каждый экземпляр в списке будет использован для создания считывателя, который также будет потребителем.Правильный ли это подход?
Я хочу, чтобы мой приемник был надежным, т.е. после каждого обработанного сообщения (или, по крайней мере, записи в каталог chekpoint для дальнейшей обработки) мне нужно вернуть его обратно к RabbitMQ
.Проблема начинается после того, как здесь: эти фабрики создаются в драйвере, и фактический процесс чтения происходит у исполнителей через DataReader s.Однако метод commit является частью MicroBatchReader
, а не DataReader
.Так как у меня много DataReader
с на MicroBatchReader
, как мне вернуть эти сообщения обратно к RabbitMQ
?Или я должен подтверждать, когда метод next вызывается на DataReader
?Это безопасно?Если да, то для чего тогда функция commit
?
УТОЧНЕНИЕ: ОБФУСКАЦИЯ: Ссылка, приведенная в ответе опереименование некоторых классов / функций (в дополнение к объяснениям там) сделало все намного более понятным хуже, чем когда-либо .Цитирование с там :
Переименования:
...
InputPartition
Цель состоит в том, чтобы управлять жизненным циклом связанного считывателя, который теперь называетсяInputPartitionReader
, с явной операцией создания, чтобы отразить операцию закрытия.Это больше не было понятно из API, потому что DataReaderFactory
оказалось более общим, чем есть, и неясно, почему их набор создается для чтения.
EDIT: Тем не менее, документы ясно говорят, что «фабрика считывателей будет сериализована и отправлена исполнителям, тогда считыватель данных будет создан для исполнителей и будет выполнять фактическое чтение».
Чтобы сделать потребителя надежным, я должен ACK для конкретного сообщения только после того, как оно будет передано на стороне Spark. Обратите внимание, что сообщения должны быть подтверждены тем же соединением, через которое они были доставлены, но функция фиксации вызывается на узле драйвера.Как я могу зафиксировать на узле рабочий / исполнитель?