Dask запускает несколько фильтров на сумке параллельно? - PullRequest
0 голосов
/ 14 апреля 2020

Я читаю во многих файлах, составленных JSON объектов - 1 на строку. Существует много «типов» сообщений, и я хочу выбрать несколько для чтения в несколько кадров данных. Кадры данных могут иметь разные структуры в зависимости от сообщения «Тип».

Когда я запускаю приведенный ниже код, кажется, что Dask делает несколько проходов по файлам, основываясь на числе, отображаемом на приборной панели. Например, если в пути есть 1000 файлов журналов, панель инструментов Dask покажет, что для запуска необходимо выполнить 3000 шагов. Есть ли какие-то изменения, которые необходимо сделать за один проход?

import dask.bag as db
import dask.dataframe as dd
import json

json_msgs = db.read_text(f'{FILE_PATH}/log*').map(json.loads)

# functions for filter
def is_type1(j):
  return ((j['TYPE']==1))
def is_type2(j):
  return ((j['TYPE']==2))
def is_type3(j):
  return ((j['TYPE']==3))


# functions for map
def get_type1(j):
  return (j['TIME'],j['USER'],j['INFO_1'])
def get_type2(j):
  return (j['TIME'],j['USER'],j['INFO_2'])
def get_type3(j):
  return (j['TIME'],j['USER'],j['INFO_3a'],j['INFO_3b'])


type1_msgs = json_msgs.filter(is_type1).map(get_type1).to_dataframe(columns=['TIME','USER','INFO1'])
type2_msgs = json_msgs.filter(is_type2).map(get_type2).to_dataframe(columns=['TIME','USER','INFO2'])
type3_msgs = json_msgs.filter(is_type3).map(get_type3).to_dataframe(columns=['TIME','USER','INFO3A','INFO3B'])

type1_msgs,type2_msgs,type3_msgs = dd.compute(type1_msgs,type2_msgs,type3_msgs)

1 Ответ

0 голосов
/ 14 апреля 2020

Dask создаст одну или несколько задач для чтения каждого файла, в зависимости от размера файла и размера блока, выбранного вами для read_text. Дальнейшие задачи будут представлять собой следующую обработку, которую вы хотите выполнить: отфильтровать, отобразить и преобразовать. Иногда Dask может плавить вместе последовательных задач, но в целом неудивительно, что у вас есть более одной задачи на входной файл.

...