У нас есть мастер-> подробный набор данных с основными данными, доступ к которым осуществляется через API, и подробные данные, хранящиеся в файлах Parquet в HDFS.
Основные данные содержат все детали, чтобы идентифицировать точный файлчто каждая строка данных, которую мы ищем, сохраняется. Когда пользователь запрашивает индекс, он получает результаты обратного индекса, обычно около 20 тысяч указателей ключей строк, распределенных по 500 файлам, и каждый ключ строки возвращает 1 или более строк;обычно такой запрос возвращает тысячи строк для одного ключа строки.
Среда: PySpark 2.3.0 с Python 3.6
Поскольку мы точно знаем, в каком файле находится каждая точка данных, яbinaryFiles
использовал для чтения отдельных файлов Parquet и передачи (имя файла, байты) картографам вместе с результатами индекса, которые передаются всем исполнителям.
# indexdf is the Pandas dataframe containing the index results, broadcast to all executors.
brdcst = sc.broadcast(indexdf)
sc.binaryFiles(",".join(pfileobj)).map(lambda x: self.processFile(x, brdcst)).collect()
Этот код работаетдовольно хорошо для небольших файлов, но когда файлы становятся больше, я либо превышаю ограничение Spark Shuffle 2 ГБ, либо превышаю ограничение максимального сериализуемого размера Pickling при возврате результатов.
Мои группы строк Parquet имеют статистику по ключу строки, поэтому еслиЯ мог бы отобразить на основе группы строк Parquet, что-то вроде binaryFiles
, но с набором данных, подобным (filename, rowgroup_stats, rowgroup_data)
(примерно), я думаю, я бы преодолел все эти проблемы и получил бы лучшее распараллеливание для загрузки.
Мысли илиновые направления, которые я должен рассмотреть?
В начале этого проекта я пытался загрузить каждый файл отдельно в Spark SQL и выполнял объединение, но это было мучительно медленно, так как я не получал распараллеливание между файлами.Я также попытался загрузить все файлы в один Spark SQL Dataframe и присоединиться, но работа заняла в 10 раз больше времени, потому что я потерял преимущество, зная, где хранятся мои данные, и вместо этого мне пришлось сканировать весь массив данных.