Dask Distributed: распараллеливание чтения и анализа множества отдельных файлов - PullRequest
2 голосов
/ 15 апреля 2019

Вопрос

Как использовать Dask Distributed для распараллеливания чтения каталога файлов в отдельные кадры данных, которые я затем обрабатываю с помощью пользовательской функции?Предположим, что n-файлы что-то вроде 100 000

Фон

Я новичок в Dask и не совсем понимаю, как это спросить (какие термины использовать и т. Д.)так вот картина того, что я пытаюсь выполнить:

Overview

У меня есть много маленьких, отдельных .txt файлов "книги" (например,Файлы с разделителями строк с отметкой времени и значениями атрибутов на момент отметки времени), хранящиеся в HDFS.

Параллельно я хотел бы ...

  1. Прочитайте каждый файл в DataFrame ( note : я не пытаюсь объединить все файлы в один большой df!);

  2. Для каждого DataFrame примените пользовательскую функцию (см. Ниже);а затем

  3. Объединить каждый результат (возврат из пользовательской функции) в конечный объект и сохранить его обратно в HDFS.

Похоже,почти каждый ответ, который я нахожу (когда термины, связанные с поиском в Google), касается загрузки нескольких файлов в один фрейм данных.

Что я обрабатываю, какую функцию использую

Каждый файл регистра / DataFrame:

+---------+------+-------------------+-----+
| location|status|          timestamp|wh_id|
+---------+------+-------------------+-----+
|  PUTAWAY|     I|2019-04-01 03:14:00|   20|
|PICKABLE1|     X|2019-04-01 04:24:00|   20|
|PICKABLE2|     X|2019-04-01 05:33:00|   20|
|PICKABLE2|     A|2019-04-01 06:42:00|   20|
|  HOTPICK|     A|2019-04-10 05:51:00|   20|
| ICEXCEPT|     A|2019-04-10 07:04:00|   20|
| ICEXCEPT|     X|2019-04-11 09:28:00|   20|
+---------+------+-------------------+-----+

Функция анализа:

from dateutil.relativedelta import relativedelta
from datetime import datetime
from pyspark.sql.functions import to_timestamp

def analyze(df):

  columns_with_age = ("location", "status")
  columns_without_age = ("wh_id")

  # Get the most-recent values (from the last row of the df)
  row_count = df.count()
  last_row = df.collect()[row_count-1]

  # Create an empty "final row" dictionary
  final_row = {}

  # For each column for which we want to calculate an age value ...
  for c in columns_with_age:

      # Initialize loop values
      target_value = last_row.__getitem__(c)
      final_row[c] = target_value
      timestamp_at_lookback = last_row.__getitem__("timestamp")
      look_back = 1
      different = False

      while not different:
          previous_row = df.collect()[row_count - 1 - look_back]
          if previous_row.__getitem__(c) == target_value:
              timestamp_at_lookback = previous_row.__getitem__("timestamp")
              look_back += 1

          else:
              different = True

      # At this point, a difference has been found, so calculate the age
      final_row["days_in_{}".format(c)] = relativedelta(datetime.now(), timestamp_at_lookback).days

Таким образом, данные регистра / DataFrame будут сокращены до (при условии вычислениябыл запущен в 2019-04-14):

{ '_id': 'ledger-filename', 'location': 'ICEXCEPT', 'days_in_location': 4, 'status': 'X', 'days_in_status': 3, 'wh_id': 20 }

1 Ответ

0 голосов
/ 17 апреля 2019

Параллельная запись из многих процессов в один выходной файл на самом деле невозможна, потому что вы не знаете, как долго каждый из результатов будет заранее, поэтому вы не знаете, где в файле разместить другие результаты. Более того, HDFS действительно любит получать большие блоки непрерывных данных (возможно, 64 МБ), а не инкрементные обновления.

Есть пара вещей, которые вы можете сделать:

  • записать все свои выходные данные в отдельные файлы, а затем запустить отдельное задание для их объединения; это совершенно нормально, если обработка кадров данных велика по сравнению со временем чтения / записи
  • используйте распределенный client.submit API и as_completed для записи результатов в выходной файл вашего основного процесса. Обратите внимание, что вы могли бы сделать это с учетом первоначального порядка, если это важно, но это потребует дополнительной работы.
...