У меня проблемы с параллельной обработкой скрипта очистки новостей.
У меня есть следующий скрипт, который читает страницу новостей RSS Google и обрабатывает каждую из возвращенных ссылок.news_list
- это элемент BeautifulSoup, который содержит информацию о 10 самых последних новостях по некоторой теме.
def main(news_list):
news_list = soup_page.findAll("item")
feed = []
for article in news_list[:10]:
new = {}
new['title'] = article.title.text
new['source'] = article.source.text
new['link'] = article.link.text
new['date'] = datetime.strptime(article.pubDate.text, '%a, %d %b %Y %H:%M:%S %Z')
new['keywords'] = keywords(article.link.text)
feed.append(new)
Функция keywords
обрабатывает содержание новостей и возвращает ключевые слова.Эта функция занимает около 1,5 секунд на каждую новостную статью, поэтому запуск полного сценария занимает не менее 15 секунд.
Я хочу сократить продолжительность сценария, поэтому я пробовал использовать многопроцессорную обработку вместо цикла for,как это:
def process_article(article):
new = {}
new['title'] = article.title.text
new['source'] = article.source.text
new['link'] = article.link.text
new['date'] = datetime.strptime(article.pubDate.text, '%a, %d %b %Y %H:%M:%S %Z')
new['keywords'] = keywords(article.link.text)
return new
from joblib import Parallel, delayed
import multiprocessing
num_cores = multiprocessing.cpu_count()
feed = Parallel(n_jobs=num_cores)(delayed(process_news)(article) for article in news_list[:10])
Однако я получаю сообщение об ошибке, как будто функция process_article была рекурсивной:
RecursionError: maximum recursion depth exceeded while calling a Python object
Что я делаю неправильно?Это все еще происходит, если я напишу функцию следующим образом, поэтому функция keywords
не является проблемой
def process_article(article):
new = {}
return new
Любая помощь приветствуется.Спасибо!
Это полная трассировка:
RecursionError Traceback (most recent call last)
<ipython-input-90-498afb9f1a25> in <module>
1 num_cores = multiprocessing.cpu_count()
2
----> 3 results = Parallel(n_jobs=num_cores)(delayed(process_news)(article) for article in list(news_list[:10]))
/usr/local/lib/python3.6/site-packages/joblib/parallel.py in __call__(self, iterable)
787 # consumption.
788 self._iterating = False
--> 789 self.retrieve()
790 # Make sure that we get a last message telling us we are done
791 elapsed_time = time.time() - self._start_time
/usr/local/lib/python3.6/site-packages/joblib/parallel.py in retrieve(self)
697 try:
698 if getattr(self._backend, 'supports_timeout', False):
--> 699 self._output.extend(job.get(timeout=self.timeout))
700 else:
701 self._output.extend(job.get())
/usr/local/Cellar/python/3.6.5_1/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/pool.py in get(self, timeout)
642 return self._value
643 else:
--> 644 raise self._value
645
646 def _set(self, i, obj):
/usr/local/Cellar/python/3.6.5_1/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/pool.py in _handle_tasks(taskqueue, put, outqueue, pool, cache)
422 break
423 try:
--> 424 put(task)
425 except Exception as e:
426 job, idx = task[:2]
/usr/local/lib/python3.6/site-packages/joblib/pool.py in send(obj)
369 def send(obj):
370 buffer = BytesIO()
--> 371 CustomizablePickler(buffer, self._reducers).dump(obj)
372 self._writer.send_bytes(buffer.getvalue())
373 self._send = send
RecursionError: maximum recursion depth exceeded while calling a Python object