Python, SQLite и многопоточность - PullRequest
7 голосов
/ 08 февраля 2009

Я работаю над приложением, которое будет собирать данные через HTTP из нескольких мест, кэшировать данные локально и затем обрабатывать их через HTTP.

Итак, я смотрел на следующее. Мое приложение сначала создаст несколько потоков, которые будут собирать данные с заданным интервалом и кэшировать эти данные локально в базу данных SQLite.

Затем в главном потоке запустите приложение CherryPy, которое будет запрашивать эту базу данных SQLite и обрабатывать данные.

Моя проблема: как мне обрабатывать соединения с базой данных SQLite из моих потоков и из приложения CherryPy?

Если я сделаю соединение для потока с базой данных, смогу ли я также создать / использовать базу данных в памяти?

Ответы [ 6 ]

7 голосов
/ 08 февраля 2009

Краткий ответ: не используйте Sqlite3 в потоковом приложении.

Базы данных Sqlite3 хорошо масштабируются по размеру, но довольно ужасно для параллелизма. Вы будете страдать от ошибок «База данных заблокирована».

Если вы это сделаете, вам понадобится соединение для каждого потока, и вы должны убедиться, что эти соединения очищаются после себя. Это традиционно обрабатывается с помощью локальных сеансов потоков и выполняется довольно хорошо (например) с использованием ScopedSession SQLAlchemy. Я бы использовал это на вашем месте, даже если вы не используете функции SQLAlchemy ORM.

3 голосов
/ 03 мая 2009

Вы можете использовать что-то вроде , что .

1 голос
/ 08 февраля 2009

"... создать несколько потоков, которые будут собирать данные с заданным интервалом и кэшировать эти данные локально в базу данных sqlite. Затем в главном потоке запустите приложение CherryPy, которое будет запрашивать эту базу данных sqlite и обрабатывать данные. "

Не тратьте много времени на темы. То, что вы описываете - это просто процессы ОС. Просто запустите обычные процессы для сбора и запустите Cherry Py.

Для этого у вас нет реального использования для параллельных потоков в одном процессе. Сбор данных с заданным интервалом - когда это делается с помощью простых процессов ОС - может быть запланирован ОС очень просто. Крон, например, отлично справляется с этим.

Приложение CherryPy также представляет собой процесс ОС, а не отдельный поток какого-то более крупного процесса.

Просто используйте процессы - потоки вам не помогут.

0 голосов
/ 01 мая 2018

Этот тест проводится для определения наилучшего способа записи и чтения из базы данных SQLite. Мы следуем 3 подходам ниже

  1. Чтение и запись без каких-либо потоков (методы с нормальным словом)
  2. Читать и писать с темами
  3. Чтение и запись с процессами

Наш примерный набор данных является фиктивным сгенерированным набором данных OHLC с символом, временной меткой и 6 поддельными значениями для ohlc и volumefrom, volumeto

Считывает

  1. Обычный метод занимает около 0,25 секунды, чтобы прочитать
  2. Потоковый метод занимает 10 секунд
  3. Обработка занимает 0,25 секунды, чтобы прочитать

Победитель: Обработка и Обычная

Пишет

  1. Обычный метод записи занимает около 1,5 секунд
  2. Потоковый метод занимает около 30 секунд
  3. Обработка занимает около 30 секунд

Победитель: Обычный

Примечание: Все записи не записываются с использованием потоковых и обработанных методов записи. Потоковые и обработанные методы записи, очевидно, сталкиваются с ошибками заблокированных баз данных, поскольку записи ставятся в очередь SQlite только ставит в очередь записи на определенный порог и затем выдает sqlite3.OperationalError, указывающий, что база данных заблокирована. Идеальный способ - повторить попытку вставки того же фрагмента, но это не имеет смысла, поскольку выполнение метода для параллельной вставки занимает больше времени, чем последовательное чтение, даже без повторной попытки. заблокированные / неудачные вставки Без повторных попыток 97% строк были записаны и все равно заняли в 10 раз больше времени, чем последовательная запись

