PostgreSQL, Python 3.5, SQLAlchemy
У меня есть основная база данных, в которой есть таблица журналов, содержащая более 30 тыс. Журналов.Для каждого журнала у меня есть URL канала RSS, с помощью которого я могу анализировать и получать новые статьи.Есть некоторые обязательные поля, если они отсутствуют в какой-либо из статей, то есть сторонний API, который я могу использовать для получения дополнительной информации о статье.Мне нужно написать скрипт на python, который будет работать непрерывно и циклически просматривать журналы, извлекаемые из основной БД несколько раз каждый день.
Атрибуты полученных статей необходимо проверить перед сохранением в БД FeedResponse.Все действительные статьи будут сохранены в таблице статей, а статьи с ошибками будут сохранены в таблице ошибок вместе с сообщением об ошибке. И в конечном итоге только основные статьи будут отправлены в основную БД.
Теперь мне нужно немногопомочь с разработкой решения для этого.На данный момент у меня есть несколько рабочих потоков, которые будут извлекать данные из основной БД и анализировать ленту rss для получения списка статей, затем проверять статьи и вызывать сторонний API, где это необходимо. После очистки и проверки этосохранит все статьи, извлеченные из этого журнала, в базе данных FeedResponse. ![Flow Chart](https://i.stack.imgur.com/P0pkd.png)
Мои проблемы:
- Не уверен, что многопоточность - лучший способ достиженияэто или многопроцессорность или еще что-нибудь.Мне нужна быстрая обработка, но я не хочу оказаться в тупиковой ситуации.
- Способ, которым я реализовал многопоточность, заключается в том, что он эффективен, или мне нужно реорганизовать его, чтобы разбить его.
- Должен ли яполучить все журналы за один вызов, а затем перебрать те, которые находятся в памяти, или каждый рабочий поток должен извлечь один из основной базы данных
- Как убедиться, что сценарий продолжает работать и что нет жульнических / заблокированных потоков
Для краткости я упомянул только 2 парсера, но есть и несколько других парсеров.
Моя реализация
У меня есть таблица процессовв FeedResponse DB, которая используется для определения того, какой журнал должен быть выбран рабочим потоком.Схема БД выглядит следующим образом:
class Process(Base):
__tablename__ = 'process'
id = Column(Integer, primary_key=True, autoincrement=True)
worker_id = Column(Integer)
journal_id = Column(Integer)
time_started = Column(DateTime(timezone=True), nullable=True)
time_finished = Column(DateTime(timezone=True), nullable=True)
is_finished = Column(Boolean)
manage_worker.py
def start_workers():
count = int(sys.argv[1])
if count > 1:
for i in range(0, count):
worker = Worker(i)
worker_pool.append(worker)
else:
w = Worker(0, True)
worker_pool.append(w)
for w in worker_pool:
w.start()
worker.py
class Worker(Thread):
def __init__(self, worker_id, run_single=False):
self.worker_id = worker_id
self.current_job_journal_id = None
self.current_job_row_id = None
self.current_highest_id = 0
self.is_working = False
self.run_single = run_single
threading.Thread.__init__(self)
def run(self):
self.connect_db() #Start a session with both the DBs: main DB and FeedResponseDB
self.start_work()
def start_work(self):
while True:
if not self.is_working:
self.is_working = True
# the max journal id from main DB is assigned to max journal id
max_journal_id = self.get_max_journal_id()
# tuple of new inserted row id from process table in FeedResponseDB
new_process_row_id, new_journal_id = self.create_new_job(max_journal_id[0])
# the current job id is assigned as the new journal id
self.current_job_journal_id = new_journal_id
self.current_job_row_id = new_process_row_id
# journal data is the row from main DB returned when queried by the new journal id (which is the max journal id from main DB)
journal_data = self.get_journal_data(new_journal_id)
self.parse(journal_data, self.handle_is_finished)
def parse(self, journal_data, handle_is_finished):
time.sleep(2)
articles = RssParser(journal_data.rss_url, journal_data.id)
for a in articles:
If data_incomplete: #If article data is incomplete
Article = ApiParser(a)
# update the entry in the list of articles
validate_articles(articles)
handle_is_finished()
def create_new_job(self, max_journal_id, last_call_id=None):
if last_call_id is None:
# get the latest parse date from FeedResponse DB
# get the latest id from the latest parse date from FeedResponse DB
max_process = db.session.query(Process).filter(Process.time_started == max_date).first()
max_process_journal_id = max_process.journal_id
else:
max_process_journal_id = 0
else:
# if last_call_id has a value, make it the latest process journal id from the table
max_process_journal_id = last_call_id
# add 1 to the latest process journal id
max_process_journal_id += 1
# check if our max journal id is higher than the highest in main db, and if so, reset to 1
if max_process_journal_id > max_journal_id:
max_process_journal_id = 1
#insert process in FeedResponse DB
new_max_process_id = db.session.execute(
'''
INSERT INTO process (worker_id, journal_id, time_started, time_finished, is_finished)
SELECT
{} as worker_id,
CASE
WHEN {} <= {} THEN {}
ELSE 1
END as journal_id,
clock_timestamp() as time_started,
null as time_finished,
'FALSE' as is_finished
RETURNING id
'''
.format(self.worker_id,
max_process_journal_id,
max_journal_id,
max_process_journal_id).first().id
return (new_max_process_id, max_process_journal_id)
def handle_is_finished(self):
# print("FINISHING PARSE PROCESS FOR ROW ID: ------->", self.current_job_row_id)
current_process = db..session.query(Process).filter(Process.id == self.current_job_row_id).first()
current_process.time_finished = datetime.datetime.now()
current_process.is_finished = True
self.current_job_journal_id = None
self.current_job_row_id = None
self.disconnect_db()
self.is_working = False