Я создаю программу в ржавчине (по ночам, async-await, futures-preview, tokio 0.2+) для записи видео с IP-камеры через RTSP.
Модуль записи запускает процесс ffmpeg, который создает фрагменты видео,Он также ловит и регистрирует stderr / stdout в ffmpeg и записывает его на экран. В случае ошибок (сети и т. Д.) Ffmpeg может умереть, поэтому я хочу перезапустить его через 15 секунд бесконечно.
Я бы хотел создать динамическую коллекцию Recorder s . Я хочу добавить / удалить регистраторы во время выполнения. В какой-то момент для каждого созданного Регистратора я должен получить его будущее и передать его tokio::spawn
. С точки зрения черного ящика я хочу, чтобы регистраторы предоставили два метода: add(r: Recorder OR all recorder params) -> RecorderHandle
и remove(rh: RecorderHandle)
.
- Есть ли рекомендуемый шаблон для такого сценария?
- Какя могу удалить рекордер? Как я могу отменить будущее, переданное
tokio::spawn
? - Куда бы упал будущий стоп? На ближайшем
.await
? - Метод
run()
в порядке? Есть ли какие-либо предложения? - Метод
run_infinite()
в порядке? - Есть ли какие-либо рекомендации для бесконечных потоков?
Код регистратора
pub struct RtspRecorder {
rtsp_url: url::Url,
segment_time: std::time::Duration,
output_strftime: String,
output_directory: std::path::PathBuf
}
impl RtspRecorder {
pub fn new( /* just all fields are passed to the struct */ ) -> Self { /* (...) */}
fn ffmpeg_command_build(&self) -> tokio_net::process::Command { /* sets env and command line arguments */ }
pub async fn run(
&self,
) -> Box<dyn std::error::Error> {
let mut command = self.ffmpeg_command_build();
let mut child = command.spawn().unwrap();
let stdout = tokio::codec::FramedRead::new(child.stdout().take().unwrap(), tokio::codec::LinesCodec::default())
.for_each(|message| { log::info!("ffmpeg stdout: {}", message.unwrap()); return futures::future::ready(()); });
let stderr = tokio::codec::FramedRead::new(child.stderr().take().unwrap(), tokio::codec::LinesCodec::default())
.for_each(|message| { log::warn!("ffmpeg stderr: {}", message.unwrap()); return futures::future::ready(()); });
let futures_results = futures::future::join3(child, stdout, stderr).await;
return match futures_results.0 {
Err(e) => e.into(),
Ok(exit_status) => format!("ffmpeg exited with status: {:?}", exit_status).into()
}
}
pub async fn run_infinite(
&self
) {
loop {
let error = self.run().await;
log::warn!("error during run: {:?}, restarting...", error);
tokio_timer::delay_for(std::time::Duration::from_secs(15)).await;
}
}
}