Python sqlite3 и параллелизм - PullRequest
       82

Python sqlite3 и параллелизм

75 голосов
/ 26 декабря 2008

У меня есть программа на Python, которая использует модуль "Threading". Раз в секунду моя программа запускает новый поток, который извлекает некоторые данные из Интернета и сохраняет эти данные на моем жестком диске. Я хотел бы использовать sqlite3 для хранения этих результатов, но я не могу заставить его работать. Кажется, проблема заключается в следующей строке:

conn = sqlite3.connect("mydatabase.db")
  • Если я помещаю эту строку кода в каждый поток, я получаю сообщение OperationalError, сообщающее, что файл базы данных заблокирован. Я предполагаю, что это означает, что другой поток открыл mydatabase.db через соединение sqlite3 и заблокировал его.
  • Если я помещу эту строку кода в основную программу и передам объект соединения (conn) каждому потоку, я получаю сообщение ProgrammingError, в котором говорится, что объекты SQLite, созданные в потоке, могут использоваться только в этом же потоке.

Ранее я сохранял все свои результаты в файлах CSV, и у меня не было никаких проблем с блокировкой файлов. Надеюсь, это будет возможно с sqlite. Есть идеи?

Ответы [ 14 ]

166 голосов
/ 24 мая 2010

Вопреки распространенному мнению, новые версии sqlite3 do поддерживают доступ из нескольких потоков.

Это можно включить с помощью необязательного аргумента ключевого слова check_same_thread:

sqlite.connect(":memory:", check_same_thread=False)
39 голосов
/ 26 декабря 2008

Вы можете использовать шаблон потребитель-производитель. Например, вы можете создать очередь, которая разделяется между потоками. Первый поток, который извлекает данные из Интернета, ставит эти данные в очередь общего доступа. Другой поток, которому принадлежит соединение с базой данных, извлекает данные из очереди и передает их в базу данных.

16 голосов
/ 05 апреля 2010

На найдены следующие данные: mail.python.org.pipermail.1239789

Я нашел решение. Я не знаю, почему в документации Python нет ни слова об этой опции. Поэтому мы должны добавить новый аргумент ключевого слова в функцию соединения и мы сможем создавать курсоры из него в другой ветке. Так что используйте:

sqlite.connect(":memory:", check_same_thread = False)

отлично работает для меня. Конечно, отныне мне нужно заботиться безопасного многопоточного доступа к БД. В любом случае, спасибо за попытку помочь.

13 голосов
/ 27 декабря 2008

Вы не должны использовать нити вообще для этого. Это * тривиальная задача для витой , и в любом случае это, вероятно, приведет вас значительно дальше.

Используйте только один поток, и выполнение запроса инициирует событие для выполнения записи.

Twisted позаботится о планировании, обратном вызове и т. Д ... для вас. Он выдаст вам весь результат в виде строки, или вы можете запустить его через потоковый процессор (у меня есть twitter API и Friendfeed API , которые оба запускают события для вызывающие абоненты, поскольку результаты все еще загружаются).

В зависимости от того, что вы делаете со своими данными, вы можете просто выгрузить полный результат в sqlite, когда он завершен, приготовить его и выгрузить его, или приготовить, пока он читается, и выгрузить его в конце.

У меня есть очень простое приложение, которое делает что-то похожее на то, что вы хотите на github. Я называю это pfetch (параллельная выборка). Он захватывает различные страницы по расписанию, передает результаты в файл и, при желании, запускает сценарий после успешного завершения каждой из них. Он также делает некоторые интересные вещи, такие как условные GET, но все же может быть хорошей основой для всего, что вы делаете.

12 голосов
/ 26 декабря 2008

Переключиться на многопроцессорность . Это намного лучше, хорошо масштабируется, может выходить за рамки использования нескольких ядер при использовании нескольких процессоров, и интерфейс такой же, как при использовании модуля потоков Python.

