Каков рекомендуемый способ добавления данных (pandas фрейм данных) к существующему фрейму данных dask в хранилище паркета?
Этот тест, например, периодически прерывается:
import dask.dataframe as dd
import numpy as np
import pandas as pd
def test_dask_intermittent_error(tmp_path):
df = pd.DataFrame(np.random.randn(100, 1), columns=['A'],
index=pd.date_range('20130101', periods=100, freq='T'))
dfs = np.array_split(df, 2)
dd1 = dd.from_pandas(dfs[0], npartitions=1)
dd2 = dd.from_pandas(dfs[1], npartitions=1)
dd2.to_parquet(tmp_path)
_ = (dd1
.append(dd.read_parquet(tmp_path))
.to_parquet(tmp_path))
assert_frame_equal(df,
dd.read_parquet(tmp_path).compute())
дает
.venv/lib/python3.7/site-packages/dask/dataframe/core.py:3812: in to_parquet
return to_parquet(self, path, *args, **kwargs)
...
fastparquet.util.ParquetException: Metadata parse failed: /private/var/folders/_1/m2pd_c9d3ggckp1c1p0z3v8r0000gn/T/pytest-of-jfaleiro/pytest-138/test_dask_intermittent_error0/part.0.parquet
Мы рассматривали возможность полагаться на простое добавление и вычислять порядок после извлечения, но, похоже, это касается другой ошибки , то есть:
def test_dask_prepend_as_append(tmp_path):
df = pd.DataFrame(np.random.randn(100, 1), columns=['A'],
index=pd.date_range('20130101', periods=100, freq='T'))
dfs = np.array_split(df, 2)
dd1 = dd.from_pandas(dfs[0], npartitions=1)
dd2 = dd.from_pandas(dfs[1], npartitions=1)
dd2.to_parquet(tmp_path)
dd1.to_parquet(tmp_path, append=True)
assert_frame_equal(df,
dd.read_parquet(tmp_path).compute())
т
ValueError: Appended divisions overlapping with previous ones.