Flask Socketio - асинхронный фоновый поток, управляемый глобальным параметром соответственно - PullRequest
0 голосов
/ 07 октября 2019

Я беру исходный код автора https://github.com/miguelgrinberg/Flask-SocketIO/tree/master/example в app.py и пытаюсь построить два фоновых процесса в FlaskSocketio.

i) Сервер сгенерировал 'бездействие', которое уже идет сэтот код, который пингует каждые 10 секунд (и работает).

ii) У меня есть код, который работает со сторожевым таймером, чтобы увидеть, изменяются ли файлы (log.txt) внутри каталога. И когда это нужно, мне нужно обновить его на предварительном теге (например, динамический журнал, когда сервер выполняет тяжелые вычисления).

Стратегия, которую я разработал, заключалась в использовании словаря глобальных переменных из этих двух потоков t1. , t2 и передать его в поток socketio, т.е. background_process (и с попыткой даже сделать все в одном процессе).

Обыскал и прочитал много форумов, но такого рода функциональность не была обнаружена в доступных приложениях.

#import libs
#python builtins
import os
import datetime
from datetime import timedelta
#work with processes
import psutil
#work with files
import shutil
from flask import Flask, flash, request, redirect, render_template, Response, escape, jsonify, url_for, session, copy_current_request_context
#socketio
from flask_socketio import SocketIO, send, emit, join_room, leave_room, close_room, rooms, disconnect
import logging
#multithreads
import threading
from threading import Thread, Event, Lock
#implement watchfiles
import sys
import time
from time import sleep
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
#events
import events
#for the uploads
import urllib.request

########## Related to Thread i ################
def fn_counter():
    global dict_emit
    count = 0
    while True:
        time.sleep(10)
        count += 1
        dict_emit = {'data': 'Server generated event', 'count': count}
######################################
##########Related to thread ii##########
#Classes
class Watcher:
    def __init__(self):
        self.observer = Observer()

    def run(self, str_watchdir):
        logger.info('=> running background thread for file watchdog in '+INSTANCE_DIR)
        event_handler = Handler()
        self.observer.schedule(event_handler, str_watchdir, recursive=True)
        self.observer.start()
        try:
            while True:
                time.sleep(5)
        except:
            self.observer.stop()
            logger.info("Error")
        self.observer.join()

class Handler(FileSystemEventHandler):
    @staticmethod
    def on_any_event(event):
        global dict_emit
        global lst_clients
        #logger.info('=> on_any_event') works well
        #some available events
        #event.event_type == 'created' when file created
        #event.event_type == 'deleted' when file deleted 
        #event.event_type == 'moved' when file renamed
        #event.event_type == 'modified' when file contents changed
        if event.is_directory:
            return None
        elif event.event_type == 'modified':
            # Take any action here when a file is first created.
            logger.info('file updated')#+" "+os.path.basename(event.src_path)
            dict_emit = {'data': 'Server generated event', 'file updated': os.path.basename(event.src_path)}
####################

async_mode = None

#vars
#root
#  |-templates
#      |-index.html
#  |-static
#      |-sessions
#          |-instance1
#               |-log.txt
#          |-...
#      |-assets
#          |-log.txt

ROOT_DIR = os.getcwd()                               
TEMPLATES_DIR = os.path.join(ROOT_DIR, 'templates')  
STATIC_DIR = os.path.join(ROOT_DIR, 'static')             
SESSIONS_DIR = os.path.join(STATIC_DIR,'sessions')   
INSTANCE_DIR = os.path.join(SESSIONS_DIR, datetime.datetime.now().strftime('%d-%m-%Y_%H-%M-%S')) 
os.makedirs(INSTANCE_DIR)
shutil.copyfile(os.path.join(ASSETS_DIR, "log.txt"),
                os.path.join(INSTANCE_DIR, "log.txt"))
#Utilities
def background_thread():
    global dict_emit
    """Example of how to send server generated events to clients."""
    while True:
        time.sleep(4)
        if dict_emit:
            socketio.emit('my_response',
                          dict_emit, #{'data': 'Server generated event', 'count': count},
                          namespace='/test')
            dict_emit={}

