Архитектура для инструмента разбора - PullRequest
0 голосов
/ 10 октября 2018

PostgreSQL, Python 3.5, SQLAlchemy

У меня есть основная база данных, в которой есть таблица журналов, содержащая более 30 тыс. Журналов.Для каждого журнала у меня есть URL канала RSS, с помощью которого я могу анализировать и получать новые статьи.Есть некоторые обязательные поля, если они отсутствуют в какой-либо из статей, то есть сторонний API, который я могу использовать для получения дополнительной информации о статье.Мне нужно написать скрипт на python, который будет работать непрерывно и циклически просматривать журналы, извлекаемые из основной БД несколько раз каждый день.

Атрибуты полученных статей необходимо проверить перед сохранением в БД FeedResponse.Все действительные статьи будут сохранены в таблице статей, а статьи с ошибками будут сохранены в таблице ошибок вместе с сообщением об ошибке. И в конечном итоге только основные статьи будут отправлены в основную БД.

Теперь мне нужно немногопомочь с разработкой решения для этого.На данный момент у меня есть несколько рабочих потоков, которые будут извлекать данные из основной БД и анализировать ленту rss для получения списка статей, затем проверять статьи и вызывать сторонний API, где это необходимо. После очистки и проверки этосохранит все статьи, извлеченные из этого журнала, в базе данных FeedResponse. Flow Chart

Мои проблемы:

  • Не уверен, что многопоточность - лучший способ достиженияэто или многопроцессорность или еще что-нибудь.Мне нужна быстрая обработка, но я не хочу оказаться в тупиковой ситуации.
  • Способ, которым я реализовал многопоточность, заключается в том, что он эффективен, или мне нужно реорганизовать его, чтобы разбить его.
  • Должен ли яполучить все журналы за один вызов, а затем перебрать те, которые находятся в памяти, или каждый рабочий поток должен извлечь один из основной базы данных
  • Как убедиться, что сценарий продолжает работать и что нет жульнических / заблокированных потоков

Для краткости я упомянул только 2 парсера, но есть и несколько других парсеров.

Моя реализация

У меня есть таблица процессовв FeedResponse DB, которая используется для определения того, какой журнал должен быть выбран рабочим потоком.Схема БД выглядит следующим образом:

class Process(Base):
   __tablename__ = 'process'
   id = Column(Integer, primary_key=True, autoincrement=True)
   worker_id = Column(Integer)
   journal_id  = Column(Integer)
   time_started = Column(DateTime(timezone=True), nullable=True)
   time_finished = Column(DateTime(timezone=True), nullable=True)
   is_finished = Column(Boolean)

manage_worker.py

def start_workers():
   count = int(sys.argv[1])
   if count > 1:
       for i in range(0, count):
           worker = Worker(i)
           worker_pool.append(worker)
   else:
       w = Worker(0, True)
       worker_pool.append(w)
   for w in worker_pool:
       w.start()

worker.py

class Worker(Thread):
   def __init__(self, worker_id, run_single=False):
       self.worker_id = worker_id
       self.current_job_journal_id = None
       self.current_job_row_id = None
       self.current_highest_id = 0
       self.is_working = False
       self.run_single = run_single
       threading.Thread.__init__(self)

   def run(self):
       self.connect_db() #Start a session with both the DBs: main DB and FeedResponseDB
       self.start_work()

   def start_work(self):
       while True:
           if not self.is_working:
               self.is_working = True
               # the max journal id from main DB is assigned to max journal id
               max_journal_id = self.get_max_journal_id()

               # tuple of new inserted row id from process table in FeedResponseDB
               new_process_row_id, new_journal_id = self.create_new_job(max_journal_id[0])
               # the current job id is assigned as the new journal id
               self.current_job_journal_id = new_journal_id
               self.current_job_row_id = new_process_row_id

               # journal data is the row from main DB returned when queried by the new journal id (which is the max journal id from main DB)
               journal_data = self.get_journal_data(new_journal_id)
               self.parse(journal_data, self.handle_is_finished)

   def parse(self, journal_data, handle_is_finished):
       time.sleep(2)       
       articles = RssParser(journal_data.rss_url, journal_data.id)
       for a in articles:
            If data_incomplete: #If article data is incomplete
                  Article = ApiParser(a)
                  # update the entry in the list of articles
       validate_articles(articles)
       handle_is_finished() 

   def create_new_job(self, max_journal_id, last_call_id=None):
       if last_call_id is None:
           # get the latest parse date from FeedResponse DB
           # get the latest id from the latest parse date from FeedResponse DB
               max_process = db.session.query(Process).filter(Process.time_started == max_date).first()
               max_process_journal_id = max_process.journal_id
           else:
               max_process_journal_id = 0
       else:
           # if last_call_id has a value, make it the latest process journal id from the  table
           max_process_journal_id = last_call_id

       # add 1 to the latest process journal id
       max_process_journal_id += 1

       # check if our max journal id is higher than the highest in main db, and if so, reset to 1
       if max_process_journal_id > max_journal_id:
           max_process_journal_id = 1

       #insert process in FeedResponse DB
       new_max_process_id = db.session.execute(
           '''
           INSERT INTO process (worker_id, journal_id, time_started, time_finished, is_finished)
           SELECT
           {} as worker_id,
           CASE
               WHEN {} <= {} THEN {}
               ELSE 1
           END as journal_id,
           clock_timestamp() as time_started,
           null as time_finished,
           'FALSE' as is_finished
           RETURNING id
           '''
           .format(self.worker_id,
                   max_process_journal_id,
                   max_journal_id,
                   max_process_journal_id).first().id
           return (new_max_process_id, max_process_journal_id)

   def handle_is_finished(self):

       # print("FINISHING PARSE PROCESS FOR ROW ID: ------->", self.current_job_row_id)
       current_process = db..session.query(Process).filter(Process.id == self.current_job_row_id).first()
       current_process.time_finished = datetime.datetime.now()
       current_process.is_finished = True
       self.current_job_journal_id = None
       self.current_job_row_id = None
       self.disconnect_db()
       self.is_working = False
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...