Как я могу правильно обработать повторное подключение подписок, когда хранилище событий умирает - PullRequest
0 голосов
/ 25 сентября 2018

Хранилище событий кажется несовместимым при повторном подключении догоняющих подписок.Исходя из потоков в группе Google и экспериментов, кажется, что догоняющие подписки будут переподключаться автоматически, если SubscriptionDropReason равен ConnectionClosed.

Однако если впоследствии вы попытаетесь остановить эти соединения и указать время ожидания, то они, кажется, никогда не прекратятся и всегда останутся.

Я использовал код из этого поста но я полагаю, что это вводит в заблуждение, так как в моих экспериментах причина получения нескольких событий в этом случае заключается в том, что хранилище событий автоматически повторно подписывается на CatchupSubscription, когда хранилище событий возвращается в оперативный режим, так что если вы не подписываетесь повторно, когда причиной является ConnectionClosed, то выне будет получать события несколько раз.Если я изменяю этот код, чтобы добавить время ожидания к методу .Stop(timeout), он просто зависает и выбрасывает.

Когда я получаю вызов запущенного события живой обработки после автоматического переподключения, объект EventStoreCatchUpSubscription имеет внутреннее состояниечто указывает на то, что _isDropped=1, что также кажется мне странным, поскольку он не отброшен, он фактически обрабатывает события.

Поэтому мой вопрос заключается в том, как мне обработать случай, когда хранилище событий переподключилось автоматически, и я хочу отброситьсоединение и ждать тайм-аут?

1 Ответ

0 голосов
/ 12 ноября 2018

Событие «subscriptionDropped» является хорошим местом для решения проблем с подключением.В проекте хранилища событий все еще существует открытая проблема.

https://github.com/EventStore/EventStore/issues/929

https://github.com/EventStore/EventStore/issues/1127

eventStoreConnection.SubscribeToAllFrom(lastCheckpoint, catchUpSubscriptionSettings,
                eventAppeared(projection),
                liveProcessingStarted(projection),subscriptionDropped(projection),userCredentials );   



 private Action<EventStoreCatchUpSubscription, SubscriptionDropReason, Exception> subscriptionDropped(Projection projection)
            => async (eventStoreCatchUpSubscription, subscriptionDropReason, exception) =>
            {

                eventStoreCatchUpSubscription.Stop();

                switch (subscriptionDropReason)
                {
                    case SubscriptionDropReason.UserInitiated:
                        Console.WriteLine($"{projection} projection stopped by user.");
                        break;
                    case SubscriptionDropReason.SubscribingError:
                    case SubscriptionDropReason.ServerError:
                    case SubscriptionDropReason.ConnectionClosed:
                    case SubscriptionDropReason.CatchUpError:
                    case SubscriptionDropReason.ProcessingQueueOverflow:
                    case SubscriptionDropReason.EventHandlerException:
                        Console.WriteLine($"{projection} projection stopped because of a transient error ({subscriptionDropReason}). ");
                        Console.WriteLine($"Exception Detail:  {exception}");    
                        Console.WriteLine("Attempting to restart...");
                        // Re-build your subscription in here
                        Task.Run(() => StartProjection(projection));
                        break;
                    default:
                        Console.WriteLine("Your subscription gg");
                        Console.WriteLine($"Exception Detail:  {exception}");    
                        break;
                }
            };
...