I have a SQLAlchemy sqlalchemy.sql.selectable.Join object that I created so that I can join several tables into a dask dataframe.
Join definition:
join = TABLE1.join(TABLE2, TABLE1.c.COL1 == TABLE2.c.COL2)
join = join.outerjoin(TABLE3, TABLE1.c.COL1 == TABLE3.c.COL3)
join = join.outerjoin(TABLE4, TABLE1.c.COL1 == TABLE4.c.COL4)
join = join.join(TABLE5, TABLE1.c.COL5 == TABLE5.c.COL6)
join = join.outerjoin(TABLE6, TABLE5.c.COL7 == TABLE6.c.COL8)
join = join.outerjoin(TABLE7, TABLE6.c.COL9 == TABLE7.c.COL10)
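For context, the tables and engine used above come from something roughly like this (a minimal sketch with placeholder names and URI; the real tables are reflected from the database):

import sqlalchemy as sa

# Assumed setup (all names here are placeholders): TABLE1..TABLE7 are reflected
# sqlalchemy.Table objects and engine/engine_uri point at the real database.
engine_uri = 'postgresql://user:password@host:5432/dbname'  # placeholder; the real dialect/URI differs
engine = sa.create_engine(engine_uri)
metadata = sa.MetaData(schema='SCHEMANAME')
metadata.reflect(bind=engine, only=['TABLE1', 'TABLE2', 'TABLE3', 'TABLE4',
                                    'TABLE5', 'TABLE6', 'TABLE7'])
TABLE1 = metadata.tables['SCHEMANAME.TABLE1']
TABLE2 = metadata.tables['SCHEMANAME.TABLE2']
# ... likewise for TABLE3 through TABLE7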
If I read a subset directly into pandas like this, it works:
pd_df_join = pd.read_sql_query(
    join.select().limit(10000).compile(engine, compile_kwargs={'literal_binds': True}).string,
    engine, index_col='COL1')
However, if I try to do the same thing with dask, I hit one of two errors: either pandas or dask cannot find the column I am referring to. Pandas:
In[15]: dd_df_join = dd.read_sql_table(join, engine_uri, index_col='SCHEMANAME_TABLE1_COL1', limits=(1,10000), npartitions=1)
Traceback (most recent call last):
File "/home/akettmann/venvs/tableau-extract/lib/python3.7/site-packages/IPython/core/interactiveshell.py", line 3331, in run_code
exec(code_obj, self.user_global_ns, self.user_ns)
File "<ipython-input-15-d824e7a80ef7>", line 1, in <module>
dd_df_join = dd.read_sql_table(join, engine_uri, index_col='SCHEMANAME_TABLE1_COL1', limits=(1,10000), npartitions=1)
File "/home/akettmann/venvs/tableau-extract/lib/python3.7/site-packages/dask/dataframe/io/sql.py", line 137, in read_sql_table
head = pd.read_sql(q, engine, **kwargs)
File "/home/akettmann/venvs/tableau-extract/lib/python3.7/site-packages/pandas/io/sql.py", line 438, in read_sql
chunksize=chunksize,
File "/home/akettmann/venvs/tableau-extract/lib/python3.7/site-packages/pandas/io/sql.py", line 1237, in read_query
parse_dates=parse_dates,
File "/home/akettmann/venvs/tableau-extract/lib/python3.7/site-packages/pandas/io/sql.py", line 129, in _wrap_result
frame.set_index(index_col, inplace=True)
File "/home/akettmann/venvs/tableau-extract/lib/python3.7/site-packages/pandas/core/frame.py", line 4303, in set_index
raise KeyError(f"None of {missing} are in the columns")
KeyError: "None of ['SCHEMANAME_TABLE1_COL1'] are in the columns"
Dask:
In[16]: dd_df_join2 = dd.read_sql_table(join, engine_uri, index_col='COL1', limits=(1,10000), npartitions=1)
Traceback (most recent call last):
File "/home/akettmann/venvs/tableau-extract/lib/python3.7/site-packages/IPython/core/interactiveshell.py", line 3331, in run_code
exec(code_obj, self.user_global_ns, self.user_ns)
File "<ipython-input-16-b639079b01cd>", line 1, in <module>
dd_df_join2 = dd.read_sql_table(join, engine_uri, index_col='COL1', limits=(1,10000), npartitions=1)
File "/home/akettmann/venvs/tableau-extract/lib/python3.7/site-packages/dask/dataframe/io/sql.py", line 110, in read_sql_table
index = table.columns[index_col] if isinstance(index_col, str) else index_col
File "/home/akettmann/venvs/tableau-extract/lib/python3.7/site-packages/sqlalchemy/util/_collections.py", line 194, in __getitem__
return self._data[key]
KeyError: 'COL1'
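The dask-side KeyError comes from the index_col lookup against the Join's column collection (table.columns[index_col] in dask's sql.py), so presumably the string I pass has to match a key of join.columns exactly. Listing those keys is how I would check what dask might accept (sketch only):

# Sketch: show the keys that dask's table.columns[index_col] lookup would accept.
print(list(join.columns.keys()))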
I'm not sure whether there is a way around this or whether I'm doing something wrong. Any help is much appreciated!