Читайте несколько каталогов отдельно с помощью spark и запишите их обратно в той же структуре - PullRequest
0 голосов
/ 04 января 2019

У меня есть данные за 3 года, разделенные на year / month / day / hour Я хочу прочитать все файлы, выполнить некоторую обработку и записать их обратно в новый бак S3 с тем жеструктура.

Если данные поступают из s3://spark-old/production/events/2018/12/20/* Я хочу сохранить их в s3://spark-new/production/events/2018/12/20/* после завершения вычислений.

Эти данные были записаны в s3://spark-old/production/events/ с помощью Kinesis и FireHose так технически FireHose решил, что данные должны находиться в каком разделе (я думаю, основываясь на времени, которое FireHose использовал их!).

Некоторые требования:

  • Я хочуодна и та же запись находится в одном и том же разделе как в новом, так и в старом S3 Bucket
  • Я хочу, чтобы это задание было максимально быстрым (вероятно, его нужно запускать раз в месяц)
  • И нене потребляет все мои ресурсы (у меня все еще есть другие почасовые и ежедневные задания, которые нужно было выполнить)

Я пробовал несколько способов, но ни один из них не идеален.

1- Напишите работу Spark (ежечасно), которая читает час за часом, proоцените данные и запишите их по тому же пути, с которого они считывают данные.Затем используйте Oozie или любой другой планировщик, чтобы запланировать почасовую работу с начала данных до настоящего момента.Поскольку каждое задание не зависит от других заданий, увеличьте параллелизм координатора Oozie до 14 или 48 или ... и дождитесь завершения всех из них.Проблема в том, что каждый час не так уж и велик, большая часть работы спарк тратится впустую, чтобы создать сеанс спарка и убить его в конце.Обработка данных за 3 года занимает много времени.

2 - Вместо того, чтобы читать каждый час, попробуйте прочитать данные за день или месяц из источника 3://spark-old/production/events/ и попытаться сохранить их обратно к месту назначения.с помощью partitionBy("year", "month", "day", "hour").У этого подхода есть 2 проблемы

  1. Когда вы используете Dataframe.partitionBy.save, он записывает данные в путь S3 в новом формате.s3://spark-new/production/events/year=2018/month=12/day=20/*
  2. Так как нам нужно разделить путь partitionBy("year", "month", "day", "hour"), на котором мы основываемся server_received_time (самое близкое время, которое у нас есть, когда оно используется пожарным шлангом), тем не менее, нет никакой гарантии, что данныевы читаете из раздела A для хранения в разделе B. небольшой процент данных перемещается в новый раздел.

3- Попробуйте прочитать пару часов (как пакет) в одном задании Spark(Spark session) и запишите их обратно с параллелизмом в spark.Я все еще пытаюсь выяснить, выполнимо ли это и является ли это хорошим подходом или нет.

Я хочу знать, каков наилучший способ сделать это при соблюдении всех требований.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...