Как совместить Flask-SocketIO с RxPy - PullRequest
0 голосов
/ 20 мая 2019

В Python 3.7 я пытаюсь создать веб-сокет Flask-SocketIO, использующий RxPy.

Если я позвоню socketio.run(app), то код никогда не доберется до места, где запускаются Observables. Если я вызываю функцию Observable .subscribe(), то я никогда не получаю функцию socketio.run().

Какой питонный способ объединить эти два бесконечных цикла, чтобы оба были активными?

Обновление:

Я создал тему и поместил туда Наблюдаемые. Я не уверен, что это хорошее решение, но оно работает. Есть ли лучший способ?

from gevent import monkey
monkey.patch_all()

from flask_socketio import SocketIO, emit
from flask import Flask, render_template, url_for, copy_current_request_context

from tweepy.streaming import StreamListener
from tweepy import OAuthHandler
from tweepy import Stream
import json

from rx import Observable, Observer
from rx.concurrency.mainloopscheduler import AsyncIOScheduler

from threading import Lock

thread = None
thread_lock = Lock()

from twitter_credentials import *

app = Flask(__name__)
app.config['SECRET_KEY'] = 'secret'
app.config['DEBUG'] = True

socketio = SocketIO(app)

def background_thread():
    tweets_for(topics) \
        .map(lambda t: json.loads(t)) \
        .subscribe(on_next=lambda t: processTweet(t),
                   on_error=lambda e: print(e))

@app.route('/')
def index():
    return render_template('index.html', async_mode=socketio.async_mode)

@socketio.on('connect', namespace='/tweet')
def test_connect():
    global thread
    with thread_lock:
        if thread is None:
            thread = socketio.start_background_task(background_thread)

@socketio.on('disconnect', namespace='/tweet')
def test_disconnect():
    print('socketio disconnected')

def tweets_for(_topics):
    def observe_tweets(observer):
        class TweetListener(StreamListener):

            def on_data(self, data):
                observer.on_next(data)
                return True

            def on_error(self, status_code):
                observer.on_error(status_code)

        listener = TweetListener()
        auth = OAuthHandler(consumer_key, consumer_secret)
        auth.set_access_token(access_token, access_token_secret)

        stream = Stream(auth, listener)
        stream.filter(track=_topics)

    return Observable.create(observe_tweets).share()

class SocketObserver(Observer):
    def on_next(self, x):
        print(f'SocketObserver.on_next {x}')
    def on_completed(self):
        print(f'SocketObserver.on_completed')
    def on_error(self, error):
        print(f'SocketObserver.on_error {error}')

def processTweet(t):
    socketio.emit('tweet', {"tweet": json.dumps(t)}, namespace='/tweet')

socket_observable = Observable.from_callback(socketio.on_event)
socket_my_event = socket_observable('tweet')
socket_my_event.subscribe(SocketObserver)

topics = ['RxPy', 'Python', 'ReactiveX']

if __name__ == '__main__':
    socketio.run(app)
...