У меня есть Dask Series Pandas DataFrames. Я хотел бы использовать dask.dataframe.multi.concat
для преобразования в Dask DataFrame. Однако dask.dataframe.multi.concat
всегда требует список DataFrames.
Я мог бы выполнить compute
для серии Dask Pandas DataFrames, чтобы получить серию Pandas DataFrames, после чего я мог бы превратить это в список,Но я думаю, что было бы лучше не звонить compute
, а вместо этого напрямую приобретать Dask DataFrame из серии Dask Pandas DataFrames.
Каков наилучший способ сделать это? Вот мой код, который производит серию фреймов данных
import pandas as pd
import dask.dataframe as dd
import operator
import numpy as np
import math
import itertools
def apportion_pcts(pcts, total):
"""Apportion an integer by percentages
Uses the largest remainder method
"""
if (sum(pcts) != 100):
raise ValueError('Percentages must add up to 100')
proportions = [total * (pct / 100) for pct in pcts]
apportions = [math.floor(p) for p in proportions]
remainder = total - sum(apportions)
remainders = [(i, p - math.floor(p)) for (i, p) in enumerate(proportions)]
remainders.sort(key=operator.itemgetter(1), reverse=True)
for (i, _) in itertools.cycle(remainders):
if remainder == 0:
break
else:
apportions[i] += 1
remainder -= 1
return apportions
# images_df = dd.read_csv('./tests/data/classification/images.csv')
images_df = pd.DataFrame({"image_id": [0,1,2,3,4,5], "image_class_id": [0,1,1,3,3,5]})
images_df = dd.from_pandas(images_df, npartitions=1)
output_ratio = [80, 20]
def partition_class (partition):
size = len(partition)
proportions = apportion_pcts(output_ratio, size)
slices = []
start = 0
for proportion in proportions:
s = slice(start, start + proportion)
slices.append(partition.iloc[s, :])
start = start+proportion
slicess = pd.Series(slices)
return slicess
partitioned_schema = dd.utils.make_meta(
[(0, object), (1, object)], pd.Index([], name='image_class_id'))
partitioned_df = images_df.groupby('image_class_id')
partitioned_df = partitioned_df.apply(partition_class, meta=partitioned_schema)
В partitioned_df
мы можем получить partitioned_df[0]
или partitioned_df[1]
, чтобы получить серию объектов фреймов данных.
Вот пример CSV-файла:
image_id,image_width,image_height,image_path,image_class_id
0,224,224,tmp/data/image_matrices/0.npy,5
1,224,224,tmp/data/image_matrices/1.npy,0
2,224,224,tmp/data/image_matrices/2.npy,4
3,224,224,tmp/data/image_matrices/3.npy,1
4,224,224,tmp/data/image_matrices/4.npy,9
5,224,224,tmp/data/image_matrices/5.npy,2
6,224,224,tmp/data/image_matrices/6.npy,1
7,224,224,tmp/data/image_matrices/7.npy,3
8,224,224,tmp/data/image_matrices/8.npy,1
9,224,224,tmp/data/image_matrices/9.npy,4
Я попытался впоследствии сделать сокращение, но это не совсем имеет смысла из-за строки прокси foo
.
def zip_partitions(s):
r = []
for c in s.columns:
l = s[c].tolist()
r.append(pd.concat(l))
return pd.Series(r)
output_df = partitioned_df.reduction(
chunk=zip_partitions
)
Список прокси, который я пытаюсь составить, - ['foo', 'foo']
. Для чего этот этап? Чтобы узнать, как выполнить задачу? Но тогда определенные операции не работают. Мне интересно, это потому, что я оперирую объектами, получаю эти строки.