Как я могу динамически изменить тип твитов, которые транслируются, и выяснить, какое сообщение кому отправить? - PullRequest
0 голосов
/ 15 марта 2019

Описание приложения

Поэтому я пытаюсь создать приложение, которое выполняет анализ настроений в режиме реального времени на твитах (как можно ближе к реальному времени, насколько я могу его получить), и эти твиты должны бытьна основе пользовательского ввода.Итак, на главной странице моего приложения у меня есть простая панель поиска, где пользователь может ввести тему, по которой он хотел бы выполнить анализ настроений, и когда он нажмет ввод, он перенесет их на другую страницу, где он увидит линейную диаграмму, отображающуювсе данные в режиме реального времени.

Проблема 1

Первая проблема, с которой я сталкиваюсь на данный момент, заключается в том, что я не знаю, как я могу настроить твип, чтобы изменить то, что он отслеживает, когда два или более человека делают запрос,Если бы у меня была глобальная потоковая передача, которую я просто отключал и переподключал каждый раз, когда пользователь делает новый запрос, то он также отключался бы и от других пользователей, которые я не хочу.С другой стороны, если бы я выделил объект потоковой передачи для каждого подключенного пользователя, эта стратегия должна работать.Это все еще создает проблему.Твиттер не позволяет вам удерживать более одного соединения за раз, как кажется, учитывая это сообщение StackOverflow.

Поддерживает ли Tweepy запуск нескольких потоков для сбора данных?

ЕслиЯ все еще должен был согласиться с этим, я рискую забанить свой IP.Поэтому оба эти решения бесполезны.

Задача 2

Последняя проблема, с которой я столкнулся, - выяснить, кому принадлежит сообщение.В настоящее время я использую RabbitMQ для хранения всех входящих сообщений в одной очереди под названием twitter_topic_feed.Для каждого твита, который я получаю от твипа, я публикую его в этой очереди.Затем RabbiMQ получает сообщение и отправляет его каждому доступному соединению.Очевидно, это поведение не то, что я ищу.Рассмотрим двух пользователей, которые ищут пиццу и спорт.Оба пользователя получат твиты, относящиеся к футболу и пицце, когда один пользователь попросит о спортивных твитах, а другой попросит о твитах с пиццей.

Одна из идей - создать очередь с уникальным идентификатором для каждого доступного соединения.Идентификатор будет иметь вид {Search Term} _ {Hash ID} .Для генерации хеш-идентификатора я могу использовать пакет UUID, который доступен в python, и создать идентификатор при открытии соединения и удалить его при закрытии.Конечно, когда они закрывают соединение, мне также нужно удалить очередь.Я не уверен, насколько хорошо это решение будет масштабироваться.Если бы у нас было 10 000 подключений, у нас было бы 10 000 очередей, и в каждой очереди потенциально могло бы храниться много сообщений.Похоже, это будет очень интенсивно использовать память.

Дизайн

  • tornado Framework для WebSockets,
  • tweepy API для потоковых твитов
  • RabbitMQ Для публикации сообщений в очереди всякий раз, когда tweepy получает новый твит.RabbitMQ затем использует это сообщение и отправляет его в WebSocket.

Попытка (что у меня есть на данный момент)

TweetStreamListener использует API-интерфейс tweepy для прослушивания твитов на основеввод пользователя.Какой бы твит он ни получал, он вычисляет полярность этого твита и публикует его в очереди rabbitMQ twitter_topic_feed.

import logging
from tweepy import StreamListener, OAuthHandler, Stream, API
from sentiment_analyzer import calculate_polarity_score
from constants import SETTINGS

auth = OAuthHandler(
    SETTINGS["TWITTER_CONSUMER_API_KEY"], SETTINGS["TWITTER_CONSUMER_API_SECRET_KEY"])

auth.set_access_token(
    SETTINGS["TWITTER_ACCESS_KEY"], SETTINGS["TWITTER_ACCESS_SECRET_KEY"])

api = API(auth, wait_on_rate_limit=True)


class TweetStreamListener(StreamListener):

    def __init__(self):
        self.api = api
        self.stream = Stream(auth=self.api.auth, listener=self)

    def start_listening(self):
        pass

    def on_status(self, status):
        if not hasattr(status, 'retweeted_status'):
            polarity = calculate_polarity_score(status.text)

            message = {
                'polarity': polarity,
                'timestamp': status.created_at
            }

            # TODO(Luis) Need to figure who to send this message to.
            logging.debug("Message received from Twitter: {0}".format(message))

    # limit handling
    def on_limit(self, status):
        logging.info(
            'Limit threshold exceeded. Status code: {0}'.format(status))

    def on_timeout(self, status):
        logging.error('Stream disconnected. continuing...')
        return True  # Don't kill the stream

    """
    Summary: Callback that executes for any error that may occur. Whenever we get a 420 Error code, we simply
    stop streaming tweets as we have reached our rate limit. This is due to making too many requests.

    Returns: False if we are sending too many tweets, otherwise return true to keep the stream going.
    """

    def on_error(self, status_code):
        if status_code == 420:
            logging.error(
                'Encountered error code 420. Disconnecting the stream')
            # returning False in on_data disconnects the stream
            return False
        else:
            logging.error('Encountered error with status code: {}'.format(
                status_code))
            return True  # Don't kill the stream

