Вопрос
Как использовать Dask Distributed для распараллеливания чтения каталога файлов в отдельные кадры данных, которые я затем обрабатываю с помощью пользовательской функции?Предположим, что n-файлы что-то вроде 100 000
Фон
Я новичок в Dask и не совсем понимаю, как это спросить (какие термины использовать и т. Д.)так вот картина того, что я пытаюсь выполнить:
![Overview](https://i.stack.imgur.com/IWLHR.png)
У меня есть много маленьких, отдельных .txt файлов "книги" (например,Файлы с разделителями строк с отметкой времени и значениями атрибутов на момент отметки времени), хранящиеся в HDFS.
Параллельно я хотел бы ...
Прочитайте каждый файл в DataFrame ( note : я не пытаюсь объединить все файлы в один большой df!);
Для каждого DataFrame примените пользовательскую функцию (см. Ниже);а затем
Объединить каждый результат (возврат из пользовательской функции) в конечный объект и сохранить его обратно в HDFS.
Похоже,почти каждый ответ, который я нахожу (когда термины, связанные с поиском в Google), касается загрузки нескольких файлов в один фрейм данных.
Что я обрабатываю, какую функцию использую
Каждый файл регистра / DataFrame:
+---------+------+-------------------+-----+
| location|status| timestamp|wh_id|
+---------+------+-------------------+-----+
| PUTAWAY| I|2019-04-01 03:14:00| 20|
|PICKABLE1| X|2019-04-01 04:24:00| 20|
|PICKABLE2| X|2019-04-01 05:33:00| 20|
|PICKABLE2| A|2019-04-01 06:42:00| 20|
| HOTPICK| A|2019-04-10 05:51:00| 20|
| ICEXCEPT| A|2019-04-10 07:04:00| 20|
| ICEXCEPT| X|2019-04-11 09:28:00| 20|
+---------+------+-------------------+-----+
Функция анализа:
from dateutil.relativedelta import relativedelta
from datetime import datetime
from pyspark.sql.functions import to_timestamp
def analyze(df):
columns_with_age = ("location", "status")
columns_without_age = ("wh_id")
# Get the most-recent values (from the last row of the df)
row_count = df.count()
last_row = df.collect()[row_count-1]
# Create an empty "final row" dictionary
final_row = {}
# For each column for which we want to calculate an age value ...
for c in columns_with_age:
# Initialize loop values
target_value = last_row.__getitem__(c)
final_row[c] = target_value
timestamp_at_lookback = last_row.__getitem__("timestamp")
look_back = 1
different = False
while not different:
previous_row = df.collect()[row_count - 1 - look_back]
if previous_row.__getitem__(c) == target_value:
timestamp_at_lookback = previous_row.__getitem__("timestamp")
look_back += 1
else:
different = True
# At this point, a difference has been found, so calculate the age
final_row["days_in_{}".format(c)] = relativedelta(datetime.now(), timestamp_at_lookback).days
Таким образом, данные регистра / DataFrame будут сокращены до (при условии вычислениябыл запущен в 2019-04-14):
{ '_id': 'ledger-filename', 'location': 'ICEXCEPT', 'days_in_location': 4, 'status': 'X', 'days_in_status': 3, 'wh_id': 20 }