В Python 3.7 я пытаюсь создать веб-сокет Flask-SocketIO, использующий RxPy.
Если я позвоню socketio.run(app)
, то код никогда не доберется до места, где запускаются Observables. Если я вызываю функцию Observable .subscribe()
, то я никогда не получаю функцию socketio.run()
.
Какой питонный способ объединить эти два бесконечных цикла, чтобы оба были активными?
Обновление:
Я создал тему и поместил туда Наблюдаемые. Я не уверен, что это хорошее решение, но оно работает. Есть ли лучший способ?
from gevent import monkey
monkey.patch_all()
from flask_socketio import SocketIO, emit
from flask import Flask, render_template, url_for, copy_current_request_context
from tweepy.streaming import StreamListener
from tweepy import OAuthHandler
from tweepy import Stream
import json
from rx import Observable, Observer
from rx.concurrency.mainloopscheduler import AsyncIOScheduler
from threading import Lock
thread = None
thread_lock = Lock()
from twitter_credentials import *
app = Flask(__name__)
app.config['SECRET_KEY'] = 'secret'
app.config['DEBUG'] = True
socketio = SocketIO(app)
def background_thread():
tweets_for(topics) \
.map(lambda t: json.loads(t)) \
.subscribe(on_next=lambda t: processTweet(t),
on_error=lambda e: print(e))
@app.route('/')
def index():
return render_template('index.html', async_mode=socketio.async_mode)
@socketio.on('connect', namespace='/tweet')
def test_connect():
global thread
with thread_lock:
if thread is None:
thread = socketio.start_background_task(background_thread)
@socketio.on('disconnect', namespace='/tweet')
def test_disconnect():
print('socketio disconnected')
def tweets_for(_topics):
def observe_tweets(observer):
class TweetListener(StreamListener):
def on_data(self, data):
observer.on_next(data)
return True
def on_error(self, status_code):
observer.on_error(status_code)
listener = TweetListener()
auth = OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_token_secret)
stream = Stream(auth, listener)
stream.filter(track=_topics)
return Observable.create(observe_tweets).share()
class SocketObserver(Observer):
def on_next(self, x):
print(f'SocketObserver.on_next {x}')
def on_completed(self):
print(f'SocketObserver.on_completed')
def on_error(self, error):
print(f'SocketObserver.on_error {error}')
def processTweet(t):
socketio.emit('tweet', {"tweet": json.dumps(t)}, namespace='/tweet')
socket_observable = Observable.from_callback(socketio.on_event)
socket_my_event = socket_observable('tweet')
socket_my_event.subscribe(SocketObserver)
topics = ['RxPy', 'Python', 'ReactiveX']
if __name__ == '__main__':
socketio.run(app)