Преобразование основанного на событиях API в Rx.Net - PullRequest
0 голосов
/ 23 июня 2018

Я пытаюсь преобразовать существующий API на основе событий в Reactive Observable API. Конкретный API, с которым я работаю, это NSNetServiceBrowser в Xamarin.iOS. Этот API позволяет просматривать сетевые устройства с помощью Zeroconf / Bonjour. Однако этот вопрос будет применяться к любому API такого рода.

NsNetServiceBrowser предлагает различные интересные события: - FoundService - NotSearched - ServiceRemoved

Событие FoundService возникает при обнаружении службы, а NotSearched возникает при сбое поиска.

Я хотел бы объединить события FoundService и NotSerched в наблюдаемую величину NSNetService.

Моя текущая реализация выглядит так:

public IObservable<NSNetService> Search()
{
    var foundObservable = Observable
        .FromEventPattern<NSNetServiceEventArgs>(
            h => serviceBrowser.FoundService += h,
            h => serviceBrowser.FoundService -= h)
        .Select(x => x.EventArgs);

    var notSearchedObservable = Observable
        .FromEventPattern<NSNetServiceErrorEventArgs>(
            h => serviceBrowser.NotSearched += h,
            h => serviceBrowser.NotSearched -= h)
        .Select(x => x.EventArgs);

    var serviceObservable = Observable.Create(
        (IObserver<NSNetServiceEventArgs> observer) =>
        {
            notSearchedObservable.Subscribe(n =>
            {
                string errorMessage = $"Search for {serviceType} failed:";
                foreach (var kv in n.Errors)
                {
                    log.Error($"\t{kv.Key}: {kv.Value}");
                    errorMessage += $" ({kv.Key}, {kv.Value})";
                }
                observer.OnError(new Exception(errorMessage));
            });
            foundObservable.Subscribe(observer);
            return System.Reactive.Disposables.Disposable.Empty;
    }).Select(x => x.Service);

    serviceBrowser.SearchForServices(serviceType, domain);
    return serviceObservable;
}

Код выглядит неуклюже, и у меня есть ощущение, что я неправильно использую System.Reactive? Есть ли более элегантный способ комбинировать пары событий, когда одно генерирует, а другое сигнализирует об ошибке? Это общий шаблон в существующих API на основе событий в .NET.


Вот небольшое консольное приложение (зависит только от System.Reactive), иллюстрирующее тип API, который я хочу повторно активировать:

using System;
using System.Reactive;
using System.Reactive.Linq;
using System.Threading.Tasks;

namespace ReactiveLearning
{
    class Program
    {
        static void Main(string[] args)
        {
            var browser = new ServiceBrowser();

            var observableFound =
                Observable.FromEventPattern<ServiceFoundEventArgs>(
                    h => browser.ServiceFound += h,
                    h => browser.ServiceFound -= h)
                .Select(e => e.EventArgs.Service);

            var observableError =
                Observable.FromEventPattern<ServiceSearchErrorEventArgs>(
                    h => browser.ServiceError += h,
                    h => browser.ServiceError -= h);

            var foundSub = observableFound.Subscribe(s =>
            {
                Console.WriteLine($"Found service: {s.Name}");
            }, () => 
            {
                Console.WriteLine("Found Completed");
            });

            var errorSub = observableError.Subscribe(e =>
            {
                Console.WriteLine("ERROR!");
            }, () => 
            {
                Console.WriteLine("Error Completed");
            });

            browser.Search();

            Console.ReadLine();

            foundSub.Dispose();
            errorSub.Dispose();

            Console.WriteLine();
        }
    }

    class ServiceBrowser
    {
        public EventHandler<ServiceFoundEventArgs> ServiceFound;
        public EventHandler<ServiceSearchErrorEventArgs> ServiceError;

        public void Search()
        {
            Task.Run(async () =>
            {
                for (var i = 0; i < 5; ++i)
                {
                    await Task.Delay(1000);
                    ServiceFound?.Invoke(this, new ServiceFoundEventArgs(new Service($"Service {i}")));
                }

                var r = new Random();
                if (r.NextDouble() > 0.5)
                {
                    ServiceError?.Invoke(this, new ServiceSearchErrorEventArgs());
                }
            });
        }
    }

    class ServiceFoundEventArgs : EventArgs
    {
        public Service Service { get; private set;  }
        public ServiceFoundEventArgs(Service service) => Service = service;
    }

    class ServiceSearchErrorEventArgs : EventArgs {}

    class Service
    {
        public event EventHandler<EventArgs> AddressResolved;
        public event EventHandler<EventArgs> ErrorResolvingAddress;
        public string Name { get; private set; }
        public string Address { get; private set; }
        public Service(string name) => Name = name;