WS_Handler отвечает за ведение списка открытых соединений и отправку любого сообщения, которое онполучает обратно каждому клиенту (такое поведение мне не нужно).

import logging
import json
from uuid import uuid4
from tornado.web import RequestHandler
from tornado.websocket import WebSocketHandler


class WSHandler(WebSocketHandler):

    def check_origin(self, origin):
        return True

    @property
    def sess_id(self):
        return self._sess_id

    def open(self):
        self._sess_id = uuid4().hex
        logging.debug('Connection established.')
        self.application.pc.register_websocket(self._sess_id, self)

    # When messages arrives via RabbitMQ, write it to websocket
    def on_message(self, message):
        logging.debug('Message received: {0}'.format(message))
        self.application.pc.redirect_incoming_message(
            self._sess_id, json.dumps(message))

    def on_close(self):
        logging.debug('Connection closed.')
        self.application.pc.unregister_websocket(self._sess_id)

Модуль PikaClient содержит PikaClient, который позволяет отслеживать входящие и исходящие каналы, а также отслеживатьвеб-сокетов, которые в данный момент работают.

import logging
import pika
from constants import SETTINGS
from pika import PlainCredentials, ConnectionParameters
from pika.adapters.tornado_connection import TornadoConnection

pika.log = logging.getLogger(__name__)


class PikaClient(object):
    INPUT_QUEUE_NAME = 'in_queue'

    def __init__(self):
        self.connected = False
        self.connecting = False
        self.connection = None
        self.in_channel = None
        self.out_channels = {}
        self.websockets = {}

    def connect(self):
        if self.connecting:
            return

        self.connecting = True

        # Setup rabbitMQ connection
        credentials = PlainCredentials(
            SETTINGS['RABBITMQ_USERNAME'], SETTINGS['RABBITMQ_PASSWORD'])

        param = ConnectionParameters(
            host=SETTINGS['RABBITMQ_HOST'], port=SETTINGS['RABBITMQ_PORT'], virtual_host='/', credentials=credentials)

        return TornadoConnection(param, on_open_callback=self.on_connected)

    def run(self):
        self.connection = self.connect()
        self.connection.ioloop.start()

    def stop(self):
        self.connected = False
        self.connecting = False
        self.connection.ioloop.stop()

    def on_connected(self, unused_Connection):
        self.connected = True
        self.in_channel = self.connection.channel(self.on_conn_open)

    def on_conn_open(self, channel):
        self.in_channel.exchange_declare(
            exchange='tornado_input', exchange_type='topic')
        channel.queue_declare(
            callback=self.on_input_queue_declare, queue=self.INPUT_QUEUE_NAME)

    def on_input_queue_declare(self, queue):
        self.in_channel.queue_bind(
            callback=None, exchange='tornado_input', queue=self.INPUT_QUEUE_NAME, routing_key="#")

    def register_websocket(self, sess_id, ws):
        self.websockets[sess_id] = ws
        self.create_out_channel(sess_id)

    def unregister_websocket(self, sess_id):
        self.websockets.pop(sess_id)

        if sess_id in self.out_channels:
            self.out_channels[sess_id].close()

    def create_out_channel(self, sess_id):
        def on_output_channel_creation(channel):
            def on_output_queue_declaration(queue):
                channel.basic_consume(self.on_message, queue=sess_id)

            self.out_channels[sess_id] = channel
            channel.queue_declare(callback=on_output_queue_declaration,
                                  queue=sess_id, auto_delete=True, exclusive=True)

        self.connection.channel(on_output_channel_creation)

    def redirect_incoming_message(self, sess_id, message):
        self.in_channel.basic_publish(
            exchange='tornado_input', routing_key=sess_id, body=message)

    def on_message(self, channel, method, header, body):
        sess_id = method.routing_key

        if sess_id in self.websockets:
            self.websockets[sess_id].write_message(body)
            channel.basic_ack(delivery_tag=method.delivery_tag)
        else:
            channel.basic_reject(delivery_tag=method.delivery_tag)

Server.py является основной точкой входа приложения.

import logging
import os
from tornado import web, ioloop
from tornado.options import define, options, parse_command_line
from client import PikaClient
from handlers import WSHandler, MainHandler

define("port", default=3000, help="run on the given port.", type=int)
define("debug", default=True, help="run in debug mode.", type=bool)


def main():
    parse_command_line()

    settings = {
        "debug": options.debug,
        "static_path": os.path.join(os.path.dirname(__file__), "web/static")
    }

    app = web.Application(
        [
            (r"/", MainHandler),
            (r"/stream", WSHandler),
        ],
        **settings
    )

    # Setup PikaClient
    app.pc = PikaClient()

    app.listen(options.port)
    logging.info("Server running on http://localhost:3000")

    try:
        app.pc.run()
    except KeyboardInterrupt:
        app.pc.stop()


if __name__ == "__main__":
    main()
...