AttributeError: у объекта 'generator' нет атрибута 'to_sql' при создании кадра данных с использованием генератора - PullRequest
0 голосов
/ 01 мая 2018

Я пытаюсь создать datafrmae из файла с фиксированной пропускной способностью и загрузить в базу данных postgresql. Мой входной файл очень большой (~ 16 ГБ) и 20 миллионов записей. Поэтому, если я создаю фрейм данных, он потребляет большую часть доступной оперативной памяти. Это займет много времени, чтобы завершить. Поэтому я подумал об использовании опции chunksize (используя генератор python) и фиксировать записи в таблицу. Но это происходит с ошибкой 'AttributeError: 'generator' object has no attribute 'to_sql'.

Вдохновленный этим ответом здесь https://stackoverflow.com/a/47257676/2799214

входной файл: test_file.txt

XOXOXOXOXOXO9
AOAOAOAOAOAO8
BOBOBOBOBOBO7
COCOCOCOCOCO6
DODODODODODO5
EOEOEOEOEOEO4
FOFOFOFOFOFO3
GOGOGOGOGOGO2
HOHOHOHOHOHO1

sample.py

import pandas.io.sql as psql
import pandas as pd
from sqlalchemy import create_engine

def chunck_generator(filename, header=False,chunk_size = 10 ** 5):
    for chunk in pd.read_fwf(filename, colspecs=[[0,12],[12,13]],index_col=False,header=None, iterator=True, chunksize=chunk_size):
        yield (chunk)

def _generator( engine, filename, header=False,chunk_size = 10 ** 5):
    chunk = chunck_generator(filename, header=False,chunk_size = 10 ** 5)
    chunk.to_sql('sample_table', engine, if_exists='replace', schema='sample_schema', index=False)
    yield row

if __name__ == "__main__":
    filename = r'test_file.txt'
    engine = create_engine('postgresql://ABCD:ABCD@ip:port/database')
    c = engine.connect()
    conn = c.connection
    generator = _generator(engine=engine, filename=filename)
    while True:
       print(next(generator))
    conn.close()

Ошибка:

    chunk.to_sql('sample_table', engine, if_exists='replace', schema='sample_schema', index=False)
AttributeError: 'generator' object has no attribute 'to_sql'

Моя основная цель - повысить производительность. Пожалуйста, помогите мне решить проблему или предложите лучший подход. Заранее спасибо.

Ответы [ 3 ]

0 голосов
/ 01 мая 2018

'chunck_generator' вернет объект 'генератора', не являющийся фактическим элементом чанка. Вам нужно выполнить итерацию объекта, чтобы извлечь из него кусок.

>>> def my_generator(x):
...     for y in range(x):
...         yield y
...
>>> g = my_generator(10)
>>> print g.__class__
<type 'generator'>
>>> ele = next(g, None)
>>> print ele
0
>>> ele = next(g, None)
>>> print ele
1

Так что, чтобы исправить свой код, вам просто нужно зациклить генератор

for chunk in chunck_generator(filename, header=False,chunk_size = 10 ** 5):
    yeild chunk.to_sql()

Но это кажется судорожным. Я просто должен сделать это:

import pandas.io.sql as psql
import pandas as pd
from sqlalchemy import create_engine

def sql_generator(engine, filename, header=False,chunk_size = 10 ** 5):
    frame = pd.read_fwf(
        filename, 
        colspecs=[[0,12],[12,13]],
        index_col=False,
        header=None, 
        iterator=True, 
        chunksize=chunk_size
    ):

    for chunk in frame:
        yield chunk.to_sql(
            'sample_table', 
            engine, 
            if_exists='replace', 
            schema='sample_schema', 
            index=False
        )


if __name__ == "__main__":
    filename = r'test_file.txt'
    engine = create_engine('postgresql://USEE:PWD@IP:PORT/DB')
    for sql in sql_generator(engine, filename):
        print sql
0 голосов
/ 04 мая 2018

Вывод: Метод to_sql не эффективен для загрузки больших файлов. Поэтому я использовал метод copy_from в пакете psycopg2 и использовал опцию chunksize при создании dataframe. Загружено 9,8 миллиона записей (~ 17 ГБ) с 98 столбцами каждая за 30 минут.

Я удалил исходные ссылки на мой фактический файл (например, используя файл примера в исходном сообщении).

import pandas as pd
import psycopg2
import io

def sql_generator(cur,con, filename, boundries, col_names, header=False,chunk_size = 2000000):
    frame = pd.read_fwf(filename,colspecs=boundries,index_col=False,header=None,iterator=True,chunksize=chunk_size,names=col_names)
    for chunk in frame:
        output = io.StringIO()
        chunk.to_csv(output, sep='|', quoting=3, escapechar='\\' , index=False, header=False,encoding='utf-8')
        output.seek(0)
        cur.copy_from(output, 'sample_schema.sample_table', null="",sep="|")
        yield con.commit()

if __name__ == "__main__":
    boundries = [[0,12],[12,13]]
    col_names = ['col1','col2']
    filename = r'test_file.txt'  #Refer to sample file in the original post
    con = psycopg2.connect(database='database',user='username', password='pwd', host='ip', port='port')
    cur = con.cursor()
    for sql in sql_generator(cur,con, filename, boundries, col_names):
        print(sql)
    con.close()
0 голосов
/ 01 мая 2018

Я предложил вам что-то вроде:

def _generator( engine, filename, ...):
    for chunk in pd.read_fwf(filename, ...):
        yield chunk.to_sql('sample_table', engine, ...)  # not sure about this since row was not define

for row in _generator(engine=engine, filename=filename)
    print(row)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...