Я читаю во многих файлах, составленных JSON объектов - 1 на строку. Существует много «типов» сообщений, и я хочу выбрать несколько для чтения в несколько кадров данных. Кадры данных могут иметь разные структуры в зависимости от сообщения «Тип».
Когда я запускаю приведенный ниже код, кажется, что Dask делает несколько проходов по файлам, основываясь на числе, отображаемом на приборной панели. Например, если в пути есть 1000 файлов журналов, панель инструментов Dask покажет, что для запуска необходимо выполнить 3000 шагов. Есть ли какие-то изменения, которые необходимо сделать за один проход?
import dask.bag as db
import dask.dataframe as dd
import json
json_msgs = db.read_text(f'{FILE_PATH}/log*').map(json.loads)
# functions for filter
def is_type1(j):
return ((j['TYPE']==1))
def is_type2(j):
return ((j['TYPE']==2))
def is_type3(j):
return ((j['TYPE']==3))
# functions for map
def get_type1(j):
return (j['TIME'],j['USER'],j['INFO_1'])
def get_type2(j):
return (j['TIME'],j['USER'],j['INFO_2'])
def get_type3(j):
return (j['TIME'],j['USER'],j['INFO_3a'],j['INFO_3b'])
type1_msgs = json_msgs.filter(is_type1).map(get_type1).to_dataframe(columns=['TIME','USER','INFO1'])
type2_msgs = json_msgs.filter(is_type2).map(get_type2).to_dataframe(columns=['TIME','USER','INFO2'])
type3_msgs = json_msgs.filter(is_type3).map(get_type3).to_dataframe(columns=['TIME','USER','INFO3A','INFO3B'])
type1_msgs,type2_msgs,type3_msgs = dd.compute(type1_msgs,type2_msgs,type3_msgs)