Ваш основной алгоритм: «Я бы хотел, чтобы первые 10 значений df['Node']
были установлены на первое значение ndf
, следующие 10 значений - на следующее значение ndf
и т. Д.».Причина, по которой это трудно сделать в Dask, заключается в том, что он не знает, сколько строк в каждом разделе: вы читаете из CSV, а количество строк, которые вы получаете в байтах X, зависит от того, какими именно являются данные в каждой части.,Другие форматы дают вам больше информации ...
Поэтому вам, безусловно, понадобятся два прохода через данные.Вы можете поработать с индексом, выяснить деления и, возможно, провести некоторую сортировку.На мой взгляд, самое простое, что вы можете сделать, это просто измерить длину деления и получить смещение начала каждого:
lengths = df.map_partitions(len).compute()
offsets = np.cumsum(lengths.values)
offsets -= offsets[0]
и теперь использовать пользовательскую функцию задержки для работы с деталями
@dask.delayed
def add_node(part, offset, ndf):
index = pd.Series(range(offset, offset + len(part)) // 10,
index=part.index) # 10 is the repeat factor
part['Node'] = index.map(ndf)
return part
df2 = dd.from_delayed([add_node(d, off, ndf)
for d, off in zip(df.to_delayed(), offsets)])