Дамп Наблюдаемый <byte []> в поток - PullRequest
0 голосов
/ 11 марта 2019

В настоящее время у меня есть Observable<byte[]>, который на самом деле представляет собой последовательность фрагментов исходного файла, используя метод .

Он "разделяет" поток как последовательностьиз byte[].

Проблема в том, что, учитывая эту последовательность, я хотел бы записать ее в целевой поток.Другими словами, я должен выгружать каждый byte[] в поток файлов до тех пор, пока последовательность не будет завершена, но я также должен ждать, пока последовательность не закончится.

Пока что этот код, который я создал, работает, но яБоюсь, это не правильный способ сделать это способ сделать это.Соответствующей частью, в которой обрабатывается IObservable<byte[]>, является метод Download.

async Task Main()
{
    using (var httpClient = new HttpClient()) 
    {
        var downloader = new HttpDownloader(httpClient);
        var destinationPath = Path.Combine(Path.GetTempPath(), "test.zip");
        await downloader.Download("https://github.com/gus33000/MSM8994-8992-NT-ARM64-Drivers/archive/master.zip", destinationPath);
        Console.WriteLine("File downloaded to " + destinationPath);
    }   
}

public class HttpDownloader
{
    private readonly HttpClient client;

    public HttpDownloader(HttpClient client)
    {
        this.client = client;
    }

    public async Task Download(string url, string path, IDownloadProgress progressObserver = null, int timeout = 30)
    {
        using (var fileStream = File.OpenWrite(path))
        {
            await Download(url, fileStream, progressObserver, timeout);
        }
    }

    private async Task Download(string url, Stream destination, IDownloadProgress progressObserver = null,
        int timeout = 30)
    {
        long? totalBytes = 0;
        long bytesWritten = 0;

        await ObservableMixin.Using(() => client.GetAsync(url, HttpCompletionOption.ResponseHeadersRead),
                s =>
                {
                    totalBytes = s.Content.Headers.ContentLength;
                    if (!totalBytes.HasValue)
                    {
                        progressObserver?.Percentage.OnNext(double.PositiveInfinity);
                    }
                    return ObservableMixin.Using(() => s.Content.ReadAsStreamAsync(),
                        contentStream => contentStream.ReadToEndObservable());
                })
            .Do(bytes =>
            {
                bytesWritten += bytes.Length;
                if (totalBytes.HasValue)
                {
                    progressObserver?.Percentage.OnNext((double)bytesWritten / totalBytes.Value);
                }

                progressObserver?.BytesDownloaded?.OnNext(bytesWritten);
            })
            .Timeout(TimeSpan.FromSeconds(timeout))
            .Select(bytes => Observable.FromAsync(async () =>
            {
                await destination.WriteAsync(bytes, 0, bytes.Length);
                return Unit.Default;
            }))
            .Merge(1);
    }

    private static readonly int BufferSize = 8192;

    public async Task<Stream> GetStream(string url, IDownloadProgress progress = null, int timeout = 30)
    {
        var tmpFile = Path.Combine(Path.GetTempPath(), Path.GetTempFileName());
        var stream = File.Create(tmpFile, BufferSize, FileOptions.DeleteOnClose);

        await Download(url, stream, progress, timeout);
        return stream;
    }
}

public interface IDownloadProgress
{
    ISubject<double> Percentage { get; set; }
    ISubject<long> BytesDownloaded { get; set; }
}

public static class ObservableMixin
{
    public static IObservable<TSource> Using<TSource, TResource>(
        Func<Task<TResource>> resourceFactoryAsync,
        Func<TResource, IObservable<TSource>> observableFactory)
        where TResource : IDisposable =>
        Observable.FromAsync(resourceFactoryAsync).SelectMany(
            resource => Observable.Using(() => resource, observableFactory));
}

public static class StreamExtensions
{
    internal const int defaultBufferSize = 4096;

