У меня есть сотни тысяч маленьких CSV-файлов в формате hdf.Прежде чем объединить их в один фрейм данных, мне нужно добавить идентификатор для каждого файла в отдельности (иначе при слиянии будет невозможно различить данные из разных файлов).
В настоящее время я полагаюсь на пряжу, чтобы распространять созданные мной процессы, которые добавляют идентификатор в каждый файл и преобразуют в формат паркета.Я считаю, что независимо от того, как я настраиваю кластер (по размеру / исполнителю / памяти), пропускная способность ограничена 2000-3000 файлов / ч.
for i in range(0,numBatches):
fileSlice = fileList[i*batchSize:((i+1)*batchSize)]
p = ThreadPool(numNodes)
logger.info('\n\n\n --------------- \n\n\n')
logger.info('Starting Batch : ' + str(i))
logger.info('\n\n\n --------------- \n\n\n')
p.map(lambda x: addIdCsv(x), fileSlice)
def addIdCsv(x):
logId=x[logId]
filePath=x[filePath]
fLogRaw = spark.read.option("header", "true").option('inferSchema', 'true').csv(filePath)
fLogRaw = fLogRaw.withColumn('id', F.lit(logId))
fLog.write.mode('overwrite').parquet(filePath + '_added')
Вы можете видеть, что мой кластер работает хуже на процессоре.Но на YARN manager это дает 100% доступ к ресурсам.
Как лучше всего решить эту часть конвейера данных?Что является узким местом?
Обновление 1 Задания распределены равномерно, как вы можете видеть на визуализации временной шкалы событий ниже.