        public void ResolveAddress()
        {
            Task.Run(async () =>
            {
                await Task.Delay(500);
                var r = new Random();
                if (r.NextDouble() > 0.5)
                {                    
                    Address = $"http://{Name}.com";
                    AddressResolved?.Invoke(this, EventArgs.Empty);
                }
                else
                {
                    ErrorResolvingAddress?.Invoke(this, EventArgs.Empty);
                }
            });
        }
    }
}

1 Ответ

0 голосов
/ 24 июня 2018

Спасибо за отличный пример кода.Вам нужно воспользоваться превосходными Materialize & Dematerialize операторами.Вот как это делается:

var observableFoundWithError =
    observableFound
        .Materialize()
        .Merge(
            observableError
                .Materialize()
                .Select(x =>
                    Notification
                        .CreateOnError<Service>(new Exception("Error"))))
        .Dematerialize()
        .Synchronize();

using (observableFoundWithError.Subscribe(
    s => Console.WriteLine($"Found service: {s.Name}"),
    ex => Console.WriteLine($"Found error: {ex.Message}"),
    () => Console.WriteLine("Found Completed")))
{
    browser.Search();
    Console.ReadLine();
}

Оператор Materialize() превращает IObservable<T> в и IObservable<Notification<T>>, что позволяет стандартным OnError и OnCompleted передаваться через вызов OnNext.Вы можете использовать Notification.CreateOnError<T>(new Exception("Error")) для создания элементов наблюдаемой, которую вы можете превратить в IObservable<T> с помощью Dematerialize().

Я бросил Synchronize(), чтобы убедиться, что вы создали действительную наблюдаемую,Использование Materialize() позволяет вам создавать наблюдаемые, которые не следуют обычному наблюдаемому контракту.Часть того, что делает Synchronize(), - это только обеспечение только одного OnError и только одного OnCompleted и отбрасывание любого OnNext, который идет после любого из двух.


Попробуйте это как способсделайте то, что вы хотели в комментариях:

static void Main(string[] args)
{
    var browser = new ServiceBrowser();

    var observableFound =
        Observable.FromEventPattern<ServiceFoundEventArgs>(
            h => browser.ServiceFound += h,
            h => browser.ServiceFound -= h)
        .Select(e => e.EventArgs.Service);

    var observableError =
        Observable.FromEventPattern<ServiceSearchErrorEventArgs>(
            h => browser.ServiceError += h,
            h => browser.ServiceError -= h);

    var observableFoundWithError = observableFound
        .Materialize()
        .Merge(
            observableError
                .Materialize()
                .Select(x => Notification.CreateOnError<Service>(new Exception("Error"))))
        .Dematerialize()
        .Synchronize();

    Func<Service, IObservable<Service>> resolveService = s =>
        Observable.Create<Service>(o =>
        {
            var observableResolved = Observable.FromEventPattern<EventArgs>(
                h => s.AddressResolved += h,
                h => s.AddressResolved -= h);

            var observableResolveError = Observable.FromEventPattern<EventArgs>(
                h => s.ErrorResolvingAddress += h,
                h => s.ErrorResolvingAddress -= h);

            var observableResolvedWithError =
                observableResolved
                    .Select(x => s)
                    .Materialize()
                    .Merge(
                        observableResolveError
                        .Do(e => Console.WriteLine($"Error resolving: {s.Name}"))
                        .Materialize()
                        .Select(x => Notification.CreateOnError<Service>(new Exception($"Error resolving address for service: {s.Name}"))))
                    .Dematerialize()
                    .Synchronize();

            s.ResolveAddress();

            return observableResolvedWithError.Subscribe(o);
        });

    using (
        observableFoundWithError
            .Select(s => resolveService(s))
            .Switch()
            .Subscribe(
                s => Console.WriteLine($"Found and resolved service: {s.Name} ({s.Address})"),
                ex => Console.WriteLine($"Found error: {ex.Message}"),
                () => Console.WriteLine("Found Completed")))
    {
        browser.Search();
        Console.ReadLine();
    }
}

public class ServiceBrowser
{
    public event EventHandler<ServiceFoundEventArgs> ServiceFound;
    public event EventHandler<ServiceSearchErrorEventArgs> ServiceError;

    public void Search() { }
}

public class Service
{
    public event EventHandler<EventArgs> AddressResolved;
    public event EventHandler<EventArgs> ErrorResolvingAddress;

    public string Name;
    public string Address;

    public void ResolveAddress() { }
}

public class ServiceFoundEventArgs : EventArgs
{
    public Service Service;
}

public class ServiceSearchErrorEventArgs : EventArgs
{

}

Мне может потребоваться немного доработать - возможно, Observable.Delay там.Дайте мне знать, если это работает.

...