    public static IObservable<byte[]> ReadToEndObservable(this Stream stream)
    {
        return stream.ReadToEndObservable(new byte[defaultBufferSize]);
    }

    public static IObservable<byte[]> ReadToEndObservable(this Stream stream, int bufferSize)
    {
        return stream.ReadToEndObservable(new byte[bufferSize]);
    }

    internal static IObservable<byte[]> ReadToEndObservable(this Stream stream, byte[] buffer)
    {
        return Observable.Create<byte[]>(
            observer =>
            {
                var subscription = new SerialDisposable();

                return new CompositeDisposable(
                    subscription,
                    Scheduler.Immediate.Schedule(
                        self =>
                        {
                            bool continueReading = true;

                            subscription.SetDisposableIndirectly(() =>
                                stream.ReadObservable(buffer).SubscribeSafe(
                                    data =>
                                    {
                                        if (data.Length > 0)
                                        {
                                            observer.OnNext(data);
                                        }
                                        else
                                        {
                                            continueReading = false;
                                        }
                                    },
                                    observer.OnError,
                                    () =>
                                    {
                                        if (continueReading)
                                        {
                                            self();
                                        }
                                        else
                                        {
                                            observer.OnCompleted();
                                        }
                                    }));
                        }));
            });
    }

    internal static IObservable<byte[]> ReadObservable(this Stream stream, byte[] buffer)
    {
        return stream.ReadObservable(buffer, 0, buffer.Length).Select(
            read =>
            {
                byte[] data;

                if (read <= 0)
                {
                    data = new byte[0];
                }
                else if (read == buffer.Length)
                {
                    data = (byte[])buffer.Clone();
                }
                else
                {
                    data = new byte[read];

                    Array.Copy(buffer, data, read);
                }

                return data;
            });
    }

    public static IObservable<int> ReadObservable(this Stream stream, byte[] buffer, int offset, int count)
    {
        return Observable.StartAsync(cancel => stream.ReadAsync(buffer, offset, count, cancel));
    }   
}

public static class SerialDisposableExtensions
{
    public static void SetDisposableIndirectly(this SerialDisposable disposable, Func<IDisposable> factory)
    {
        var indirection = new SingleAssignmentDisposable();

        disposable.Disposable = indirection;

        indirection.Disposable = factory();
    }
}


public static class SafeObservableExtensions
{
    public static IDisposable SubscribeSafe<T>(this IObservable<T> source, Action<T> onNext,
        Action<Exception> onError, Action onCompleted)
    {
        return source.SubscribeSafe(Observer.Create<T>(onNext, onError, onCompleted));
    }
}

Выглядит ли это нормально?

1 Ответ

1 голос
/ 14 марта 2019

Сначала я думал, что у вашего ReadToEndObservable, должно быть, была ошибка, поэтому я написал это вместо:

public static IObservable<byte[]> ReadToEndObservable(this Stream stream, int bufferSize)
    =>
        Observable.Defer<byte[]>(() =>
        {
            var bytesRead = -1;
            var bytes = new byte[bufferSize];
            return
                Observable.While<byte[]>(
                    () => bytesRead != 0,
                    Observable
                        .FromAsync(() => stream.ReadAsync(bytes, 0, bufferSize))
                        .Do(x =>
                        {
                            bytesRead = x;
                        })
                        .Select(x => bytes.Take(x).ToArray()));
        });

Кажется, он все еще не работает.

Затем я попробовал это с простым кодом:

IObservable<byte[]> test1 =
    Observable
        .Using(
            () => File.Open(@"{path}\HttpDownloader-master\HttpDownloader-master\HttpDownloader.sln", FileMode.Open),
            s => s.ReadToEndObservable(24));

И это сработало с моим кодом. И я попробовал это с твоим. Это сработало.

Я подумал, что может быть что-то не так с потоком, который вы пытаетесь загрузить. Как таковой проблемы не было - просто размер файла составляет 555 МБ.

Я думаю, что ваш код в порядке, но размер был слишком велик, и время истекло.

...