Как правильно остановить отдельный поток вместе с выключением сервера bokeh? - PullRequest
0 голосов
/ 24 апреля 2020

Я пытаюсь построить график в режиме реального времени с потоковой передачей некоторых данных с помощью сервера bokeh. Тем временем я хочу начать отдельный процесс, чтобы «манипулировать» одними и теми же данными с помощью какой-либо операции ввода-вывода файлов на жесткий диск. Из нескольких источников здесь и здесь я смог собрать скрипт, показанный ниже. Однако когда я запускаю bokeh serve --show test.py в командной строке, веб-страница никогда не загружается. И сервер просто заморожен. Может ли кто-нибудь указать, что я должен делать? Спасибо за любую помощь.

Для простоты blocking_task для отдельного потока содержит только функцию time.sleep. Сюжет будет показан, если закомментированы последние три строки (они предназначены для отдельной темы).

from bokeh.io import curdoc
from bokeh.models import ColumnDataSource
from bokeh.plotting import figure
from bokeh.models.widgets import Button
from bokeh.layouts import column, widgetbox

import time
import numpy as np
import datetime as dt
from threading import Thread
import sys

# the function to be called by the separate thread
def blocking_task():
    while True:
        if stop_threads:
            print('thread killed')
            sys.exit()
        time.sleep(0.1)

# the 'stop' button under the plotting
def button_callback():
    global stop_threads
    stop_threads = True


def update():
    data = np.random.rand()
    source.stream(dict(time=[dt.datetime.now()], data=[data]), 100)


doc = curdoc()
stop_threads = False  # a global flag used for stopping the separate thread

source = ColumnDataSource(dict(time=[], data=[]))
fig = figure(x_axis_type='datetime', plot_width=800, plot_height=400)
fig.line(x='time', y='data', source=source)

button = Button(label="Stop", button_type="success")
button.on_click(button_callback)

doc.add_root(column([fig, widgetbox(button, align="center")], sizing_mode='stretch_both'))
doc.add_periodic_callback(callback=update, period_milliseconds=100)

thread = Thread(target=blocking_task)
thread.start()
thread.join()  # I can comment out this line to show the streaming plot, but there is no way to stop the separate thread together with shutting down the server.

Ответы [ 2 ]

1 голос
/ 25 апреля 2020

join() ожидает остановки потока перед продолжением выполнения. bokeh serve оборачивает весь сценарий в функцию, которая должна возвращаться, что никогда не происходит в вашем случае.

Попробуйте удалить только последнюю строку и установить daemon=True в конструкторе Thread.

0 голосов
/ 29 апреля 2020

Я избегу флага демона согласно документации :

Потоки демона внезапно останавливаются при завершении работы. Их ресурсы (такие как открытые файлы, транзакции базы данных и т. Д. c.) Могут быть освобождены неправильно. Если вы хотите, чтобы ваши потоки корректно останавливались, сделайте их не демонами c и используйте подходящий механизм сигнализации, такой как Событие.

Я вместо этого проверю threading.main_thread().is_alive() в то время как l oop дочерней нити

from bokeh.io import curdoc
from bokeh.models import ColumnDataSource
from bokeh.plotting import figure
from bokeh.models.widgets import Button
from bokeh.layouts import column, widgetbox

import time
import numpy as np
import datetime as dt
import threading
import sys


def blocking_task():
    while threading.main_thread().is_alive():
        time.sleep(0.1)
    else:
        print('exiting child thread')


def button_callback():
    sys.exit()


def update():
    data = np.random.rand()
    source.stream(dict(time=[dt.datetime.now()], data=[data]), 100)


doc = curdoc()

source = ColumnDataSource(dict(time=[], data=[]))
fig = figure(x_axis_type='datetime', plot_width=800, plot_height=400)
fig.line(x='time', y='data', source=source)

button = Button(label="Stop", button_type="success")
button.on_click(button_callback)

doc.add_root(column([fig, widgetbox(button, align="center")], sizing_mode='stretch_both'))
doc.add_periodic_callback(callback=update, period_milliseconds=100)

thread = threading.Thread(target=blocking_task)
thread.start()
...