Dask Использование объединения SQLAlchemy в качестве таблицы для dask.dataframe.read_ sql - index_col не может сделать и pandas, и dask счастливыми - PullRequest
0 голосов
/ 16 марта 2020

У меня есть SQLAlchemy sqlalchemy. sql .selectable.Join объект, который я создал, чтобы можно было объединить несколько таблиц в рамку данных dask.

Join def:

joined = TABLE1.join(TABLE2, TABLE1.c.COL1 == TABLE2.c.COL2)
joined = joined.outerjoin(TABLE3, TABLE1.c.COL1 == TABLE3.c.COL3)
joined = joined.outerjoin(TABLE4, TABLE1.c.COL1 == TABLE4.c.COL4)
joined = joined.join(TABLE5, TABLE1.c.COL5 == TABLE5.c.COL6)
joined = joined.outerjoin(TABLE6, TABLE5.c.COL7 == TABLE6.c.COL8)
joined = joined.outerjoin(TABLE7, TABLE6.c.COL9 == TABLE7.c.COL10)

Если я читаю подмножество в pandas direct так, это работает:

pd_df_join = pd.read_sql_query(
    join.select().limit(10000).compile(engine, compile_kwargs={'literal_binds': True}).string, engine, index_col='COL1')

Однако, если я пытаюсь сделать То же самое с dask Я нажал одну из двух ошибок, либо Pandas, либо Dask не может найти столбец, на который я ссылаюсь. Pandas:

In[15]: dd_df_join = dd.read_sql_table(join, engine_uri, index_col='SCHEMANAME_TABLE1_COL1', limits=(1,10000), npartitions=1)
Traceback (most recent call last):
  File "/home/akettmann/venvs/tableau-extract/lib/python3.7/site-packages/IPython/core/interactiveshell.py", line 3331, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "<ipython-input-15-d824e7a80ef7>", line 1, in <module>
    dd_df_join = dd.read_sql_table(join, engine_uri, index_col='SCHEMANAME_TABLE1_COL1', limits=(1,10000), npartitions=1)
  File "/home/akettmann/venvs/tableau-extract/lib/python3.7/site-packages/dask/dataframe/io/sql.py", line 137, in read_sql_table
    head = pd.read_sql(q, engine, **kwargs)
  File "/home/akettmann/venvs/tableau-extract/lib/python3.7/site-packages/pandas/io/sql.py", line 438, in read_sql
    chunksize=chunksize,
  File "/home/akettmann/venvs/tableau-extract/lib/python3.7/site-packages/pandas/io/sql.py", line 1237, in read_query
    parse_dates=parse_dates,
  File "/home/akettmann/venvs/tableau-extract/lib/python3.7/site-packages/pandas/io/sql.py", line 129, in _wrap_result
    frame.set_index(index_col, inplace=True)
  File "/home/akettmann/venvs/tableau-extract/lib/python3.7/site-packages/pandas/core/frame.py", line 4303, in set_index
    raise KeyError(f"None of {missing} are in the columns")
KeyError: "None of ['SCHEMANAME_TABLE1_COL1'] are in the columns"

Dask:

In[16]: dd_df_join2 = dd.read_sql_table(join, engine_uri, index_col='COL1', limits=(1,10000), npartitions=1)
Traceback (most recent call last):
  File "/home/akettmann/venvs/tableau-extract/lib/python3.7/site-packages/IPython/core/interactiveshell.py", line 3331, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "<ipython-input-16-b639079b01cd>", line 1, in <module>
    dd_df_join2 = dd.read_sql_table(join, engine_uri, index_col='COL1', limits=(1,10000), npartitions=1)
  File "/home/akettmann/venvs/tableau-extract/lib/python3.7/site-packages/dask/dataframe/io/sql.py", line 110, in read_sql_table
    index = table.columns[index_col] if isinstance(index_col, str) else index_col
  File "/home/akettmann/venvs/tableau-extract/lib/python3.7/site-packages/sqlalchemy/util/_collections.py", line 194, in __getitem__
    return self._data[key]
KeyError: 'COL1'

Я не уверен, есть ли способ обойти это, или я делаю что-то неправильно. Любая помощь очень ценится!

1 Ответ

0 голосов
/ 16 марта 2020

Ваши входные данные должны быть просто выражением sql, не компилировать и не смешивать выражение и строки.

В этом случае оно, вероятно, выглядит как

pd_df_join = dd.read_sql_table(join, engine_uri, index_col=TABLE1.c.COL1)
...