Чтение и запись в базу данных SQL из нескольких процессов и потоков - PullRequest
0 голосов
/ 25 апреля 2020

Я создаю приложение, которое непрерывно извлекает данные о погоде из API для нескольких городов и считывает / записывает эти данные в базу данных SQL (MySQL), используя несколько процессов и потоков в Python. Я относительно новичок в этом.

Скажем, есть два процесса. Каждый процесс порождает N потоков, по одному потоку на город. Каждый поток выполняет один и тот же лог c для своего города.

Поток 1 в процессе 1 каждые 60 секунд получает данные из API для Нью-Йорка и записывает их в SQL таблицу X. Поток 2 в процессе 2 захватывает данные для Лос-Анджелеса каждые 60 секунд и записывает их в SQL Таблица X.

Таблица X имеет следующие атрибуты: Record Index (Int), City (Varchar), Processed (Boolean), Some Data (Int). Когда поток в процессе 1 записывает данные в таблицу X, атрибут Processed по умолчанию помечается как False.

Переход к процессу 2 ... Поток 1 в процессе 2 захватывает необработанные данные (все записи где Processed=False) из таблицы X для Нью-Йорка каждые 60 секунд копирует их в таблицу Y и помечает эти данные как обработанные в таблице X. Поток 2 в процессе 2 получает необработанные данные из таблицы X для Лос-Анджелеса каждые 60 секунд, копирует в таблицу Y и помечает эти данные как обработанные в таблице X.

Я использую SQLAlchemy для чтения и записи в базу данных. Я инициализирую новый движок в каждом процессе с помощью:

engine = create_engine(f'mysql+pymysql://{user}:{password}@{host}/{database}')

Я выполняю SQL операторов с помощью:

with engine.begin() as connection:
     connection.execute(sql_statement)

У меня проблема в том, что мне нужно заблокировать процесс 1 от добавления новых данных в таблицу X для Нью-Йорка (например) до тех пор, пока процесс 2 не завершит 1.) копирование необработанных данных для Нью-Йорка в таблицу Y и 2.) помечает скопированные данные как обработанные = True в таблице X. В противном случае, возможно, что новые данные могут быть добавлены в таблицу X в процессе 1 и помечены как обработанные в процессе 2 без фактического копирования в таблицу Y.

Я считаю, что я мог бы использовать блокировку (многопроцессорная обработка) .Lock ()) для решения этой проблемы, но в дополнение к блокировке потоков, обрабатывающих Нью-Йорк во всех процессах, он также будет блокировать потоки, обрабатывающие Лос-Анджелес (и любой другой город в этом отношении) во всех процессах.

Я мог бы переключить архитектуру так, чтобы каждый процесс отвечал за обработку своего города, и каждый поток запускал разные логи c (сбор данных, обработка данных). По сути, это будет противоположностью тому, что есть сейчас. Однако большая часть обработки, которую мне нужно выполнить, связана с процессором, поэтому я считаю, что это может быть менее эффективно.

Является ли использование многопроцессорной блокировки лучшим способом решения этой проблемы? Какие есть другие альтернативы?

1 Ответ

1 голос
/ 25 апреля 2020

Несколько вещей, о которых я могу думать очень быстро, просто чтобы дать вам несколько идей

  1. Блокировка файла (в известном месте с именем, которое может быть получено из контекста, которым вы управляете). Существуют библиотеки, которые предлагают вам примитивы блокировки файлов (filelock)
  2. Реализация блокирующего примитива в БД на желаемом уровне детализации. Примитив может быть целым числом. Оба процесса должны синхронизироваться по этому. Возможно, что оба читают примитив одновременно, увеличивают его и пытаются записать обратно, но только один из них преуспеет в написании - вы должны справиться с этим

У меня был вопрос: Вы Отметим, что вы выполняете задачи, связанные с процессором. Насколько эффективна модель потоков в этом случае внутри каждого процесса?

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...