Стратегии на вынос:

  1. Предпочитают читать SQLite и записывать его в одном потоке

  2. Если вы должны выполнять многопоточность, используйте многопроцессорную обработку для чтения, которая имеет более или менее одинаковую производительность, и переходите к однопоточным операциям записи

  3. НЕ ИСПОЛЬЗУЙТЕ РЕЗЬБЫ для операций чтения и записи, так как они в 10 раз медленнее, вы можете поблагодарить GIL за это

Вот код для полного теста

import sqlite3
import time
import random
import string
import os
import timeit
from functools import wraps
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import threading
import os

database_file = os.path.realpath('../files/ohlc.db')

create_statement = 'CREATE TABLE IF NOT EXISTS database_threading_test (symbol TEXT, ts INTEGER, o REAL, h REAL, l REAL, c REAL, vf REAL, vt REAL, PRIMARY KEY(symbol, ts))'
insert_statement = 'INSERT INTO database_threading_test VALUES(?,?,?,?,?,?,?,?)'
select = 'SELECT * from database_threading_test'

def time_stuff(some_function):
    def wrapper(*args, **kwargs):
        t0 = timeit.default_timer()
        value = some_function(*args, **kwargs)
        print(timeit.default_timer() - t0, 'seconds')
        return value
    return wrapper

def generate_values(count=100):
    end = int(time.time()) - int(time.time()) % 900
    symbol = ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(10))
    ts = list(range(end - count * 900, end, 900))
    for i in range(count):
        yield (symbol, ts[i], random.random() * 1000, random.random() * 1000, random.random() * 1000, random.random() * 1000, random.random() * 1e9, random.random() * 1e5)

def generate_values_list(symbols=1000,count=100):
    values = []
    for _ in range(symbols):
        values.extend(generate_values(count))
    return values

@time_stuff
def sqlite_normal_read():
    """

    100k records in the database, 1000 symbols, 100 rows
    First run
    0.25139795300037804 seconds
    Second run

    Third run
    """
    conn = sqlite3.connect(os.path.realpath('../files/ohlc.db'))
    try:
        with conn:
            conn.execute(create_statement)
            results = conn.execute(select).fetchall()
            print(len(results))
    except sqlite3.OperationalError as e:
        print(e)

@time_stuff
def sqlite_normal_write():
    """
    1000 symbols, 100 rows
    First run
    2.279409104000024 seconds
    Second run
    2.3364172020001206 seconds
    Third run
    """
    l = generate_values_list()
    conn = sqlite3.connect(os.path.realpath('../files/ohlc.db'))
    try:
        with conn:
            conn.execute(create_statement)
            conn.executemany(insert_statement, l)

    except sqlite3.OperationalError as e:
        print(e)

@time_stuff
def sequential_batch_read():
    """
    We read all the rows for each symbol one after the other in sequence
    First run
    3.661222331999852 seconds
    Second run
    2.2836898810001003 seconds
    Third run
    0.24514851899994028 seconds
    Fourth run
    0.24082150699996419 seconds
    """
    conn = sqlite3.connect(os.path.realpath('../files/ohlc.db'))
    try:
        with conn:
            conn.execute(create_statement)
            symbols = conn.execute("SELECT DISTINCT symbol FROM database_threading_test").fetchall()
            for symbol in symbols:
                results = conn.execute("SELECT * FROM database_threading_test WHERE symbol=?", symbol).fetchall()
    except sqlite3.OperationalError as e:
        print(e)  



def sqlite_threaded_read_task(symbol):
    results = []
    conn = sqlite3.connect(os.path.realpath('../files/ohlc.db'))
    try:
        with conn:
            results = conn.execute("SELECT * FROM database_threading_test WHERE symbol=?", symbol).fetchall()
    except sqlite3.OperationalError as e:
        print(e)
    finally:
        return results

def sqlite_multiprocessed_read_task(symbol):
    results = []
    conn = sqlite3.connect(os.path.realpath('../files/ohlc.db'))
    try:
        with conn:
            results = conn.execute("SELECT * FROM database_threading_test WHERE symbol=?", symbol).fetchall()
    except sqlite3.OperationalError as e:
        print(e)
    finally:
        return results

