У меня есть данные за 3 года, разделенные на year
/ month
/ day
/ hour
Я хочу прочитать все файлы, выполнить некоторую обработку и записать их обратно в новый бак S3 с тем жеструктура.
Если данные поступают из s3://spark-old/production/events/2018/12/20/*
Я хочу сохранить их в s3://spark-new/production/events/2018/12/20/*
после завершения вычислений.
Эти данные были записаны в s3://spark-old/production/events/
с помощью Kinesis
и FireHose
так технически FireHose решил, что данные должны находиться в каком разделе (я думаю, основываясь на времени, которое FireHose использовал их!).
Некоторые требования:
- Я хочуодна и та же запись находится в одном и том же разделе как в новом, так и в старом S3 Bucket
- Я хочу, чтобы это задание было максимально быстрым (вероятно, его нужно запускать раз в месяц)
- И нене потребляет все мои ресурсы (у меня все еще есть другие почасовые и ежедневные задания, которые нужно было выполнить)
Я пробовал несколько способов, но ни один из них не идеален.
1- Напишите работу Spark (ежечасно), которая читает час за часом, proоцените данные и запишите их по тому же пути, с которого они считывают данные.Затем используйте Oozie
или любой другой планировщик, чтобы запланировать почасовую работу с начала данных до настоящего момента.Поскольку каждое задание не зависит от других заданий, увеличьте параллелизм координатора Oozie до 14 или 48 или ... и дождитесь завершения всех из них.Проблема в том, что каждый час не так уж и велик, большая часть работы спарк тратится впустую, чтобы создать сеанс спарка и убить его в конце.Обработка данных за 3 года занимает много времени.
2 - Вместо того, чтобы читать каждый час, попробуйте прочитать данные за день или месяц из источника 3://spark-old/production/events/
и попытаться сохранить их обратно к месту назначения.с помощью partitionBy("year", "month", "day", "hour")
.У этого подхода есть 2 проблемы
- Когда вы используете
Dataframe.partitionBy.save
, он записывает данные в путь S3 в новом формате.s3://spark-new/production/events/year=2018/month=12/day=20/*
- Так как нам нужно разделить путь
partitionBy("year", "month", "day", "hour")
, на котором мы основываемся server_received_time
(самое близкое время, которое у нас есть, когда оно используется пожарным шлангом), тем не менее, нет никакой гарантии, что данныевы читаете из раздела A для хранения в разделе B. небольшой процент данных перемещается в новый раздел.
3- Попробуйте прочитать пару часов (как пакет) в одном задании Spark(Spark session) и запишите их обратно с параллелизмом в spark.Я все еще пытаюсь выяснить, выполнимо ли это и является ли это хорошим подходом или нет.
Я хочу знать, каков наилучший способ сделать это при соблюдении всех требований.