Описание приложения
Поэтому я пытаюсь создать приложение, которое выполняет анализ настроений в режиме реального времени на твитах (как можно ближе к реальному времени, насколько я могу его получить), и эти твиты должны бытьна основе пользовательского ввода.Итак, на главной странице моего приложения у меня есть простая панель поиска, где пользователь может ввести тему, по которой он хотел бы выполнить анализ настроений, и когда он нажмет ввод, он перенесет их на другую страницу, где он увидит линейную диаграмму, отображающуювсе данные в режиме реального времени.
Проблема 1
Первая проблема, с которой я сталкиваюсь на данный момент, заключается в том, что я не знаю, как я могу настроить твип, чтобы изменить то, что он отслеживает, когда два или более человека делают запрос,Если бы у меня была глобальная потоковая передача, которую я просто отключал и переподключал каждый раз, когда пользователь делает новый запрос, то он также отключался бы и от других пользователей, которые я не хочу.С другой стороны, если бы я выделил объект потоковой передачи для каждого подключенного пользователя, эта стратегия должна работать.Это все еще создает проблему.Твиттер не позволяет вам удерживать более одного соединения за раз, как кажется, учитывая это сообщение StackOverflow.
Поддерживает ли Tweepy запуск нескольких потоков для сбора данных?
ЕслиЯ все еще должен был согласиться с этим, я рискую забанить свой IP.Поэтому оба эти решения бесполезны.
Задача 2
Последняя проблема, с которой я столкнулся, - выяснить, кому принадлежит сообщение.В настоящее время я использую RabbitMQ для хранения всех входящих сообщений в одной очереди под названием twitter_topic_feed
.Для каждого твита, который я получаю от твипа, я публикую его в этой очереди.Затем RabbiMQ получает сообщение и отправляет его каждому доступному соединению.Очевидно, это поведение не то, что я ищу.Рассмотрим двух пользователей, которые ищут пиццу и спорт.Оба пользователя получат твиты, относящиеся к футболу и пицце, когда один пользователь попросит о спортивных твитах, а другой попросит о твитах с пиццей.
Одна из идей - создать очередь с уникальным идентификатором для каждого доступного соединения.Идентификатор будет иметь вид {Search Term} _ {Hash ID} .Для генерации хеш-идентификатора я могу использовать пакет UUID, который доступен в python, и создать идентификатор при открытии соединения и удалить его при закрытии.Конечно, когда они закрывают соединение, мне также нужно удалить очередь.Я не уверен, насколько хорошо это решение будет масштабироваться.Если бы у нас было 10 000 подключений, у нас было бы 10 000 очередей, и в каждой очереди потенциально могло бы храниться много сообщений.Похоже, это будет очень интенсивно использовать память.
Дизайн
tornado Framework
для WebSockets, tweepy API
для потоковых твитов RabbitMQ
Для публикации сообщений в очереди всякий раз, когда tweepy получает новый твит.RabbitMQ затем использует это сообщение и отправляет его в WebSocket.
Попытка (что у меня есть на данный момент)
TweetStreamListener
использует API-интерфейс tweepy для прослушивания твитов на основеввод пользователя.Какой бы твит он ни получал, он вычисляет полярность этого твита и публикует его в очереди rabbitMQ twitter_topic_feed
.
import logging
from tweepy import StreamListener, OAuthHandler, Stream, API
from sentiment_analyzer import calculate_polarity_score
from constants import SETTINGS
auth = OAuthHandler(
SETTINGS["TWITTER_CONSUMER_API_KEY"], SETTINGS["TWITTER_CONSUMER_API_SECRET_KEY"])
auth.set_access_token(
SETTINGS["TWITTER_ACCESS_KEY"], SETTINGS["TWITTER_ACCESS_SECRET_KEY"])
api = API(auth, wait_on_rate_limit=True)
class TweetStreamListener(StreamListener):
def __init__(self):
self.api = api
self.stream = Stream(auth=self.api.auth, listener=self)
def start_listening(self):
pass
def on_status(self, status):
if not hasattr(status, 'retweeted_status'):
polarity = calculate_polarity_score(status.text)
message = {
'polarity': polarity,
'timestamp': status.created_at
}
# TODO(Luis) Need to figure who to send this message to.
logging.debug("Message received from Twitter: {0}".format(message))
# limit handling
def on_limit(self, status):
logging.info(
'Limit threshold exceeded. Status code: {0}'.format(status))
def on_timeout(self, status):
logging.error('Stream disconnected. continuing...')
return True # Don't kill the stream
"""
Summary: Callback that executes for any error that may occur. Whenever we get a 420 Error code, we simply
stop streaming tweets as we have reached our rate limit. This is due to making too many requests.
Returns: False if we are sending too many tweets, otherwise return true to keep the stream going.
"""
def on_error(self, status_code):
if status_code == 420:
logging.error(
'Encountered error code 420. Disconnecting the stream')
# returning False in on_data disconnects the stream
return False
else:
logging.error('Encountered error with status code: {}'.format(
status_code))
return True # Don't kill the stream
WS_Handler
отвечает за ведение списка открытых соединений и отправку любого сообщения, которое онполучает обратно каждому клиенту (такое поведение мне не нужно).
import logging
import json
from uuid import uuid4
from tornado.web import RequestHandler
from tornado.websocket import WebSocketHandler
class WSHandler(WebSocketHandler):
def check_origin(self, origin):
return True
@property
def sess_id(self):
return self._sess_id
def open(self):
self._sess_id = uuid4().hex
logging.debug('Connection established.')
self.application.pc.register_websocket(self._sess_id, self)
# When messages arrives via RabbitMQ, write it to websocket
def on_message(self, message):
logging.debug('Message received: {0}'.format(message))
self.application.pc.redirect_incoming_message(
self._sess_id, json.dumps(message))
def on_close(self):
logging.debug('Connection closed.')
self.application.pc.unregister_websocket(self._sess_id)
Модуль PikaClient
содержит PikaClient, который позволяет отслеживать входящие и исходящие каналы, а также отслеживатьвеб-сокетов, которые в данный момент работают.
import logging
import pika
from constants import SETTINGS
from pika import PlainCredentials, ConnectionParameters
from pika.adapters.tornado_connection import TornadoConnection
pika.log = logging.getLogger(__name__)
class PikaClient(object):
INPUT_QUEUE_NAME = 'in_queue'
def __init__(self):
self.connected = False
self.connecting = False
self.connection = None
self.in_channel = None
self.out_channels = {}
self.websockets = {}
def connect(self):
if self.connecting:
return
self.connecting = True
# Setup rabbitMQ connection
credentials = PlainCredentials(
SETTINGS['RABBITMQ_USERNAME'], SETTINGS['RABBITMQ_PASSWORD'])
param = ConnectionParameters(
host=SETTINGS['RABBITMQ_HOST'], port=SETTINGS['RABBITMQ_PORT'], virtual_host='/', credentials=credentials)
return TornadoConnection(param, on_open_callback=self.on_connected)
def run(self):
self.connection = self.connect()
self.connection.ioloop.start()
def stop(self):
self.connected = False
self.connecting = False
self.connection.ioloop.stop()
def on_connected(self, unused_Connection):
self.connected = True
self.in_channel = self.connection.channel(self.on_conn_open)
def on_conn_open(self, channel):
self.in_channel.exchange_declare(
exchange='tornado_input', exchange_type='topic')
channel.queue_declare(
callback=self.on_input_queue_declare, queue=self.INPUT_QUEUE_NAME)
def on_input_queue_declare(self, queue):
self.in_channel.queue_bind(
callback=None, exchange='tornado_input', queue=self.INPUT_QUEUE_NAME, routing_key="#")
def register_websocket(self, sess_id, ws):
self.websockets[sess_id] = ws
self.create_out_channel(sess_id)
def unregister_websocket(self, sess_id):
self.websockets.pop(sess_id)
if sess_id in self.out_channels:
self.out_channels[sess_id].close()
def create_out_channel(self, sess_id):
def on_output_channel_creation(channel):
def on_output_queue_declaration(queue):
channel.basic_consume(self.on_message, queue=sess_id)
self.out_channels[sess_id] = channel
channel.queue_declare(callback=on_output_queue_declaration,
queue=sess_id, auto_delete=True, exclusive=True)
self.connection.channel(on_output_channel_creation)
def redirect_incoming_message(self, sess_id, message):
self.in_channel.basic_publish(
exchange='tornado_input', routing_key=sess_id, body=message)
def on_message(self, channel, method, header, body):
sess_id = method.routing_key
if sess_id in self.websockets:
self.websockets[sess_id].write_message(body)
channel.basic_ack(delivery_tag=method.delivery_tag)
else:
channel.basic_reject(delivery_tag=method.delivery_tag)
Server.py
является основной точкой входа приложения.
import logging
import os
from tornado import web, ioloop
from tornado.options import define, options, parse_command_line
from client import PikaClient
from handlers import WSHandler, MainHandler
define("port", default=3000, help="run on the given port.", type=int)
define("debug", default=True, help="run in debug mode.", type=bool)
def main():
parse_command_line()
settings = {
"debug": options.debug,
"static_path": os.path.join(os.path.dirname(__file__), "web/static")
}
app = web.Application(
[
(r"/", MainHandler),
(r"/stream", WSHandler),
],
**settings
)
# Setup PikaClient
app.pc = PikaClient()
app.listen(options.port)
logging.info("Server running on http://localhost:3000")
try:
app.pc.run()
except KeyboardInterrupt:
app.pc.stop()
if __name__ == "__main__":
main()