Параллельные SQL-запросы - PullRequest
       27

Параллельные SQL-запросы

0 голосов
/ 06 сентября 2018

Как можно выполнять запросы SQL с различными размерами столбцов параллельно, используя dask? Ниже была моя попытка:

from dask.delayed import delayed
from dask.diagnostics import ProgressBar
import dask
ProgressBar().register()

con = cx_Oracle.connect(user="BLAH",password="BLAH",dsn = "BLAH")

@delayed
def loadsql(sql):
    return pd.read_sql_query(sql,con)

results = [loadsql(x) for x in sql_to_run] 

dask.compute(results)

df1=results[0]
df2=results[1]
df3=results[2]
df4=results[3]
df5=results[4]
df6=results[5]

Однако это приводит к следующей ошибке:

DatabaseError: Ошибка выполнения на sql: «SQL QUERY» ORA-01013: пользователь запросил отмену текущей операции невозможно откатить

и вскоре после этого появляется другая ошибка:

MultipleInstanceError: Создается несколько несовместимых экземпляров подкласса TerminalInteractiveShell.

sql_to_run - список различных SQL-запросов

Любые предложения или указатели? Спасибо!


Обновление 9.7.18

Думаю, это скорее из-за того, что я недостаточно читаю документацию. В самом деле, проблема заключалась в том, что con вне функции loadql. Ниже приведено изменение кода, которое сейчас работает так, как задумано.

def loadsql(sql):
    con = cx_Oracle.connect(user="BLAH",password="BLAH",dsn = "BLAH")
    result =  pd.read_sql_query(sql,con)
    con.close()
    return result

values = [delayed(loadsql)(x) for x in sql_to_run] 
#MultiProcessing version
import dask.multiprocessing
results = dask.compute(*values, scheduler='processes')
#My sample queries took 56.2 seconds
#MultiThreaded version
import dask.threaded
results = dask.compute(*values, scheduler='threads')
#My sample queries took 51.5 seconds

1 Ответ

0 голосов
/ 07 сентября 2018

Я предполагаю, что клиент oracle не является потокобезопасным. Вместо этого вы можете попробовать запустить процессы (используя многопроцессорный или распределенный планировщик), если объект conn сериализуется - это может быть маловероятным. Скорее всего, будет работать создание соединения в пределах loadsql, чтобы оно переделывалось для каждого вызова, и, надеюсь, различные соединения не будут мешать друг другу.

...