Переполненный многопроцессорный pipe () вызывает задержки между клиентом сокета и клиентом da sh - PullRequest
2 голосов
/ 16 июня 2020

Я передаю последний ответ с моего сервера через клиент на панель управления. И клиент, и панель управления работают как отдельный процесс с использованием multiprocessing. В настоящее время я передаю полученный пакет через pipe(). При отображении панели plotly dash я использую @app.callback с интервалом в 1 секунду.

Это приводит к тому, что канал становится тесным, и мои значения обновляются с огромной задержкой.

Код приборной панели

def dashboard():
    app = dash.Dash()
    app.layout = html.Div([
        dcc.Interval(
            id="interval-components",
            interval=1*1000,
            n_intervals=0
        ),

        # dcc.Graph(figure=fig)
        daq.Gauge(
            id='gauge-chart',
            color={"gradient": True, "ranges": {
                "green": [0, 10], "yellow": [10, 30], "red": [30, 60]}},
            value=0,
            max=60,
            min=0,
            units="M/S",
        )
    ], className='row', style={'textAlign': 'center'})
    @app.callback(
        output=[Output('gauge-chart', 'value')],
        inputs=[Input('interval-components', 'n_intervals')]
    )
    def update_gauge(n):
        value = receive_packet(child_conn)[0][2]
        return [value]

    app.run_server(port=8047,debug=True)

def receive_packet(conn):
    packet = conn.recv()
    packet = list(map(int, packet))
    return [packet]

Многопроцессорная обработка

    parent_conn, child_conn = multiprocessing.Pipe()
    p1 = multiprocessing.Process(target=get_data, args=(parent_conn, address,))
    p2 = multiprocessing.Process(target=dashboard, args=())
    p3 = multiprocessing.Process(target=receive_packet, args=(child_conn, ))

    p1.start()
    p2.start()
    p3.start()

    p1.join()
    p2.join()
    p3.join()

get_data отправляется в конвейер

#SOME CODE TO RECEIVE THE PACKAGES FROM THE SERVER
while True:
   conn.send([can_id, an1, anVar, connected, time_since_valid])

Есть ли лучший способ обмена данными между процессами без блокировки потока? Может быть, общие состояния?

FAILED FIX 1

Я уже пытался исправить это, позволив процессу get_data time.sleep(1) соответствовать интервалу. Кажется, это не работает.

FAILED FIX 2

Я попытался добавить несколько сообщений Я готов , но это не помогло проблема:

def receive_packet(conn = child_conn):
    conn.send("GIMMI DATA")
    packet = conn.recv()
    conn.send("0")
    packet = list(map(int, packet))
    return [packet]


def get_data(conn = parent_conn):
    #################
    ####SOME CODE####
    #################
    packet = conn.recv()
    if packet == "GIMMI DATA":
        conn.send([can_id, an1, anVar, connected, time_since_valid])
    else:
        pass

1 Ответ

0 голосов
/ 17 июня 2020

УСПЕШНОЕ ИСПРАВЛЕНИЕ 1

Вместо запуска multiprocessing.Pipe() я вернулся к использованию multiprocessing.Value(), что кажется более стабильным для постоянных потоков данных. Всегда приветствуются предложения по более эффективным решениям.

...