Как изменить продолжительность потока таймера Tokio, который был объединен с другим потоком? - PullRequest
2 голосов
/ 05 августа 2020

У меня три потока:

  1. поток таймера (определен ниже)
  2. поток широковещательного канала (определен ниже)
  3. поток WebSocket (не определен здесь, но я не думаю, что это необходимо. Он основан на токио-вольфраме)

Я объединяю эти 3 потока вместе с select.

Мне нужны команды, которые я отправляю канал вещания для изменения таймера - я хочу иметь возможность повторно настроить таймер tokio после объединения потоков.

Это мой поток таймера:

fn timer_stream(dur: u64) -> impl futures::stream::Stream<Item = Input> {
    let task_time = tokio::time::Instant::now();

    tokio::time::interval_at(task_time, Duration::from_secs(dur))
        .map(move |_| Input::Command_first(task_time))
}

Я хочу чтобы иметь возможность изменять task_time и dur.

Вот как я объединяю свои потоки вместе:

let mut websocket_timer = select(exchange_websocket_stream, timer_stream(3));
let mut combined = select(websocket_timer, command_receiver);

command_receiver определяется с использованием tokio::sync::broadcast; как так:

let (tx_tcp_commands, mut rx_tcp_commands) = broadcast::channel::<Input>(16);
command_receiver = tx_tcp_commands.subscribe()

Input это:

enum Input {
    Command_second(tokio::time::Instant),
    Command_first(tokio::time::Instant),
    Thing,
}

Затем я помещаю указанное выше в tokio::main исполнителя и выполняю

loop {
    match combined.next().await {
        None => break,
        Some(Input::Command(t)) => etc,
    }
}

Это работает как общий поток, и я могу сопоставить команды, отправленные на command_receiver, мне просто нужно знать, как изменить параметры таймера.

l oop обрабатывается очень большое количество сообщений WebSocket, поэтому оно должно быть эффективным.

Как я могу sh это сделать?

...