В нашей системе у нас классически есть два компонента: кластер Cloudera Hadoop (CDH) и «бэкэнд» система OpenShift.В HDFS у нас есть несколько огромных файлов .parquet.
Теперь у нас есть бизнес-требование «экспортировать данные по заданному пользователем критерию фильтра» пользователю в «реальном времени» в виде загружаемого файла.Итак, поток таков: пользователь вводит строку фильтра, подобную SQL, например user='Theo' and command='execution'
.Затем он отправляет запрос GET /export
в нашу серверную службу со строкой фильтра в качестве параметра.Теперь пользователь должен получить «файл загрузки» из своего веб-браузера и немедленно начать загрузку этого файла в формате CSV (даже если его размер составляет несколько терабайт или даже петабайт, это выбор пользователя, если он хочет попробовать и подождать так долго).Фактически, кластер должен отвечать синхронно , но не кэшировать весь ответ на одном узле перед отправкой результата, а только получать данные со «скоростью интернета» пользователя и напрямую передавать их пользователю.(С буфером, например, 10 или 100 МБ).
Теперь я сталкиваюсь с проблемой наилучшего подхода к этому требованию.Мои соображения:
Я хотел использовать Spark для этого.Spark считывает файл Parquet, легко применяет фильтр и затем «объединяет» отфильтрованный результат с драйвером, который, в свою очередь, передает данные обратно запрашивающему бэкэнду / клиенту.Во время этой задачи драйвер, конечно, не должен исчерпывать память, если данные отправляются слишком медленно обратно бэкэнду / пользователю, а просто должен исполнители доставлять данные с той же скоростью, с какой они «потребляются»).
Однако здесь я сталкиваюсь с некоторыми проблемами:
- Стандартный вариант использования состоит в том, что у пользователя есть мелкозернистые фильтры, так что его экспортированный файл содержит что-то вроде 1000 строк.Если бы я отправлял новое искровое задание через
spark-submit
для каждого запроса, я уже сталкивался с задержками в несколько секунд из-за инициализации и создания плана запроса (даже если это так же просто, как чтение и фильтрация данных).Я бы хотел этого избежать. - Кластер и серверная часть строго изолированы.В идеале, парни из операционной системы вообще не хотят, чтобы мы достигли кластера из бэкэнда, но кластер должен просто вызвать бэкэнд.Мы можем «открыть», может быть, один порт, но, возможно, мы не сможем утверждать что-то вроде «наш сервер будет работать с драйвером искры, но будет подключен к кластеру как сервер выполнения».
- Является ли это «неприятным запахом при проектировании», если мы запускаем «задание запуска сервера», т.е. мы отправляем приложение с режимом «клиент» на мастер кластера, который также открывает порт для HTTP-запросов и запускает толькоspark pipe по запросам, но постоянно ли поддерживает контекст spark (и доступен ли он из нашего бэкэнда через фиксированный URL)?Я знаю, что есть проект "spark-job-server", который делает это, но он все еще чувствует себя немного странно из-за природы Spark и Jobs, где "естественно" работа будет заключаться в загрузке файла, а не в 24 часазапущенный сервер, ожидающий время от времени выполнения некоторых шагов конвейера.
- Я понятия не имею, как ограничить выборку результатов искр, чтобы исполнители отправляли со скоростью, чтобы драйвер не исчерпал память, еслипользователь запросил петабайты. Есть какие-нибудь предложения по этому поводу?
Является ли Spark хорошим выбором для этой задачи, или у вас есть предложения по улучшению инструментов здесь?(В лучшем случае в среде CDH 5.14, поскольку мы не заставляем операционную группу устанавливать какой-либо дополнительный инструмент).