Загрузка данных Cassandra в Dask Dataframe - PullRequest
0 голосов
/ 02 ноября 2018

Я пытаюсь загрузить данные из базы данных кассандры в кадр данных Dask. Я попытался запросить следующее безуспешно:

query="""SELECT * FROM document_table"""
df = man.session.execute(query)
df = dd.DataFrame(list(df)) 

TypeError                                 Traceback (most recent call last)
<ipython-input-135-021507f6f2ab> in <module>()
----> 1 a = dd.DataFrame(list(df))

    TypeError: __init__() missing 3 required positional arguments: 'name', 'meta', and 'divisions'

Кто-нибудь знает простой способ загрузки данных непосредственно из Cassandra в Dask? Сначала слишком много памяти, слишком загружается в панд.

1 Ответ

0 голосов
/ 02 ноября 2018

Некоторые проблемы с вашим кодом:

  • строка df = предположительно загружает весь набор данных в память. Даска здесь не вызывается, он не играет никакой роли. Кто-то со знанием водителя Cassandra может подтвердить это.

  • list(df) создает список имен столбцов информационного кадра и удаляет все данные

  • dd.DataFrame, если вы прочитали документы , то это не так.

Что вы, вероятно, захотите сделать, это: а) создать функцию, которая возвращает один раздел данных, б) задержать эту функцию и вызвать с различными значениями разделов в) использовать dd.from_delayed для создания кадра данных dask. Например, предполагая, что в таблице есть поле partfield, которое легко имеет возможные значения 1..6 и аналогичное количество строк для каждого раздела:

@dask.delayed
def part(x):
    session = # construct Cassandra session
    q = "SELECT * FROM document_table WHERE partfield={}".format(x)
    df = man.session.execute(query)
    return dd.DataFrame(list(df)) 

parts = [part(x) for x in range(1, 7)]
df = dd.from_delayed(parts)
...