Запуск нескольких потоков Twitter (Tweepy) с использованием многопроцессорной обработки - PullRequest
0 голосов
/ 21 марта 2019

Я создаю сценарий параллельного стримера в Python.У меня есть несколько тем для потоков, каждая со своими ключевыми словами.Цель состоит в том, чтобы основной сценарий порождал несколько процессов, каждый из которых транслирует одну тему.

К сожалению, только один или два процесса запущены (из общего возможного 8 в моей среде тестирования).

Я предоставил скелет своего кода (master_twitter.py) с (надеюсь) соответствующие детали:

import multiprocessing
from Streamer import Streamer

def main():
    # {'topic1':['keyword1','keyword2',...], 'topic2':['keyword1',...],...}
    eventDict = getEvents()
    errorQueue = multiprocessing.Queue()

    for topic in eventDict:
        #Streamer(topic, keywords, errorQueue)
        eventDict['streamer'] = Streamer(topic, eventDict[topic], errorQueue)
        eventDict['streamer'].start()

Содержимое Streamer.py

import multiprocessing
from tweepy.Streaming import StreamListener
import tweepy

class Streamer(multiprocessing.Process):
    def __init__(self, topic, keywords, errorQueue):
        multiprocessing.Process.__init__(self)
        self.topic=topic
        self.keywords=keywords
        self.errorQueue=errorQueue

        self.auth = tweepy.OAuthHandler(consumer keys)
        self.auth.set_access_token(access tokens)
        self.stream = tweepy.Stream(self.auth, self.StreamerClass(self.topic))

    def run(self):
        self.stream.filter(track=self.keywords)

    class StreamerClass(StreamListener):
        def __init__(self,topic):
            self.topic=topic
            create_local_directories_with_topic_as_name()

        def on_data(self,data):
            write_to_file(data)

Я создал восемь отдельных тем с различными ключевыми словами, показанными ниже (это не реальные темы, которые я имею в виду)работаю, но проверяю темы)

    "bioware":  ["bioware", "anthem"],
    "destiny":  ["destiny", "elites"],
    "halo":     ["halo", "microsoft", "masterchief"],
    "software": ["windows", "apple", "linux"],
    "food":     ["chicken", "mutton", "pork","lamb","biryani","daal"],
    "bioware2": ["bioware", "anthem"]
    "bioware3": ["bioware", "anthem"]
    "bioware4": ["bioware", "anthem"]

У меня есть bioware2 , bioware3 и bioware4 в дополнение к просто bioware в качестве проверки согласованности;в идеале все четыре должны иметь одинаковые выходы.Я добавил несколько print () в код для отслеживания выполнения, и они указывают, что каждый Streamer правильно развернут на основе операторов печати до и после start () в master_twitter.py.

Однако из 8,6 Стример, похоже, входит.Я добавил print () в Streamer.py, и для 6 из 8 тем эти операторы print () срабатывают.Скорее, неинтуитивно, Streamers, для которых операторы print () выполняют NOT fire, - это те, которые действительно работают.

Регистрация операторов:

Running bioware4 en with PID 453 at 2019-03-20 19:50:25
Running destiny en with PID 452 at 2019-03-20 19:50:25
Running bioware en with PID 454 at 2019-03-20 19:50:25
Running chicken en with PID 455 at 2019-03-20 19:50:25
Running software en with PID 457 at 2019-03-20 19:50:25
Running halo en with PID 456 at 2019-03-20 19:50:25

Обратите внимание, что bioware2 и bioware3 не отображаются в списке.Запуск ps aux | grep master_twitter.py дает:

asuprem    446  0.1  0.1  34004 17848 tty4     S    19:50   0:00 python workers/master_twitter.py
asuprem    450  0.0  0.0  35348 12044 tty4     S    19:50   0:00 python workers/master_twitter.py
asuprem    451  0.0  0.0  35364 12116 tty4     S    19:50   0:00 python workers/master_twitter.py

Где PID 446 - это PID исходной программы master_twitter.py, PID 450, 451, вероятно, являются PID bioware2 и bioware3 .Запуск ps aux | grep 452 (или любого другого PID из списка ведения журнала) возвращает несуществующую программу:

asuprem    452  0.0  0.0      0     0 tty4     Z    21:49   0:00 [python] <defunct>

Есть ли способ убедиться, что все процессы действительно работают?Нужно ли пробовать другие вещи, кроме многопроцессорной.Я знаю, что мог бы вручную запустить 8 отдельных процессов;однако мои требования состоят в том, чтобы разработать приложение «spawner», которое, в свою очередь, выполнит несколько подпрограмм.

Вторая причина заключается в создании отказоустойчивого диспетчера потоков (т. е. errorQueue в приведенном выше коде) для обеспечения отказоустойчивости.Если Streamer дает сбой, он выводит соответствующую информацию (тему) в errorQueue перед выходом.Таким образом, создатель может завершить процесс зомби и запустить другого вместо него.

...