Обмен данными между искровыми приложениями - PullRequest
1 голос
/ 27 мая 2020
• 1000 * Теперь я представлю проблему и решения, которые мы придумали в нашей команде:

Мы работаем с CDH 5.1x, используя HDFS в качестве нашей базы данных. Наши данные хранятся следующим образом: один каталог называется «подготовленным» для данных в потоковом процессе, который представляет собой множество небольших файлов, и один каталог называется «успешным» для «производственных» данных, которые являются теми же данными. объединены в один (или несколько) больших файлов. Данные хранятся по следующим путям:

/basePath/staged/{sensor_name}/Y/M/D/H

/basePath/success/{sensor_name}/Y/M/D/H

Кроме того, мы используем разделы hive поверх HDFS, чтобы замаскировать нашу структуру hdfs для наших клиентов.

У нас есть 3 продукта, каждый из них имеет другую часть процесса перемещения данных из «поэтапного» каталога в «успешный» каталог:

  1. Агрегация - берет данные из «поэтапного» каталога и объединяет их в один ( или несколько) файлов.
  2. HivePartitioner - добавить / удалить разделы куста в соответствии с выводом агрегации.
  3. Janitor - удаляет каталоги в соответствии с выводом агрегации.

Конвейер: Агрегация -> HivePartitioner -> Дворник.

Мы используем Airflow для планирования наших конвейеров с помощью дагов.

Итак, теперь моя проблема в том, как взаимодействовать между продуктами. Агрегация должна выводить результаты на стандартный вывод, HivePartitioner и Janitor должны получать результат в качестве входных данных.

Вот что мы пробовали / думали:

Вариант 1: Сделайте метку в воздушном потоке, которая считывает стандартный вывод агрегации и передает данные с помощью xcom (способ связи воздушного потока между задачами). Это хорошая идея, но проблема в том, что воздушный поток уже обрабатывает стандартный вывод для своих журналов, поэтому у меня нет доступа к выходным данным.

Вариант 2: Сделайте следующее Даг в воздушном потоке: Агрегация -> GetYarnLog -> HivePartitioner -> Дворник.

Агрегация выводит свой вывод на стандартный вывод, который записывается в файл журнала в кластере и доступен через пряжу api. Извлеките журнал, обработайте вывод и отправьте его HivePArtitioner и Janitor.

Это все, о чем мы думали, не вдаваясь в отдельные процессы и не создавая собственного воздушного потока.

Мне бы хотелось, чтобы вы мысли о наших вариантах, и если вы, ребята, знаете лучший способ достижения связи между приложениями Spark. Важно то, что все 3 продукта должны быть автономными и не зависеть от других (разделение), также ввод для каждого приложения должен быть передан в основной метод.

Спасибо.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...