Я стремлюсь обновлять данные на панели управления (plotly-dash
) в реальном времени из потока данных, который я получаю через multiprocessing pipe
. Однако обратный вызов значения не обновляется, получая:
Traceback (most recent call last):
File "/home/----/Documents/GitHub/----/CLIENT/venv/VISUALIZATION.py", line 146, in update_gauge
packet = pipe.recv()
AttributeError: 'int' object has no attribute 'recv'
Я запускаю три разных процесса. P1 - это get_data, где сокет открывается для сервера, предоставляющего данные. P2 должен быть приборной панелью. P3 - это отладочный процесс, выполняющийся только для того, чтобы увидеть, получает ли клиент какие-либо данные (а это и есть):
parent_conn, child_conn = multiprocessing.Pipe()
p1 = multiprocessing.Process(target=get_data, args=(parent_conn, address,))
p2 = multiprocessing.Process(target=dashboard, args=(child_conn, ))
p3 = multiprocessing.Process(target=print_pipe, args=(child_conn, ))
p1.start()
p2.start()
p3.start()
p1.join()
p2.join()
p3.join()
P1 отправляет, независимо от состояния соединения с сервером, всегда отслеживая данные по соединительная труба: conn.send([id, an1, anVar, connected, time_since_valid])
, например ['11', '24', '6', True, 4.76837158203125e-07]
Под ней находится панель приборов, которую я написал, где, я думаю, что-то не так в @app.callback
/ update_gauge
?
####DASHBOARD####
def dashboard(conn):
app = dash.Dash()
app.layout = html.Div([
dcc.Interval(
id="interval-components",
interval=1*10000,
n_intervals=0
),
daq.Gauge(
id='gauge-chart',
color={"gradient": True, "ranges": {
"green": [0, 10], "yellow": [10, 30], "red": [30, 60]}},
value=0,
max=60,
min=0,
units="M/S",
)
], className='row', style={'textAlign': 'center'})
@app.callback(
output=[Output('gauge-chart', 'value')],
inputs=[Input('interval-components', 'n_intervals')]
)
def update_gauge(pipe=conn):
packet = pipe.recv()
value = packet[1]
return value
app.run_server(port=8042,debug=True)
СМ. ВЕСЬ КОД НИЖЕ:
#GENERAL INPUTS
import sys
import multiprocessing
#CONNECTION INPUTS
import socket
import time
#DASHBOARD INPUTS
import dash
import dash_core_components as dcc
import dash_html_components as html
#import dash_bootstrap_components as dbc
import dash_daq as daq
import plotly
import plotly.graph_objects as go
from dash.dependencies import Input, Output
#INITIALIZE VARIABLES
can_id = '0'
an1 = '0'
anVar = '0'
connection_status = '\033[91m' + 'DISCONNECTED' + '\033[0m'
time_since_valid = 0
connected = False
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
####DATA####
def parse_data(data, last_data):
if (len(data)>0 and data[0] == 'A' and data[-1] == 'Z'):
valid = True
return [data[2:-2],valid] #REMOVE START AND END MARKERS WITH SPACES
else:
valid = False
return [last_data,valid]
def try_connect(address):
global connected
global client
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.settimeout(2)
try:
client.connect(address)
connected = True
except:
connected = False
client.close()
finally:
return [client, connected]
def get_data(conn, address):
#GLOBAL VARIABLE INIT
global can_id
global an1
global anVar
global connection_status
global time_since_valid
global connected
global client
#LOCAL VARIABLES INIT
connection_status = '\033[91m' + 'DISCONNECTED' + '\033[0m'
data = b''
last_data = "START-UP"
t0 = time.time()
#CONNECT
while not connected:
conn.send([can_id, an1, anVar, connected, time_since_valid])
try:
[client, connected] = try_connect(address)
t1 = time.time()
time_since_valid = t1 - t0
except:
pass
#GET DATA
while True:
try:
data = client.recv(14)
except:
connected = False
if not connected:
try:
[client, connected] = try_connect(address)
data = client.recv(14)
except:
client.close()
time_since_valid = time.time() - t0
else:
if data == b'':
connected = False
valid = False
elif data == b'HELLO NEW SOCK':
connected = True
valid = True
else:
connected = True
try:
data = data.decode('utf-8').rstrip('\x00')
except:
pass
[data,valid] = parse_data(data, last_data)
last_data = data
data_list = data.split(' ')
can_id = data_list[0]
an1 = data_list[1]
anVar = data_list[2]
if valid:
t0 = time.time()
t1 = time.time()
else:
t1 = time.time()
time_since_valid = t1 - t0
conn.send([can_id, an1, anVar, connected, time_since_valid])
####DASHBOARD####
def dashboard(conn):
# fig = go.Figure(go.Indicator(
# domain={'x': [0, 1], 'y': [0, 1]},
# value=15,
# mode="gauge+number",
# title={'text': "Windspeed in m/s"},
# gauge={'axis': {'range': [None, 60]},
# 'steps': [
# {'range': [0, 20], 'color': "palegreen"},
# {'range': [20, 40], 'color': "cornsilk"},
# {'range': [40, 60], 'color': "coral"}],
# }))
app = dash.Dash()
app.layout = html.Div([
dcc.Interval(
id="interval-components",
interval=1*10000,
n_intervals=0
),
# dcc.Graph(figure=fig)
daq.Gauge(
id='gauge-chart',
color={"gradient": True, "ranges": {
"green": [0, 10], "yellow": [10, 30], "red": [30, 60]}},
value=0,
max=60,
min=0,
units="M/S",
)
], className='row', style={'textAlign': 'center'})
@app.callback(
output=[Output('gauge-chart', 'value')],
inputs=[Input('interval-components', 'n_intervals')]
)
def update_gauge(pipe=conn):
packet = pipe.recv()
value = packet[1]
return value
app.run_server(port=8043,debug=True)
def print_pipe(conn):
while True:
packet = conn.recv()
print("\r" + str(packet[1]), end="")
if __name__ == '__main__':
# SETTINGS
settings = open('settings.txt', "r")
for line in settings.readlines():
line = line.strip("\n")
line = line.split(" = ")
if line[0] == "IP":
ip = socket.gethostbyname(line[1])
elif line[0] == "PORT":
port = int(line[1])
elif line[0] == "VERSION":
version = str(line[1])
else:
print("Unknown setting: " + str(line[0]))
address = (ip, port)
print(address)
parent_conn, child_conn = multiprocessing.Pipe()
p1 = multiprocessing.Process(target=get_data, args=(parent_conn, address,))
p2 = multiprocessing.Process(target=dashboard, args=(child_conn, ))
p3 = multiprocessing.Process(target=print_pipe, args=(child_conn, ))
p1.start()
p2.start()
p3.start()
p1.join()
p2.join()
p3.join()