Я создаю приложение, которое непрерывно извлекает данные о погоде из API для нескольких городов и считывает / записывает эти данные в базу данных SQL (MySQL), используя несколько процессов и потоков в Python. Я относительно новичок в этом.
Скажем, есть два процесса. Каждый процесс порождает N потоков, по одному потоку на город. Каждый поток выполняет один и тот же лог c для своего города.
Поток 1 в процессе 1 каждые 60 секунд получает данные из API для Нью-Йорка и записывает их в SQL таблицу X. Поток 2 в процессе 2 захватывает данные для Лос-Анджелеса каждые 60 секунд и записывает их в SQL Таблица X.
Таблица X имеет следующие атрибуты: Record Index (Int), City (Varchar), Processed (Boolean), Some Data (Int)
. Когда поток в процессе 1 записывает данные в таблицу X, атрибут Processed
по умолчанию помечается как False
.
Переход к процессу 2 ... Поток 1 в процессе 2 захватывает необработанные данные (все записи где Processed=False
) из таблицы X для Нью-Йорка каждые 60 секунд копирует их в таблицу Y и помечает эти данные как обработанные в таблице X. Поток 2 в процессе 2 получает необработанные данные из таблицы X для Лос-Анджелеса каждые 60 секунд, копирует в таблицу Y и помечает эти данные как обработанные в таблице X.
Я использую SQLAlchemy для чтения и записи в базу данных. Я инициализирую новый движок в каждом процессе с помощью:
engine = create_engine(f'mysql+pymysql://{user}:{password}@{host}/{database}')
Я выполняю SQL операторов с помощью:
with engine.begin() as connection:
connection.execute(sql_statement)
У меня проблема в том, что мне нужно заблокировать процесс 1 от добавления новых данных в таблицу X для Нью-Йорка (например) до тех пор, пока процесс 2 не завершит 1.) копирование необработанных данных для Нью-Йорка в таблицу Y и 2.) помечает скопированные данные как обработанные = True
в таблице X. В противном случае, возможно, что новые данные могут быть добавлены в таблицу X в процессе 1 и помечены как обработанные в процессе 2 без фактического копирования в таблицу Y.
Я считаю, что я мог бы использовать блокировку (многопроцессорная обработка) .Lock ()) для решения этой проблемы, но в дополнение к блокировке потоков, обрабатывающих Нью-Йорк во всех процессах, он также будет блокировать потоки, обрабатывающие Лос-Анджелес (и любой другой город в этом отношении) во всех процессах.
Я мог бы переключить архитектуру так, чтобы каждый процесс отвечал за обработку своего города, и каждый поток запускал разные логи c (сбор данных, обработка данных). По сути, это будет противоположностью тому, что есть сейчас. Однако большая часть обработки, которую мне нужно выполнить, связана с процессором, поэтому я считаю, что это может быть менее эффективно.
Является ли использование многопроцессорной блокировки лучшим способом решения этой проблемы? Какие есть другие альтернативы?