Импорт данных в MySQL параллельно с пандами и GNU Parallel - PullRequest
0 голосов
/ 13 января 2019

У меня есть несколько тысяч каталогов, из которых я хочу импортировать данные в MySQL. Я создал скрипт Python, который читает данные из одного каталога и помещает их в базу данных. Вот часть, где данные были отправлены в базу данных:

host = 'localhost'
engine = create_engine('mysql://user:pass@%s/db?charset=utf8' % host)
conn = engine.connect()
trans = conn.begin()
try:
    conn.execute('delete from tests where ml="%s"' % ml)
    tests.to_sql(con=conn, name='tests', if_exists='append', index=False)
    data.to_sql(con=conn, name='data', if_exists='append', index=False)
    trans.commit()
    print(CGRE + ml + ': OK' + CEND)
except:
    trans.rollback()
    print(CRED + ml + ': database error!' + CEND)
    raise
conn.close()

Однопотоковое выполнение работает хорошо, но слишком медленно:

parallel -j 1 "[[ -d {} ]] && (cd {} && data_to_db.py) || echo {} >> ~/Data/failed_db" ::: *

Теперь я хочу запустить несколько процессов:

parallel -j 8 .........

Иногда во время выполнения я получаю эту ошибку:

sqlalchemy.exc.InternalError: (pymysql.err.InternalError) (1213, «Обнаружен тупик при попытке получить блокировку; попробуйте перезапустить транзакцию»)

Есть ли способ увеличить время ожидания транзакции или решить ее другим способом, потому что без параллельного выполнения потребуется слишком много времени для импорта всех данных?

1 Ответ

0 голосов
/ 14 января 2019

МНОГИЕ благодаря @RomanPerekhrest, вот рабочее решение из руководства MySQL с использованием LOCK/UNLOCK TABLES.

engine = create_engine('mysql://user:pass@%s/db?charset=utf8' % host)
conn = engine.connect()
trans = conn.begin()
try:
    conn.execute('set autocommit=0')
    conn.execute('lock tables tests write, data write')
    conn.execute('delete from tests where ml="%s"' % ml)
    tests.to_sql(con=conn, name='tests', if_exists='append', index=False)
    data.to_sql(con=conn, name='data', if_exists='append', index=False)
    trans.commit()
    conn.execute('unlock tables')
    print(CGRE + ml + ': OK' + CEND)
except:
    trans.rollback()
    conn.execute('unlock tables')
    conn.close()
    print(CRED + ml + ': database error!' + CEND)
    raise
conn.close()
...