Использование Dask NEW to_ sql для повышения эффективности (память / скорость) или альтернативы для получения данных из фрейма данных dask в SQL Server Table - PullRequest
6 голосов
/ 16 июня 2020

Моя конечная цель - использовать SQL / Python вместе для проекта, в котором слишком много данных для pandas (по крайней мере, на моей машине). Итак, я пошел с dask на:

  1. чтение данных из нескольких источников (в основном SQL серверные таблицы / представления)
  2. манипулирование / объединение данных в один большой таблица фрейма данных dask из ~ 10 миллионов + строк и 52 столбцов, некоторые из которых имеют несколько длинных уникальных строк
  3. записывать ее обратно на SQL сервер ежедневно, так что мой отчет PowerBI может автоматически обновляться sh данные.

Для # 1 и # 2 они занимают ~ 30 секунд, вместе взятых, с использованием минимальной памяти (несколько SQL запросов ~ 200 строк кода, управляющих большим набором данных с помощью dask). Быстро и весело !!!

Но главное узкое место - пункт 3 выше. Каковы эффективные способы (1. Память и 2. Скорость (время выполнения)) для выполнения sh # 3 с помощью dask или других альтернатив? См. Дополнительную информацию, а также то, что я пробовал, и некоторые выводы, к которым я пришел.


Для пунктов 1, 2 и 3 выше это была задача, которую я счел невыполнимой связано с pandas из-за ограничений памяти / длительного времени выполнения, но dask решил # 1 и # 2 выше с честью, но я все еще боролся с # 3 - возвращение данных в таблицу SQL автоматическим способом, когда я не отправлял в .csv, а затем импортировал на сервер SQL. Я попытался .compute() преобразовать фрейм данных dask в pandas фрейм данных, а затем записать to_sql, но это побеждает цель использования dask для чтения / модели данных и снова заканчивается нехватка памяти / требуется навсегда для выполнения в любом случае.

Итак, новый план заключался в использовании to_csv для ежедневного создания нового .csv и использования запроса для массовой вставки данных в таблицу. Я думаю, что это все еще жизнеспособное решение; но сегодня я был ОЧЕНЬ счастлив , узнав, что dask выпустил новую функцию to_sql (https://docs.dask.org/en/latest/dataframe-api.html#dask .dataframe.DataFrame.to_ sql). Используя существующие статьи / блоги StackOverflow об этом топе c (например, от Francois Leblan c - https://leblancfg.com/benchmarks_writing_pandas_dataframe_SQL_Server.html), я поработал со всеми параметрами, чтобы найти наиболее эффективную комбинацию, которая имела самое быстрое время для выполнения (что ОЧЕНЬ важно, когда вы каждый день пишете большие наборы данных для отчетности). Это то, что я нашел, что похоже на множество сообщений о pd.to_sql, включая Leblan c:

import sqlalchemy as sa
import dask.dataframe as dd
from dask.diagnostics import ProgressBar
pbar = ProgressBar()
pbar.register()
#windows authentication + fast_executemany=True
to_sql_uri = sa.create_engine(f'mssql://@{server}/{database}?trusted_connection=yes&driver={driver_name}', fast_executemany=True)
ddf.to_sql('PowerBI_Report', uri=to_sql_uri, if_exists='replace', index=False)

Использование любой комбинации следующих нестандартных параметров замедлил время выполнения для моего to_sql (опять же в соответствии с тем, что ЛеБлан c упомянул в своем блоге):

  1. chunksize=40 ( 40 - это максимум, который я мог передать для 52 столбцов на 2098 SQL ограничение параметра сервера),
  2. method='multi',
  3. parallel=True)

Примечание: я понял, что в дополнение (или вместо) передачи chunksize=40, я мог бы пройти через мои 33 раздела фрейма данных dask и обработать каждый фрагмент to_sql индивидуально. Это было бы более эффективным с точки зрения памяти и, возможно, быстрее. Один раздел занимал от 45 секунд до 1 минуты, в то время как выполнение всего фрейма данных dask за один раз занимало> 1 часа для всех разделов. Я попробую перебрать все разделы и выложить обновление, если это будет быстрее. Час кажется большим, но я чувствовал себя полностью заблокированным при попытке вычислить с pandas, что заняло всю ночь или закончилась память, так что это ШАГ ВВЕРХ. Честно говоря, я достаточно доволен этим и, вероятно, собираюсь создать .exe сейчас с pyinstaller и запускать .exe ежедневно, так что это будет полностью автоматизировано и go оттуда, но я думал, что это будет полезно для других, так как я боролся с различными решениями за последние пару недель.

1 Ответ

2 голосов
/ 18 июня 2020

Я тестировал запись фрейма данных на SQL Сервер в разделах, перебирая их в цикле, а не все сразу, и время для завершения всего было похоже на запись всего сразу.

import sqlalchemy as sa
import dask.dataframe as dd
from dask.diagnostics import ProgressBar
pbar = ProgressBar()
pbar.register()
#windows authentication + fast_executemany=True
to_sql_uri = sa.create_engine(f'mssql://@{server}/{database}?trusted_connection=yes&driver={driver_name}', fast_executemany=True)
# From my question, I have replaced the commented out line of code with everything below that to see if there was a significant increase in speed. There was not. It was about the same as the cod in the question.
# ddf.to_sql('PowerBI_Report', uri=to_sql_uri, if_exists='replace', index=False)
i = 0
for i in range(ddf.npartitions):
    partition = ddf.get_partition(i)
    if i == 0:
        partition.to_sql('CDR_PBI_Report', uri=to_sql_uri, if_exists='replace', index=False)
    if i > 0:
        partition.to_sql('CDR_PBI_Report', uri=to_sql_uri, if_exists='append', index=False)
    i += 1
...