Dask Apply of Python Function - PullRequest
0 голосов
/ 26 июня 2018

У меня есть df:

    id  log
0   24  2*C316*first_field_name17*second_field_name16*third_field_name2*N311*field value1*Y5*hello2*O30*0*0*
1   25  2*C316*first_field_name17*second_field_name16*third_field_name2*N311*field value1*Y5*hello2*O30*0*0*

У меня есть функция, которая анализирует строку:

dfs = []

def parse_log(id, log):
    split = log.split('*')
    number_of_fields = int(split[1][1:int(split[0][0])])

    i=2
    string_length = int(split[1][int(split[0][0]):])
    field_names_list = []
    while i < number_of_fields + 2:
        field_name = split[i][0:string_length]
        field_names_list.append(field_name)
        string_length = int(split[i][string_length:])
        i+=1

    i = 3 + number_of_fields
    string_length = int(split[2 + number_of_fields][string_length:])
    new_values_list = []
    while i < 3+number_of_fields*2:
        field_name = split[i][0:string_length]
        new_values_list.append(field_name)
        string_length = int(split[i][string_length:])
        i+=1

    i = 4 + number_of_fields*2
    string_length = int(split[3 + number_of_fields*2][string_length:])
    old_values_list = []
    while i <= 3 + number_of_fields*3:
        old_value = split[i][0:string_length]
        old_values_list.append(old_value)
        if i == 3 + number_of_fields*3:
            string_length = 0
        else:
            string_length = int(split[i][string_length:])
        i+=1

    df = pd.DataFrame(
    {'id':id,
     'field': field_names_list,
     'old_value': old_values_list,
     'new_value': new_values_list
    })

dfs.append(df)  

Эта функция работает с обычными пандами:

data.apply(lambda x: parse_audit_log(x['id'], x['log']), axis=1) 

Затем я пытаюсь применить вместо dask (data в данном случае - это кадр данных dask, считанный из SQL):

out = data.map_partitions(lambda df : df.apply(lambda x : parse_audit_log(x.id,x.log),axis=1), meta=('result', int)).compute(get=get)

Это приводит к ошибке:

---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
<ipython-input-12-2468010e0124> in <module>()
----> 1 out = data.map_partitions(lambda df : df.apply(lambda x : parse_audit_log(x.ROW_ID,x.AUDIT_LOG),axis=1), meta=('result', int)).compute(get=get)

~\_installed\anaconda\lib\site-packages\dask\base.py in compute(self, **kwargs)
    153         dask.base.compute
    154         """
--> 155         (result,) = compute(self, traverse=False, **kwargs)
    156         return result
    157 

~\_installed\anaconda\lib\site-packages\dask\base.py in compute(*args, **kwargs)
    402     postcomputes = [a.__dask_postcompute__() if is_dask_collection(a)
    403                     else (None, a) for a in args]
--> 404     results = get(dsk, keys, **kwargs)
    405     results_iter = iter(results)
    406     return tuple(a if f is None else f(next(results_iter), *a)

~\_installed\anaconda\lib\site-packages\dask\multiprocessing.py in get(dsk, keys, num_workers, func_loads, func_dumps, optimize_graph, **kwargs)
    175                            get_id=_process_get_id, dumps=dumps, loads=loads,
    176                            pack_exception=pack_exception,
--> 177                            raise_exception=reraise, **kwargs)
    178     finally:
    179         if cleanup:

~\_installed\anaconda\lib\site-packages\dask\local.py in get_async(apply_async, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, **kwargs)
    519                         _execute_task(task, data)  # Re-execute locally
    520                     else:
--> 521                         raise_exception(exc, tb)
    522                 res, worker_id = loads(res_info)
    523                 state['cache'][key] = res

~\_installed\anaconda\lib\site-packages\dask\compatibility.py in reraise(exc, tb)
     64     def reraise(exc, tb=None):
     65         if exc.__traceback__ is not tb:
---> 66             raise exc.with_traceback(tb)
     67         raise exc
     68 

~\_installed\anaconda\lib\site-packages\dask\local.py in execute_task()
    288     try:
    289         task, data = loads(task_info)
--> 290         result = _execute_task(task, data)
    291         id = get_id()
    292         result = dumps((result, id))

~\_installed\anaconda\lib\site-packages\dask\local.py in _execute_task()
    269         func, args = arg[0], arg[1:]
    270         args2 = [_execute_task(a, cache) for a in args]
--> 271         return func(*args2)
    272     elif not ishashable(arg):
    273         return arg

~\_installed\anaconda\lib\site-packages\dask\dataframe\core.py in apply_and_enforce()
   3402 
   3403     Ensures the output has the same columns, even if empty."""
-> 3404     df = func(*args, **kwargs)
   3405     if isinstance(df, (pd.DataFrame, pd.Series, pd.Index)):
   3406         if len(df) == 0:

<ipython-input-12-2468010e0124> in <lambda>()
----> 1 out = data.map_partitions(lambda df : df.apply(lambda x : parse_audit_log(x.ROW_ID,x.AUDIT_LOG),axis=1), meta=('result', int)).compute(get=get)

~\_installed\anaconda\lib\site-packages\pandas\core\frame.py in apply()
   4875                         f, axis,
   4876                         reduce=reduce,
-> 4877                         ignore_failures=ignore_failures)
   4878             else:
   4879                 return self._apply_broadcast(f, axis)

~\_installed\anaconda\lib\site-packages\pandas\core\frame.py in _apply_standard()
   4971             try:
   4972                 for i, v in enumerate(series_gen):
-> 4973                     results[i] = func(v)
   4974                     keys.append(v.name)
   4975             except Exception as e:

<ipython-input-12-2468010e0124> in <lambda>()
----> 1 out = data.map_partitions(lambda df : df.apply(lambda x : parse_audit_log(x.ROW_ID,x.AUDIT_LOG),axis=1), meta=('result', int)).compute(get=get)

<ipython-input-11-08a2f8f06a76> in parse_audit_log()
      1 def parse_audit_log(row_id, audit_log):
----> 2     split = audit_log.split('*')
      3     number_of_fields = int(split[1][1:int(split[0][0])])
      4 
      5     i=2

AttributeError: ("'NoneType' object has no attribute 'split'", 'occurred at index 1')

Это одна ошибка, но я столкнулся с несколькими другими при настройке функции в соответствии с требованиями dask. Чего мне не хватает в приложении dask? Мой метатег почти наверняка не прав. Хотя есть примеры применения функций dask, я не встречал функций с явным возвратом.

Обновление - желаемый вывод:

В идеале я получу df:

   row_id  field_name          new_value      old_value
0   24      first_field_name    field value 
1   24      second_field_name   Y   
2   24      third_field_name    hello
3   25      first_field_name    field value 
4   25      second_field_name   Y   
5   25      third_field_name    hello

Проблема (и причина, по которой я пытаюсь использовать dask) состоит в том, что набор данных составляет 55 миллионов записей. Так как этот процесс разбора разбивает эти записи на одну или несколько записей, мне нужно что-то эффективное, что может поместиться в моей памяти 32 ГБ.

...