Я обертываю некоторые вызовы API, чтобы притвориться результатом сокета - зацикливаю вызов с интервалом в несколько секунд и выдаю результат при изменении.
Хорошо работает при использовании оператора DistinctUntilChanged()
.Однако, когда результатом является список, оператор всегда выдает результат, потому что он отличается от стандартного компаратора.
Это моя пользовательская заметка для повторения задачи с некоторой задержкой независимо от успеха или неудачи.
public class TaskRepeatObservable<T> : IObservable<T>
{
private readonly Func<Task<T>> _taskFactory;
private readonly TimeSpan _repeatDelayTimeSpan;
private readonly ILogger _logger;
private Func<Exception, bool> _onError;
public TaskRepeatObservable(Func<Task<T>> taskFactory, TimeSpan repeatDelayTimeSpan = default(TimeSpan), Func<Exception, bool> onError = null)
{
_logger = new Log4NetLogger(GetType());
_logger.IsEnabled = false;
_taskFactory = taskFactory;
_repeatDelayTimeSpan = repeatDelayTimeSpan;
_onError = onError;
}
public IDisposable Subscribe(IObserver<T> observer)
{
var tokenSource = new CancellationTokenSource();
var cancellationToken = tokenSource.Token;
Task.Factory.StartNew(async () =>
{
try
{
while (!cancellationToken.IsCancellationRequested)
{
try
{
var result = await _taskFactory();
observer.OnNext(result);
}
catch (Exception e)
{
_logger.Error(e, "Observable Error: " + e.Message);
if (_onError != null && !_onError.Invoke(e))
throw;
}
finally
{
try
{
if (_repeatDelayTimeSpan > TimeSpan.Zero)
await Task.Delay(_repeatDelayTimeSpan, cancellationToken);
}
catch (TaskCanceledException)
{
// ignored
}
}
}
}
catch (Exception e)
{
observer.OnError(e);
}
_logger.Debug("Observable is cancelled.");
}, cancellationToken, TaskCreationOptions.LongRunning, TaskScheduler.Default);
return Disposable.Create(() =>
{
tokenSource.Cancel();
});
}
}
Это расширение для упаковки вызовов API.
public static class ObservableBuilder
{
///<summary>
///<para>Convert Task to Observable and emit only changed result, it's useful to wrap the api as socket-like.</para>
///</summary>
public static IObservable<T> Repeat<T>(this Func<Task<T>> taskFactory, TimeSpan delay = default(TimeSpan),
Func<Exception, bool> onError = null)
{
return new TaskRepeatObservable<T>(taskFactory, delay, onError).DistinctUntilChanged();
}
Мой вопрос - как заставить DistinctUntilChanged()
работать на ЛЮБОЙ результат, включая List
или Enumerable
.
Заметил, что я попытался реализовать свой собственный компаратор.Но я все еще не знаю, как проверить тип T
, чтобы выбрать правильный компаратор для DistinctUntilChanged()
public class IEnumerableComparer<T> : IEqualityComparer<IEnumerable<T>>
{
public bool Equals(IEnumerable<T> x, IEnumerable<T> y)
{
return ReferenceEquals(x, y) || x != null && y != null && x.SequenceEqual(y);
}
public int GetHashCode(IEnumerable<T> obj)
{
// Will not throw an OverflowException
unchecked
{
return obj.Where(e => e != null).Select(e => e.GetHashCode()).Aggregate(17, (a, b) => 23 * a + b);
}
}
}
Вот простой код тестирования:
ObservableBuilder.Repeat(async () =>
{
var i = new List<int>() { 1, 2, 3, 4 };
return i;
}, TimeSpan.FromSeconds(1)).ToHotObservable().Subscribe(x => Logger.Info($"Result = {x}"));
Я ожидаю, что результат будет выдан только один раз для списка.