Я создаю сценарий параллельного стримера в Python.У меня есть несколько тем для потоков, каждая со своими ключевыми словами.Цель состоит в том, чтобы основной сценарий порождал несколько процессов, каждый из которых транслирует одну тему.
К сожалению, только один или два процесса запущены (из общего возможного 8 в моей среде тестирования).
Я предоставил скелет своего кода (master_twitter.py) с (надеюсь) соответствующие детали:
import multiprocessing
from Streamer import Streamer
def main():
# {'topic1':['keyword1','keyword2',...], 'topic2':['keyword1',...],...}
eventDict = getEvents()
errorQueue = multiprocessing.Queue()
for topic in eventDict:
#Streamer(topic, keywords, errorQueue)
eventDict['streamer'] = Streamer(topic, eventDict[topic], errorQueue)
eventDict['streamer'].start()
Содержимое Streamer.py
import multiprocessing
from tweepy.Streaming import StreamListener
import tweepy
class Streamer(multiprocessing.Process):
def __init__(self, topic, keywords, errorQueue):
multiprocessing.Process.__init__(self)
self.topic=topic
self.keywords=keywords
self.errorQueue=errorQueue
self.auth = tweepy.OAuthHandler(consumer keys)
self.auth.set_access_token(access tokens)
self.stream = tweepy.Stream(self.auth, self.StreamerClass(self.topic))
def run(self):
self.stream.filter(track=self.keywords)
class StreamerClass(StreamListener):
def __init__(self,topic):
self.topic=topic
create_local_directories_with_topic_as_name()
def on_data(self,data):
write_to_file(data)
Я создал восемь отдельных тем с различными ключевыми словами, показанными ниже (это не реальные темы, которые я имею в виду)работаю, но проверяю темы)
"bioware": ["bioware", "anthem"],
"destiny": ["destiny", "elites"],
"halo": ["halo", "microsoft", "masterchief"],
"software": ["windows", "apple", "linux"],
"food": ["chicken", "mutton", "pork","lamb","biryani","daal"],
"bioware2": ["bioware", "anthem"]
"bioware3": ["bioware", "anthem"]
"bioware4": ["bioware", "anthem"]
У меня есть bioware2 , bioware3 и bioware4 в дополнение к просто bioware в качестве проверки согласованности;в идеале все четыре должны иметь одинаковые выходы.Я добавил несколько print () в код для отслеживания выполнения, и они указывают, что каждый Streamer правильно развернут на основе операторов печати до и после start () в master_twitter.py.
Однако из 8,6 Стример, похоже, входит.Я добавил print () в Streamer.py, и для 6 из 8 тем эти операторы print () срабатывают.Скорее, неинтуитивно, Streamers, для которых операторы print () выполняют NOT fire, - это те, которые действительно работают.
Регистрация операторов:
Running bioware4 en with PID 453 at 2019-03-20 19:50:25
Running destiny en with PID 452 at 2019-03-20 19:50:25
Running bioware en with PID 454 at 2019-03-20 19:50:25
Running chicken en with PID 455 at 2019-03-20 19:50:25
Running software en with PID 457 at 2019-03-20 19:50:25
Running halo en with PID 456 at 2019-03-20 19:50:25
Обратите внимание, что bioware2 и bioware3 не отображаются в списке.Запуск ps aux | grep master_twitter.py
дает:
asuprem 446 0.1 0.1 34004 17848 tty4 S 19:50 0:00 python workers/master_twitter.py
asuprem 450 0.0 0.0 35348 12044 tty4 S 19:50 0:00 python workers/master_twitter.py
asuprem 451 0.0 0.0 35364 12116 tty4 S 19:50 0:00 python workers/master_twitter.py
Где PID 446 - это PID исходной программы master_twitter.py, PID 450, 451, вероятно, являются PID bioware2 и bioware3 .Запуск ps aux | grep 452
(или любого другого PID из списка ведения журнала) возвращает несуществующую программу:
asuprem 452 0.0 0.0 0 0 tty4 Z 21:49 0:00 [python] <defunct>
Есть ли способ убедиться, что все процессы действительно работают?Нужно ли пробовать другие вещи, кроме многопроцессорной.Я знаю, что мог бы вручную запустить 8 отдельных процессов;однако мои требования состоят в том, чтобы разработать приложение «spawner», которое, в свою очередь, выполнит несколько подпрограмм.
Вторая причина заключается в создании отказоустойчивого диспетчера потоков (т. е. errorQueue в приведенном выше коде) для обеспечения отказоустойчивости.Если Streamer дает сбой, он выводит соответствующую информацию (тему) в errorQueue перед выходом.Таким образом, создатель может завершить процесс зомби и запустить другого вместо него.