app = Flask(__name__)
app.config.update(
    #built-in parameters
    APPLICATION_ROOT ='/', #default is '/'
    SECRET_KEY="secret_key",  #Default is None or app.config['SECRET_KEY']="secret key" 
    MAX_CONTENT_LENGTH= 160 * 1024 * 1024,  #is defaults None or app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024 for 160 MB
    TEMPLATES_AUTO_RELOAD = False, #defaults None
    ENV='Production', #default 'Production'
    EXPLAIN_TEMPLATE_LOADING=True, #defaults False
    DEBUG=False, #defaults True if ENV='development'
    TESTING=False, #default False
    PROPAGATE_EXCEPTIONS=None, #default None
    PRESERVE_CONTEXT_ON_EXCEPTION=None, #defaults True if Debug True
    TRAP_HTTP_EXCEPTIONS=None, #Default False
    TRAP_BAD_REQUEST_ERRORS =None, #Defaults None
    SESSION_COOKIE_NAME='session', #Defaults session
    SESSION_COOKIE_DOMAIN=None, #defaults None
    SESSION_COOKIE_PATH=None, #defaults None
    #customized parameters
    UPLOAD_FOLDER= INSTANCE_DIR
)

#socketio
socketio = SocketIO(app, async_mode=async_mode)
thread = None
thread_lock = Lock()
lst_clients=[] #with this accessing client sid it's possible to manually emit for clients whereas otherwise only is possible from server events of app context only
#configure logger
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def background_thread():
    global dict_emit
    logger.info("=> Running SocketIO background_thread in 3rd background thread.")
    """Example of how to send server generated events to clients."""
    while True:
        time.sleep(4)
        if dict_emit:
            socketio.emit('my_response',
                          dict_emit, #{'data': 'Server generated event', 'count': count},
                          namespace='/test')
            dict_emit={}

@app.route('/')
def index():
    return render_template('index.html', async_mode=socketio.async_mode)

@socketio.on('connect', namespace='/test')
def test_connect():
    global thread
    global lst_clients
    logger.info(' Client connected ' + request.sid)
    lst_clients.append(request.sid)
    with thread_lock:
        if thread is None:
            thread = socketio.start_background_task(background_thread)
    emit('my_response', {'data': 'Connected', 'count': 0})

if __name__ == '__main__':
    global t1
    global t2
    #threads share same namespace on variables
    #launch filewatchdog
    w1=Watcher()
    # creating threads 
    t1 = threading.Thread(target=w1.run, args=(INSTANCE_DIR,)) 
    # starting thread 1 
    t1.start() 
    logger.info("=> Running watchdog for " + INSTANCE_DIR + " in 1st background thread.")

    #launch counter
    t2 = threading.Thread(target=fn_counter)
    t2.start()
    logger.info("=> Running counter in 2nd background thread.")

    logger.info('=> Running socket IO in main thread.')
    socketio.run(app, 
                host='localhost',
                 port=10000, 
                 debug=False) #True sends some exceptions and stops)
<!DOCTYPE HTML>
<html>
    <head>
<script src="//code.jquery.com/jquery-1.12.4.min.js" integrity="sha256-ZosEbRLbNQzLpnKIkEdrPv7lOy9C27hHQ+Xp8a4MxAQ=" crossorigin="anonymous"></script>
<script src="//cdnjs.cloudflare.com/ajax/libs/socket.io/2.2.0/socket.io.js" integrity="sha256-yr4fRk/GU1ehYJPAs8P4JlTgu0Hdsp4ZKrx8bDEDC3I=" crossorigin="anonymous"></script>
<script type="text/javascript" charset="utf-8">
    $(document).ready(function() {
        // Use a "/test" namespace.
        // An application can open a connection on multiple namespaces, and
        // Socket.IO will multiplex all those connections on a single
        // physical channel. If you don't care about multiple channels, you
        // can set the namespace to an empty string.
        namespace = '/test';

        // Connect to the Socket.IO server.
        // The connection URL has the following format, relative to the current page:
        //     http[s]://<domain>:<port>[/<namespace>]
        var socket = io(namespace);

        // Event handler for new connections.
        // The callback function is invoked when a connection with the
        // server is established.
        socket.on('connect', function() {
            socket.emit('my_event', {data: 'I\'m connected!'});
        });

// Event handler for server sent data.
        // The callback function is invoked whenever the server emits data
        // to the client. The data is then displayed in the "Received"
        // section of the page.
        socket.on('my_response', function(msg, cb) {
            $('#log').append('<br>' + $('<div/>').text('Received #' + msg.count + ': ' + msg.data).html());
            if (cb)
                cb();
        });
    });
</script>
</head>
</html>

Не добавлена ​​функция, которая изменяет содержимое INSTANCE_DIR log.txt, хотя это легко сделать вручную, когда сервер работает, просто перейдите в эту папку, откройте вручную log.txt changeзатем сохранить.

Будет запускаться событие обновления файла.

Другие идеи, которые были проверены:

-> переместить t1 и t2 в background_thread (), это очень сильно увеличивает задержку

-> перемещение t1 и t2 в def test_connect () не обновляется, хотя сервер показывает веб-страницы.

Cheers

...