У меня три потока:
- поток таймера (определен ниже)
- поток широковещательного канала (определен ниже)
- поток WebSocket (не определен здесь, но я не думаю, что это необходимо. Он основан на токио-вольфраме)
Я объединяю эти 3 потока вместе с select
.
Мне нужны команды, которые я отправляю канал вещания для изменения таймера - я хочу иметь возможность повторно настроить таймер tokio после объединения потоков.
Это мой поток таймера:
fn timer_stream(dur: u64) -> impl futures::stream::Stream<Item = Input> {
let task_time = tokio::time::Instant::now();
tokio::time::interval_at(task_time, Duration::from_secs(dur))
.map(move |_| Input::Command_first(task_time))
}
Я хочу чтобы иметь возможность изменять task_time
и dur
.
Вот как я объединяю свои потоки вместе:
let mut websocket_timer = select(exchange_websocket_stream, timer_stream(3));
let mut combined = select(websocket_timer, command_receiver);
command_receiver
определяется с использованием tokio::sync::broadcast;
как так:
let (tx_tcp_commands, mut rx_tcp_commands) = broadcast::channel::<Input>(16);
command_receiver = tx_tcp_commands.subscribe()
Input
это:
enum Input {
Command_second(tokio::time::Instant),
Command_first(tokio::time::Instant),
Thing,
}
Затем я помещаю указанное выше в tokio::main
исполнителя и выполняю
loop {
match combined.next().await {
None => break,
Some(Input::Command(t)) => etc,
}
}
Это работает как общий поток, и я могу сопоставить команды, отправленные на command_receiver
, мне просто нужно знать, как изменить параметры таймера.
l oop обрабатывается очень большое количество сообщений WebSocket, поэтому оно должно быть эффективным.
Как я могу sh это сделать?