@time_stuff
def sqlite_threaded_read():
    """
    1000 symbols, 100 rows per symbol
    First run
    9.429676861000189 seconds
    Second run
    10.18928106400017 seconds
    Third run
    10.382290903000467 seconds
    """
    conn = sqlite3.connect(os.path.realpath('../files/ohlc.db'))
    symbols = conn.execute("SELECT DISTINCT SYMBOL from database_threading_test").fetchall()
    with ThreadPoolExecutor(max_workers=8) as e:
        results = e.map(sqlite_threaded_read_task, symbols, chunksize=50)
        for result in results:
            pass

@time_stuff
def sqlite_multiprocessed_read():
    """
    1000 symbols, 100 rows
    First run
    0.2484774920012569 seconds!!!
    Second run
    0.24322178500005975 seconds
    Third run
    0.2863524549993599 seconds
    """
    conn = sqlite3.connect(os.path.realpath('../files/ohlc.db'))
    symbols = conn.execute("SELECT DISTINCT SYMBOL from database_threading_test").fetchall()
    with ProcessPoolExecutor(max_workers=8) as e:
        results = e.map(sqlite_multiprocessed_read_task, symbols, chunksize=50)
        for result in results:
            pass

def sqlite_threaded_write_task(n):
    """
    We ignore the database locked errors here. Ideal case would be to retry but there is no point writing code for that if it takes longer than a sequential write even without database locke errors
    """
    conn = sqlite3.connect(os.path.realpath('../files/ohlc.db'))
    data = list(generate_values())
    try:
        with conn:
            conn.executemany("INSERT INTO database_threading_test VALUES(?,?,?,?,?,?,?,?)",data)
    except sqlite3.OperationalError as e:
        print("Database locked",e)
    finally:
        conn.close()
        return len(data)

def sqlite_multiprocessed_write_task(n):
    """
    We ignore the database locked errors here. Ideal case would be to retry but there is no point writing code for that if it takes longer than a sequential write even without database locke errors
    """
    conn = sqlite3.connect(os.path.realpath('../files/ohlc.db'))
    data = list(generate_values())
    try:
        with conn:
            conn.executemany("INSERT INTO database_threading_test VALUES(?,?,?,?,?,?,?,?)",data)
    except sqlite3.OperationalError as e:
        print("Database locked",e)
    finally:
        conn.close()
        return len(data)

@time_stuff
def sqlite_threaded_write():
    """

    Did not write all the results but the outcome with 97400 rows written is still this...
    Takes 20x the amount of time as a normal write
    1000 symbols, 100 rows
    First run
    28.17819765000013 seconds
    Second run
    25.557972323000058 seconds
    Third run
    """
    symbols = [i for i in range(1000)]
    with ThreadPoolExecutor(max_workers=8) as e:
        results = e.map(sqlite_threaded_write_task, symbols, chunksize=50)
        for result in results:
            pass

@time_stuff
def sqlite_multiprocessed_write():
    """
    1000 symbols, 100 rows
    First run
    30.09209805699993 seconds
    Second run
    27.502465319000066 seconds
    Third run
    """
    symbols = [i for i in range(1000)]
    with ProcessPoolExecutor(max_workers=8) as e:
        results = e.map(sqlite_multiprocessed_write_task, symbols, chunksize=50)
        for result in results:
            pass


sqlite_normal_write()
0 голосов
/ 08 февраля 2009

В зависимости от скорости передачи данных sqlite может быть абсолютно правильным способом сделать это. Вся база данных блокируется для каждой записи, поэтому вы не собираетесь масштабировать до 1000 одновременных записей в секунду. Но если у вас их всего несколько, это самый безопасный способ убедиться, что вы не перезаписываете друг друга.

0 голосов
/ 08 февраля 2009

В зависимости от приложения, БД может быть очень трудоемкой. Если мы говорим об изменчивых данных, возможно, вы могли бы полностью пропустить связь через БД и обмениваться данными между процессом сбора данных и процессом (ами) обработки данных через IPC. Это не вариант, если данные должны быть сохранены, конечно.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...