Сопоставление групп данных с Dask - PullRequest
1 голос
/ 16 января 2020

Вопрос и постановка задачи

У меня есть данные из двух источников. Каждый источник содержит группы, обозначенные ID столбцом, координатами и атрибутами. Я хотел бы обработать эти данные, сначала сопоставив эти группы, затем найдя ближайших соседей в этих группах, а затем изучив, как атрибуты из разных источников сравниваются между соседями. Моя задача обучения для меня заключалась в том, как обрабатывать эти данные с помощью параллельной обработки.

Вопрос: «Используя Dask для параллельной обработки, какой может быть самый простой и простой способ обработки данных такого типа? "

Предыстория и мое решение до сих пор

Данные находятся в CSV-файлах, таких как фиктивные данные ниже (реальные файлы находятся в диапазоне 100 МБ):

source1.csv:
ID,X_COORDINATE,Y_COORDINATE,ATTRIB1,PARAM1
B,-63802.84728184705,-21755.63629150563,3,36.136464492674556
B,-63254.41147034371,405.6973789009853,1,18.773534321367528
A,-9536.906537069272,32454.934987740824,0,14.043507555168809
A,15250.802157581298,-40868.390394552596,0,6.680542212635015
source2.csv:
ID,X_COORDINATE,Y_COORDINATE,ATTRIB1,PARAM1
B,-6605.150024790153,39733.35763934722,3,5.599467583303852
B,53264.28797042654,24647.24183964514,0,27.938127686688162
A,6690.836682554512,34643.0606728128,0,10.02914141165683
A,15243.16,-40954.928,0,18.130371948545935

Что я хотел бы сделать, это

  1. Загрузить данные в кадры данных
  2. Разделить их на группы по столбцу идентификаторов
  3. Для каждой группы в source1 и source2, позволяет вызывать подкадры данных в каждой группе source1_sub и source2_sub
    • , создавать объекты kdtree k1 и k2 на основе столбцов X_COORDINATE и Y_COORDINATE
  4. Для каждой пары объектов (k1, k2)
    • найти ближайших соседей для деревьев
    • построить три кадра данных:
      • matches_sub: содержащий соответствующие строки в source1_sub и source2_sub
      • source1_sub_only : строки в source1_sub, которые не соответствуют
      • source2_sub_only: строки в source2_sub, которые не соответствуют
  5. Объединить все matches_sub, source1_sub_only и source2_sub_only фреймы данных на три кадра данных: matches, source1_only, source2_only
  6. Анализ этих фреймов данных

Это проблема, которая следует красиво распараллеливать, так как каждая пара групп не зависит от других пар групп. Я решил использовать scipy.spatial.cKDTree для фактического сопоставления координат, но трудность возникает из-за того, что он работает с индексами необработанных numpy массивов, что не так просто совместимо с тем, как можно получить доступ к массивам Dask. По крайней мере, это мое понимание.

Мои первые тщетные попытки вращались вокруг очень неловко

  1. Попытка использовать два кадра данных Dask, выровнять их и найти совпадения. Это было ужасно медленно и трудно понять.
  2. Чтение данных с помощью Dask Dataframe и обработка с использованием Dask Bag. Это было немного менее сложно, но все же не было удовлетворительным.

1 Ответ

1 голос
/ 27 января 2020

Отвечая самому себе, самый простой подход, который я мог придумать, - это

  1. Считать данные из источников 1 и 2 в кадры данных df_source1 и df_source2, используя dask.dataframe.read_csv.
  2. После прочтения добавьте новый столбец SOURCE к этим кадрам данных, чтобы определить источник. Теперь интересующие меня группы указываются в столбцах ID и SOURCE. Это можно использовать для группировки.
  3. Объединить эти кадры данных в новый кадр данных df = dd.concat([df_source1, df_source2], axis=0)
  4. Группировать данные по столбцам ID и SOURCE и использовать apply для поиска совпадений. .
  5. Анализ данных.
  6. Готово.

Что-то вроде:

import dask.dataframe as dd

import pandas as pd
import numpy as np

from scipy.spatial import cKDTree

def find_matches(x):
    x_by_source = x.groupby(['SOURCE'])

    grp1 = x_by_source.get_group(1)
    grp2 = x_by_source.get_group(2)

    tree1 = cKDTree(grp1[['X_COORDINATE', 'Y_COORDINATE']])
    tree2 = cKDTree(grp2[['X_COORDINATE', 'Y_COORDINATE']])
    neighbours = tree1.query_ball_tree(tree2, r=70000)
    matches = np.array([[n,k] for (n, j) in enumerate(neighbours) if j != [] for k in j])

    indices1 = grp1.index[matches[:,0]]
    indices2 = grp2.index[matches[:,1]]

    m1 = grp1.loc[indices1]
    m2 = grp2.loc[indices2]

    # arrange matches side by side
    res = pd.concat([m1, m2], ignore_index=True, axis=1)

    return(res)

df_source1 = dd.read_csv('source1.csv').assign(SOURCE = 1)
df_source2 = dd.read_csv('source2.csv').assign(SOURCE = 2)

df = dd.concat([df_source1, df_source2], axis=0)

meta = pd.DataFrame(columns=np.arange(0, 2*len(df.columns)))

result = (df.groupby('ID')
    .apply(find_matches, meta=meta)
    .persist()
)

# Proceed with further analysis
...