Не может завершить наблюдаемое - PullRequest
0 голосов
/ 01 октября 2019

Я пишу Async File Reader, работающий с определенным параметром состояния. Мне нужно получить уведомление, когда все файлы были прочитаны, но этот наблюдаемый объект «читатель» никогда не завершается. (не удалось получить уведомление «Готово» после оператора «reader.Wait ()»). Не могли бы вы помочь мне понять, почему? Как я могу завершить это вручную?

class AsyncReader
    {
        public enum States { Processing, Stopped, Paused};

        private Subject<string[]> filesProvider = new Subject<string[]>();

        private Subject<States> state = new Subject<States>();

        public void Run()
        {
            state.OnNext(States.Processing);
        }

        public IObservable<KeyValuePair<string, string>> GetDataSource()
        {
            return filesProvider.Select(files => ReadFiles(files, state)).Switch();
        }

        public AsyncReader(string[] args)
        {
            var reader = GetDataSource();
            Observable.Start(() =>
            {
                reader.Wait();
                Console.WriteLine("Done");
            });
            reader.Subscribe(line =>
            {
                Console.WriteLine(line);
            });
            filesProvider.OnNext(args);
        }

        public static IObservable<KeyValuePair<string, string>> ReadFile(string filePath, IObservable<States> rState) =>      
            rState.Where(state => state == States.Processing)
            .SelectMany(_ =>
            Observable
            .Using(
                () => new StreamReader(filePath),
                reader =>
                    Observable
                        .Defer(
                            () =>
                                Observable
                                    .FromAsync(reader.ReadLineAsync))
                        .Repeat()
                        .TakeWhile(line => line != null)
                        .Select(line => new KeyValuePair<string, string>(filePath, line))));

        public static IObservable<KeyValuePair<string, string>> ReadFiles(string[] files, IObservable<States> readState)
        {
            IObservable<KeyValuePair<string, string>> dataSource = Observable.Empty<KeyValuePair<string, string>>();
            foreach (var file in files)
            {
                dataSource = dataSource.Concat(ReadFile(file, readState));
            }
            return dataSource;
        }
    }

Краткий пример использования:

class Program
    {
        static void Main(string[] args)
        {
            AsyncReader reader = new AsyncReader(args);
            reader.Run();
            Console.ReadKey();
        }
    }
...