Запуск обновления кеша DynamicData с использованием Reactive Subject - PullRequest
0 голосов
/ 30 мая 2019

В качестве предупреждения я новичок в Rx (2 недели) и экспериментирую с использованием Rx, RxUI и DynamicData .

Роланда Фазана.

У меня есть служба, которая первоначально загружает данные из локального постоянного хранилища, а затем по какой-то пользовательской (или системной) инструкции свяжется с сервером (в данном случае TriggerServer), чтобы получить дополнительные или заменяющие данные. Решение, которое я придумала, использует тему, и я сталкивался со многими сайтами, обсуждающими преимущества и недостатки их использования. Хотя я понимаю основы горячего / холодного, все это основано на чтении, а не на реальном мире.

Итак, используя приведенную ниже в качестве упрощенной версии, это «правильный» способ решения этой проблемы или есть что-то, что я где-то не правильно понял?

Примечание: я не уверен, насколько это важно, но фактический код взят из приложения Xamarin.Forms, которое использует RxUI, пользовательский ввод - ReactiveCommand.

Пример:

using DynamicData;
using System;
using System.Linq;
using System.Reactive;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading.Tasks;

public class MyService : IDisposable
{

    private CompositeDisposable _cleanup;
    private Subject<Unit> _serverSubject = new Subject<Unit>();

    public MyService()
    {

        var data = Initialise().Publish();
        AllData = data.AsObservableCache();


        _cleanup = new CompositeDisposable(AllData, data.Connect());
    }

    public IObservableCache<MyData, Guid> AllData { get; }

    public void TriggerServer()
    {
        // This is what I'm not sure about...
        _serverSubject.OnNext(Unit.Default);
    }

    private IObservable<IChangeSet<MyData, Guid>> Initialise()
    {
        return ObservableChangeSet.Create<MyData, Guid>(async cache =>
        {
            // inital load - is this okay?
            cache.AddOrUpdate(await LoadLocalData());


            // is this a valid way of doing this?
            var sync = _serverSubject.Select(_ => GetDataFromServer())
                .Subscribe(async task =>
                {
                    var data = await task.ConfigureAwait(false);
                    cache.AddOrUpdate(data);
                });

            return new CompositeDisposable(sync);
        }, d=> d.Id);
    }

    private IObservable<MyData> LoadLocalData()
    {
        return Observable.Timer(TimeSpan.FromSeconds(3)).Select(_ => new MyData("localdata"));
    }

    private async Task<MyData> GetDataFromServer()
    {
        await Task.Delay(2000).ConfigureAwait(true);
        return new MyData("serverdata");
    }

    public void Dispose()
    {
        _cleanup?.Dispose();
    }
}

public class MyData
{
    public MyData(string value)
    {
        Value = value;
    }

    public Guid Id { get; } = Guid.NewGuid();

    public string Value { get; set; }
}

И простое консольное приложение для запуска:

public static class TestProgram
{
    public static void Main()
    {
        var service = new MyService();

        service.AllData.Connect()
            .Bind(out var myData)
            .Subscribe(_=> Console.WriteLine("data in"), ()=> Console.WriteLine("COMPLETE"));

        while (Continue())
        {
            Console.WriteLine("");
            Console.WriteLine("");
            Console.WriteLine($"Triggering Server Call, current data is: {string.Join(", ", myData.Select(x=> x.Value))}");
            service.TriggerServer();
        }
    }

    private static bool Continue()
    {
        Console.WriteLine("Press any key to call server, x to exit");
        var key = Console.ReadKey();
        return key.Key != ConsoleKey.X;
    }
}

Ответы [ 2 ]

0 голосов
/ 30 мая 2019

Я ставлю то, к чему я пришел, в качестве своего решения на случай, если другие найдут это, пока они блуждают по интернетам.

Я закончил тем, что удалил все субъекты вместе и связал вместе несколько SourceCache, поэтому, когда один менял его, вставлял в другой и так далее. Я удалил код для краткости:

public class MyService : IDisposable
{
    private SourceCache<MyData, Guid> _localCache = new SourceCache<MyData, Guid>(x=> x.Id);
    private SourceCache<MyData, Guid> _serverCache = new SourceCache<MyData, Guid>(x=> x.Id);

    public MyService()
    {
        var localdata = _localCache.Connect();
        var serverdata = _serverCache.Connect();
        var alldata = localdata.Merge(serverdata);

        AllData = alldata.AsObservableCache();
    }

    public IObservableCache<MyData, Guid> AllData { get; }

    public IObservable<Unit> TriggerLocal()
    {
        return LoadLocalAsync().ToObservable();
    }

    public IObservable<Unit> TriggerServer()
    {
        return LoadServerAsync().ToObservable();
    }
}

РЕДАКТИРОВАТЬ: я снова изменил это, чтобы удалить любую цепочку кэшей - я просто внутренне управляю одним кэшем. Урок не размещать слишком рано.

0 голосов
/ 30 мая 2019

Выглядит очень хорошо для первой попытки с Rx

Я бы предложил несколько изменений:

1) Удалите вызов Initialize() из конструктора и сделайте его общедоступным методом - очень помогает в модульных тестах, и теперь вы можете await сделать это, если вам нужно

 public static void Main()
    {
        var service = new MyService();
        service.Initialize();

2) Добавить Throttle к вашему триггеру - это исправляет параллельные вызовы на сервер, возвращающие те же результаты

3) Не делайте ничего, что может добавить Subscribe, вместо этого используйте Do:

var sync = _serverSubject
                .Throttle(Timespan.FromSeconds(0.5), RxApp.TaskPoolScheduler) // you can pass a scheduler via arguments, or use TestScheduler in unit tests to make time pass faster
                .Do(async _ =>
                {
                    var data = await GetDataFromServer().ConfigureAwait(false); // I just think this is more readable, your way was also correct
                    cache.AddOrUpdate(data);
                })
               // .Retry(); // or anything alese to handle failures
                .Subscribe();
...