Использование dask для чтения данных из Hive - PullRequest
0 голосов
/ 18 октября 2018

Я использую утилиту as_pandas из impala.util для чтения данных в форме dataframe, извлеченной из улья.Однако, используя панды, я думаю, что не смогу обрабатывать большой объем данных, и это также будет медленнее.Я читал о Dask, который обеспечивает отличную функциональность для чтения больших файлов данных.Как я могу использовать его для эффективного извлечения данных из улья.

def as_dask(cursor):
"""Return a DataFrame out of an impyla cursor.
This will pull the entire result set into memory.  For richer pandas- 
like functionality on distributed data sets, see the Ibis project.

Parameters
----------
cursor : `HiveServer2Cursor`
    The cursor object that has a result set waiting to be fetched.
Returns
-------
DataFrame
"""
    import pandas as pd
    import dask
    import dask.dataframe as dd

    names = [metadata[0] for metadata in cursor.description]
    dfs = dask.delayed(pd.DataFrame.from_records)(cursor.fetchall(), 
    columns=names)
    return dd.from_delayed(dfs).compute()

1 Ответ

0 голосов
/ 18 октября 2018

На данный момент нет прямого способа сделать это.Вы бы хорошо увидели реализацию dask.dataframe.read_sql_table и аналогичного кода в потребление-sql - вы, вероятно, захотите способ разделения ваших данных, и иметь каждый из вашихрабочие получают один раздел с помощью вызова delayed().dd.from_delayed и dd.concat могут затем использоваться для сшивания кусочков.

-edit-

Ваша функция имеет отложенную идею задом наперед.Вы задерживаете и немедленно материализуете данные в функции, которая работает с одним курсором - это нельзя распараллелить и сломает вашу память, если данные большие (по этой причине вы пытаетесь это сделать).

Предположим, вы можете сформировать набор из 10 запросов, где каждый запрос получает различную часть данных;делать не использовать OFFSET, использовать условие для некоторого столбца, который индексируется Hive.Вы хотите сделать что-то вроде:

queries = [SQL_STATEMENT.format(i) for i in range(10)]
def query_to_df(query):
    cursor = impyla.execute(query)
    return pd.DataFrame.from_records(cursor.fetchall())

Теперь у вас есть функция, которая возвращает раздел и не зависит от глобальных объектов - она ​​принимает только в качестве входных данных строку.

parts = [dask.delayed(query_to_df)(q) for q in queries]
df = dd.from_delayed(parts)
...