Я пытаюсь получить «живой сюжет» с использованием кафки и боке, но сталкиваюсь со следующим противоречием:
- Рекомендованный Bokeh 'live plot' рекомендует НЕ использовать push_session () или loop_until_closed (), см. здесь
- Рекомендуемый метод использует add_root () и add_periodic_callback. Смотрите этот пример SO здесь
- Новейший потребительский объект KafkaConsumer (в отличие от устаревшего SimpleConsumer) является итератором, и вы можете перебирать его. Такой цикл над итератором будет блокироваться, пока в теме не произойдет новое событие.
Для справки о полуобработанном растворе с использованием 1 выше см. Этот github
Мне не удалось заставить живой сюжет работать против моей темы кафки, используя 2 и 3 выше:
- KafkaConsumer будет блокировать, пока в теме не появятся новые значения, и это (я полагаю, предотвращает обновление формы моего графика боке, используя приведенный ниже код)
- KafkaConsumer имеет аргумент consumer_timeout_ms , который позволит мне выйти из цикла, но после выполнения обратного вызова bokeh выдает ошибки NoneType, как если бы метод обновления исчез.
- Конечным результатом всего этого является то, что я получаю страницу только с осями, но без линий (движущиеся или иные) или пустую страницу в зависимости от реализации.
Полагаю, я могу попытаться использовать метод «ВЫСОКОГО ОБЕСПЕЧЕНИЯ», но я бы хотел исправить это для предлагаемых преимуществ и расстаться с любопытством. Спасибо!
from kafka import KafkaConsumer
import json
from datetime import datetime
from bokeh.io import show
from bokeh.plotting import figure, curdoc
from bokeh.layouts import column, gridplot
from bokeh.driving import linear
from bokeh.client import push_session
import random
import pandas as pd
p1 = figure(plot_width=1000, plot_height=200, tools="reset")
p2 = figure(plot_width=1000, plot_height=200, x_range=p1.x_range)
p1.xaxis.visible = False
df = pd.DataFrame(columns=['ts', 'VAR1', 'VAR2'])
# data vars
time, value = [0], [0]
# figure that is updated with new data
r1 = p1.line(time, value)
r2 = p2.line(time, value)
#dots = p.circle(time, value, size=1, color='navy')
WINDOW_SIZE = 20
def update(consumer, df):
for message in consumer:
# parse the message
result = json.loads(message.value)
t = datetime.strptime(result['ts'], "%Y-%m-%dT%H:%M:%S.%fZ")
wob = float(result['VAR1'])
rop = float(result['VAR2'])
# add to the dataframe
df.loc[len(df)] = [t, var1, var2]
if len(df['WOB']) > WINDOW_SIZE:
r1.data_source.data['y'] = list(df['VAR1'])[-WINDOW_SIZE:]
r1.data_source.data['x'] = range(len(list(df['VAR1'])))[-WINDOW_SIZE:]
r2.data_source.data['y'] = list(df['VAR2'])[-WINDOW_SIZE:]
r2.data_source.data['x'] = range(len(list(df['VAR2'])))[-WINDOW_SIZE:]
r1.data_source.trigger('data', r1.data_source.data, r1.data_source.data)
r2.data_source.trigger('data', r2.data_source.data, r2.data_source.data)
else:
r1.data_source.data['y'] = list(df['VAR1'])
r1.data_source.data['x'] = range(len(list(df['VAR1'])))
r2.data_source.data['y'] = list(df['VAR2'])
r2.data_source.data['x'] = range(len(list(df['VAR2'])))
r1.data_source.trigger('data', r1.data_source.data, r1.data_source.data)
r2.data_source.trigger('data', r2.data_source.data, r2.data_source.data)
pass
consumer = KafkaConsumer('mnemonics', bootstrap_servers='10.1.0.17:6667', consumer_timeout_ms=2000)
# THIS WORKS ('bokeh serve' and 'python test.py') WITHOUT KAFKA (would need to modify code above)- but 'HIGHLY DISCOURAGED'
# session = push_session(curdoc())
# curdoc().add_periodic_callback(update(consumer, df), 500)
# session.show(gridplot([[p1], [p2]])) # open the document in a browser
# session.loop_until_closed() # run forever
# THIS WORKS ('bokeh serve' and 'python test.py) WITH KAFKA above- but 'HIGHLY DISCOURAGED'
session = push_session(curdoc())
session.show(gridplot([[p1], [p2]])) # open the document in a browser
while 1:
update(consumer, df)
# THIS DOES NOT WORK ('bokeh serve --show test.py) -- just shows axes and nothing else and 'NoneType object is not callable' errors
#curdoc().add_root(gridplot([[p1], [p2]]))
#curdoc().add_periodic_callback(update(consumer, df), 500)
#curdoc().title = 'test'
# THIS DOES NOT WORK ('bokeh serve --show test.py) -- just blank page
#show(gridplot([[p1], [p2]]))
#curdoc().add_root(gridplot([[p1], [p2]]))
#curdoc().add_periodic_callback(update, 500)
#curdoc().title = 'test'
#while 1:
# update(consumer, df)