• 1000 * Теперь я представлю проблему и решения, которые мы придумали в нашей команде:
Мы работаем с CDH 5.1x, используя HDFS в качестве нашей базы данных. Наши данные хранятся следующим образом: один каталог называется «подготовленным» для данных в потоковом процессе, который представляет собой множество небольших файлов, и один каталог называется «успешным» для «производственных» данных, которые являются теми же данными. объединены в один (или несколько) больших файлов. Данные хранятся по следующим путям:
/basePath/staged/{sensor_name}/Y/M/D/H
/basePath/success/{sensor_name}/Y/M/D/H
Кроме того, мы используем разделы hive поверх HDFS, чтобы замаскировать нашу структуру hdfs для наших клиентов.
У нас есть 3 продукта, каждый из них имеет другую часть процесса перемещения данных из «поэтапного» каталога в «успешный» каталог:
- Агрегация - берет данные из «поэтапного» каталога и объединяет их в один ( или несколько) файлов.
- HivePartitioner - добавить / удалить разделы куста в соответствии с выводом агрегации.
- Janitor - удаляет каталоги в соответствии с выводом агрегации.
Конвейер: Агрегация -> HivePartitioner -> Дворник.
Мы используем Airflow для планирования наших конвейеров с помощью дагов.
Итак, теперь моя проблема в том, как взаимодействовать между продуктами. Агрегация должна выводить результаты на стандартный вывод, HivePartitioner и Janitor должны получать результат в качестве входных данных.
Вот что мы пробовали / думали:
Вариант 1: Сделайте метку в воздушном потоке, которая считывает стандартный вывод агрегации и передает данные с помощью xcom (способ связи воздушного потока между задачами). Это хорошая идея, но проблема в том, что воздушный поток уже обрабатывает стандартный вывод для своих журналов, поэтому у меня нет доступа к выходным данным.
Вариант 2: Сделайте следующее Даг в воздушном потоке: Агрегация -> GetYarnLog -> HivePartitioner -> Дворник.
Агрегация выводит свой вывод на стандартный вывод, который записывается в файл журнала в кластере и доступен через пряжу api. Извлеките журнал, обработайте вывод и отправьте его HivePArtitioner и Janitor.
Это все, о чем мы думали, не вдаваясь в отдельные процессы и не создавая собственного воздушного потока.
Мне бы хотелось, чтобы вы мысли о наших вариантах, и если вы, ребята, знаете лучший способ достижения связи между приложениями Spark. Важно то, что все 3 продукта должны быть автономными и не зависеть от других (разделение), также ввод для каждого приложения должен быть передан в основной метод.
Спасибо.