Dask производительность на распределенных работников - PullRequest
0 голосов
/ 29 ноября 2018

Я пытаюсь решить, дают ли Spark или Dask лучшую производительность для той работы, которую мы делаем.У меня есть простой скрипт, который запускает некоторые операции над DataFrame.

Я не уверен, что использую правильную распределенную версию, поскольку время намного медленнее, чем при использовании dask локально.Вот мой сценарий:

 def CreateTransactionFile(inputFile, client):
     startTime = time.time()
     df = dd.read_csv(inputFile)

     mock = pd.DataFrame([[1,1, datetime.today(), 1,1]], columns=['A', 'B', 'C', 'D', 'E'])

     outDf = df.map_partitions(CreateTransactionFile_Partition, meta=mock)
     outDf.compute()
     print(str(time.time() - startTime))


 if __name__ == '__main__':
     client = Client('10.184.62.61:8786')
     hdfs = 'hdfs://dir/python/test/bigger.csv'
     CreateTransactionFile(hdfs , client)

CreateTransactionFile_Partition работает с использованием Pandas и Numpy на предоставленном таймфрейме и возвращает в результате фрейм данных.

Должен ли я использовать что-то отличное от compute?Приведенный выше код в два раза медленнее (230 с против 550 с) в CSV-строке 700M (~ 30 ГБ), чем при работе на локальном компьютере.Локальный тест использует локальный файл, где мультиработник использует HDFS.

1 Ответ

0 голосов
/ 29 ноября 2018

outDf.compute()

Что здесь происходит: рабочие загружают и обрабатывают разделы данных, а затем результаты копируются на клиент и объединяются в один,фрейм данных в памяти.Это копирование требует потенциально дорогого межпроцессного взаимодействия.Это может быть тем, что вы хотите, если обработка агрегирующая, а выход небольшой.

Однако, если выходные данные велики, вы хотите выполнить обработку на рабочих, используя API-интерфейс dataframe без .compute(), возможно, записав выходные данные в файлы, например, .to_parquet().

...