Или, как предложил Али, просто используйте механизм пула потоков SQLAlchemy . Он будет обрабатывать все для вас автоматически и имеет множество дополнительных функций, просто процитирую некоторые из них:

  1. SQLAlchemy включает в себя диалекты для SQLite, Postgres, MySQL, Oracle, MS-SQL, Firebird, MaxDB, MS Access, Sybase и Informix; IBM также выпустила драйвер DB2. Поэтому вам не нужно переписывать ваше приложение, если вы решите отойти от SQLite.
  2. Система Unit Of Work, центральная часть Object Relational Mapper (ORM) SQLAlchemy, организует ожидающие операции создания / вставки / обновления / удаления в очереди и сбрасывает их все в один пакет. Для этого он выполняет топологическую «сортировку зависимостей» всех измененных элементов в очереди, чтобы учесть ограничения внешнего ключа, и группирует избыточные операторы вместе, где их иногда можно объединить еще дальше. Это обеспечивает максимальную эффективность и безопасность транзакций, а также сводит к минимуму вероятность возникновения тупиковых ситуаций.
7 голосов
/ 26 декабря 2008

Или, если вы ленивы, как я, вы можете использовать SQLAlchemy . Он будет обрабатывать потоки для вас (, используя локальный поток, и некоторые пулы соединений ), и способ, которым он это делает, даже настраивается .

Для дополнительного бонуса, если / когда вы поймете / решите, что использование Sqlite для любого параллельного приложения будет катастрофой, вам не придется менять свой код для использования MySQL, Postgres или чего-либо еще. Вы можете просто переключиться.

2 голосов
/ 18 июля 2018

Вам необходимо использовать session.close() после каждой транзакции для базы данных, чтобы использовать тот же курсор в том же потоке, не используя тот же курсор в многопоточности, что вызывает эту ошибку.

0 голосов
/ 01 мая 2018

Я не смог найти ни одного критерия ни в одном из приведенных выше ответов, поэтому я написал тест для сравнения всего.

Я пробовал 3 подхода

  1. Чтение и запись последовательно из базы данных SQLite
  2. Использование ThreadPoolExecutor для чтения / записи
  3. Использование ProcessPoolExecutor для чтения / записи

Результаты и выводы из теста следующие:

  1. Последовательное чтение / последовательная запись работают лучше
  2. Если вы должны обрабатывать параллельно, используйте ProcessPoolExecutor для параллельного чтения
  3. Не выполняйте никаких записей ни с помощью ThreadPoolExecutor, ни с помощью ProcessPoolExecutor, так как вы столкнетесь с ошибками блокировки базы данных и вам придется повторить попытку вставки фрагмента снова

Вы можете найти код и полное решение для тестов в моем SO-ответе ЗДЕСЬ Надеюсь, это поможет!

0 голосов
/ 08 ноября 2013

Наиболее вероятная причина, по которой вы получаете ошибки с заблокированными базами данных, заключается в том, что вы должны выдать

conn.commit()

после завершения операции с базой данных. Если вы этого не сделаете, ваша база данных будет заблокирована от записи и останется такой. Другие потоки, которые ожидают записи, истечут через некоторое время (по умолчанию установлено значение 5 секунд, подробности см. http://docs.python.org/2/library/sqlite3.html#sqlite3.connect).

Пример правильной и одновременной вставки может быть следующим:

import threading, sqlite3
class InsertionThread(threading.Thread):

    def __init__(self, number):
        super(InsertionThread, self).__init__()
        self.number = number

    def run(self):
        conn = sqlite3.connect('yourdb.db', timeout=5)
        conn.execute('CREATE TABLE IF NOT EXISTS threadcount (threadnum, count);')
        conn.commit()

        for i in range(1000):
            conn.execute("INSERT INTO threadcount VALUES (?, ?);", (self.number, i))
            conn.commit()

# create as many of these as you wish
# but be careful to set the timeout value appropriately: thread switching in
# python takes some time
for i in range(2):
    t = InsertionThread(i)
    t.start()

Если вам нравится SQLite, или у вас есть другие инструменты, которые работают с базами данных SQLite, или вы хотите заменить файлы CSV на файлы базы данных SQLite, или вам нужно что-то редкое, например, межплатформенный IPC, то SQLite - отличный инструмент, который очень подходит для цель. Не позволяйте себе использовать другое решение, если оно не подходит!

0 голосов
/ 19 августа 2010

Использовать threading.Lock ()

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...