Решение проблемы с небольшими файлами в hdfs - PullRequest
0 голосов
/ 06 декабря 2018

У меня есть сотни тысяч маленьких CSV-файлов в формате hdf.Прежде чем объединить их в один фрейм данных, мне нужно добавить идентификатор для каждого файла в отдельности (иначе при слиянии будет невозможно различить данные из разных файлов).

В настоящее время я полагаюсь на пряжу, чтобы распространять созданные мной процессы, которые добавляют идентификатор в каждый файл и преобразуют в формат паркета.Я считаю, что независимо от того, как я настраиваю кластер (по размеру / исполнителю / памяти), пропускная способность ограничена 2000-3000 файлов / ч.

for i in range(0,numBatches):
    fileSlice = fileList[i*batchSize:((i+1)*batchSize)]
    p = ThreadPool(numNodes)

    logger.info('\n\n\n --------------- \n\n\n')
    logger.info('Starting Batch : ' + str(i))
    logger.info('\n\n\n --------------- \n\n\n')
    p.map(lambda x: addIdCsv(x), fileSlice)

def addIdCsv(x):
    logId=x[logId]
    filePath=x[filePath]
    fLogRaw = spark.read.option("header", "true").option('inferSchema', 'true').csv(filePath)
    fLogRaw = fLogRaw.withColumn('id', F.lit(logId))
    fLog.write.mode('overwrite').parquet(filePath + '_added')

Вы можете видеть, что мой кластер работает хуже на процессоре.Но на YARN manager это дает 100% доступ к ресурсам.enter image description here

Как лучше всего решить эту часть конвейера данных?Что является узким местом?

Обновление 1 Задания распределены равномерно, как вы можете видеть на визуализации временной шкалы событий ниже.enter image description here

1 Ответ

0 голосов
/ 12 декабря 2018

В соответствии с предложением @cricket_007, Nifi обеспечивает хорошее простое решение этой проблемы, которое является более масштабируемым и лучше интегрируется с другими средами, чем обычный Python.Идея состоит в том, чтобы прочитать файлы в Nifi перед записью в hdfs (в моем случае они находятся в S3).Все еще существует узкое место чтения / записи на S3, но производительность составляет около 45 тыс. Файлов / ч.

Поток выглядит так.Nifi Flow

Большая часть работы выполняется в процессоре ReplaceText, который находит символ конца строки '|'и добавляет UUID и перевод строки.ReplaceText Processor

...