Отмена будущего / потока / задачи, обработка будущих коллекций - PullRequest
2 голосов
/ 05 октября 2019

Я создаю программу в ржавчине (по ночам, async-await, futures-preview, tokio 0.2+) для записи видео с IP-камеры через RTSP.

Модуль записи запускает процесс ffmpeg, который создает фрагменты видео,Он также ловит и регистрирует stderr / stdout в ffmpeg и записывает его на экран. В случае ошибок (сети и т. Д.) Ffmpeg может умереть, поэтому я хочу перезапустить его через 15 секунд бесконечно.

Я бы хотел создать динамическую коллекцию Recorder s . Я хочу добавить / удалить регистраторы во время выполнения. В какой-то момент для каждого созданного Регистратора я должен получить его будущее и передать его tokio::spawn. С точки зрения черного ящика я хочу, чтобы регистраторы предоставили два метода: add(r: Recorder OR all recorder params) -> RecorderHandle и remove(rh: RecorderHandle).

  • Есть ли рекомендуемый шаблон для такого сценария?
  • Какя могу удалить рекордер? Как я могу отменить будущее, переданное tokio::spawn?
  • Куда бы упал будущий стоп? На ближайшем .await?
  • Метод run() в порядке? Есть ли какие-либо предложения?
  • Метод run_infinite() в порядке?
  • Есть ли какие-либо рекомендации для бесконечных потоков?

Код регистратора

pub struct RtspRecorder {
    rtsp_url: url::Url,
    segment_time: std::time::Duration,
    output_strftime: String,
    output_directory: std::path::PathBuf
}

impl RtspRecorder {
    pub fn new( /* just all fields are passed to the struct */ ) -> Self { /* (...) */}
    fn ffmpeg_command_build(&self) -> tokio_net::process::Command { /* sets env and command line arguments */ }

    pub async fn run(
        &self,
    ) -> Box<dyn std::error::Error> {
        let mut command = self.ffmpeg_command_build();
        let mut child = command.spawn().unwrap();

        let stdout = tokio::codec::FramedRead::new(child.stdout().take().unwrap(), tokio::codec::LinesCodec::default())
            .for_each(|message| { log::info!("ffmpeg stdout: {}", message.unwrap()); return futures::future::ready(()); });

        let stderr = tokio::codec::FramedRead::new(child.stderr().take().unwrap(), tokio::codec::LinesCodec::default())
            .for_each(|message| { log::warn!("ffmpeg stderr: {}", message.unwrap()); return futures::future::ready(()); });

        let futures_results = futures::future::join3(child, stdout, stderr).await;

        return match futures_results.0 {
            Err(e) => e.into(),
            Ok(exit_status) => format!("ffmpeg exited with status: {:?}", exit_status).into()
        }
    }

    pub async fn run_infinite(
        &self
    ) {
        loop {
            let error = self.run().await;
            log::warn!("error during run: {:?}, restarting...", error);
            tokio_timer::delay_for(std::time::Duration::from_secs(15)).await;
        }
    }
}
...