Dask объединяет серию кадров - PullRequest
1 голос
/ 22 октября 2019

У меня есть Dask Series Pandas DataFrames. Я хотел бы использовать dask.dataframe.multi.concat для преобразования в Dask DataFrame. Однако dask.dataframe.multi.concat всегда требует список DataFrames.

Я мог бы выполнить compute для серии Dask Pandas DataFrames, чтобы получить серию Pandas DataFrames, после чего я мог бы превратить это в список,Но я думаю, что было бы лучше не звонить compute, а вместо этого напрямую приобретать Dask DataFrame из серии Dask Pandas DataFrames.

Каков наилучший способ сделать это? Вот мой код, который производит серию фреймов данных

import pandas as pd
import dask.dataframe as dd
import operator
import numpy as np
import math
import itertools

def apportion_pcts(pcts, total):
    """Apportion an integer by percentages
    Uses the largest remainder method
    """
    if (sum(pcts) != 100):
        raise ValueError('Percentages must add up to 100')
    proportions = [total * (pct / 100) for pct in pcts]
    apportions = [math.floor(p) for p in proportions]
    remainder = total - sum(apportions)
    remainders = [(i, p - math.floor(p)) for (i, p) in enumerate(proportions)]
    remainders.sort(key=operator.itemgetter(1), reverse=True)
    for (i, _) in itertools.cycle(remainders):
        if remainder == 0:
            break
        else:
            apportions[i] += 1
            remainder -= 1
    return apportions


# images_df = dd.read_csv('./tests/data/classification/images.csv')
images_df = pd.DataFrame({"image_id": [0,1,2,3,4,5], "image_class_id": [0,1,1,3,3,5]})
images_df = dd.from_pandas(images_df, npartitions=1)

output_ratio = [80, 20]

def partition_class (partition):
    size = len(partition)
    proportions = apportion_pcts(output_ratio, size)
    slices = []
    start = 0
    for proportion in proportions:
        s = slice(start, start + proportion)
        slices.append(partition.iloc[s, :])
        start = start+proportion
    slicess = pd.Series(slices)
    return slicess

partitioned_schema = dd.utils.make_meta(
    [(0, object), (1, object)], pd.Index([], name='image_class_id'))
partitioned_df = images_df.groupby('image_class_id')
partitioned_df = partitioned_df.apply(partition_class, meta=partitioned_schema)

В partitioned_df мы можем получить partitioned_df[0] или partitioned_df[1], чтобы получить серию объектов фреймов данных.


Вот пример CSV-файла:

image_id,image_width,image_height,image_path,image_class_id
0,224,224,tmp/data/image_matrices/0.npy,5
1,224,224,tmp/data/image_matrices/1.npy,0
2,224,224,tmp/data/image_matrices/2.npy,4
3,224,224,tmp/data/image_matrices/3.npy,1
4,224,224,tmp/data/image_matrices/4.npy,9
5,224,224,tmp/data/image_matrices/5.npy,2
6,224,224,tmp/data/image_matrices/6.npy,1
7,224,224,tmp/data/image_matrices/7.npy,3
8,224,224,tmp/data/image_matrices/8.npy,1
9,224,224,tmp/data/image_matrices/9.npy,4

Я попытался впоследствии сделать сокращение, но это не совсем имеет смысла из-за строки прокси foo.

def zip_partitions(s):
    r = []
    for c in s.columns:
        l = s[c].tolist()
        r.append(pd.concat(l))
    return pd.Series(r)

output_df = partitioned_df.reduction(
    chunk=zip_partitions
)

Список прокси, который я пытаюсь составить, - ['foo', 'foo']. Для чего этот этап? Чтобы узнать, как выполнить задачу? Но тогда определенные операции не работают. Мне интересно, это потому, что я оперирую объектами, получаю эти строки.

1 Ответ

0 голосов
/ 24 октября 2019

Я нашел ответ, применив сокращение в самом конце, чтобы «сжать» каждый кадр данных в ряд данных.

import pandas as pd
import dask.dataframe as dd
import operator
import numpy as np
import math
import itertools


def apportion_pcts(pcts, total):
    """Apportion an integer by percentages
    Uses the largest remainder method
    """
    if (sum(pcts) != 100):
        raise ValueError('Percentages must add up to 100')
    proportions = [total * (pct / 100) for pct in pcts]
    apportions = [math.floor(p) for p in proportions]
    remainder = total - sum(apportions)
    remainders = [(i, p - math.floor(p)) for (i, p) in enumerate(proportions)]
    remainders.sort(key=operator.itemgetter(1), reverse=True)
    for (i, _) in itertools.cycle(remainders):
        if remainder == 0:
            break
        else:
            apportions[i] += 1
            remainder -= 1
    return apportions


images_df = dd.read_csv('./tests/data/classification/images.csv', blocksize=1024)

output_ratio = [80, 20]


def partition_class(group_df, ratio):
    proportions = apportion_pcts(ratio, len(group_df))
    partitions = []
    start = 0
    for proportion in proportions:
        s = slice(start, start + proportion)
        partitions.append(group_df.iloc[s, :])
        start += proportion
    return pd.Series(partitions)


partitioned_schema = dd.utils.make_meta(
    [(i, object) for i in range(len(output_ratio))],
    pd.Index([], name='image_class_id'))

partitioned_df = images_df.groupby('image_class_id')
partitioned_df = partitioned_df.apply(
    partition_class, meta=partitioned_schema, ratio=output_ratio)


def zip_partitions(partitions_df):
    partitions = []
    for i in partitions_df.columns:
        partitions.append(pd.concat(partitions_df[i].tolist()))
    return pd.Series(partitions)


zipped_schema = dd.utils.make_meta((None, object))

partitioned_ds = partitioned_df.reduction(
    chunk=zip_partitions, meta=zipped_schema)

Я думаю, что должно быть возможно объединить и сокращение, иприменить к одной пользовательской агрегации для представления операции уменьшения карты.

Однако я не мог понять, как сделать такую ​​вещь с пользовательской агрегацией, так как она использует групповую последовательность.

Visualized Graph

...