Обработка всех событий для совокупности - PullRequest
0 голосов
/ 21 сентября 2018

Пожалуйста, смотрите мою первую постоянную подписку ниже:

 namespace PersistentSubscription
    {
        internal class Program
        {
            private static void Main()
            {
                var subscription = new PersistentSubscriptionClient();
                subscription.Start();
            }
        }

        public class PersistentSubscriptionClient
        {
            private IEventStoreConnection _conn;
            private const string STREAM = "$ce-customer";
            private const string GROUP = "a_test_group";
            private const int DEFAULTPORT = 1113;
            private static readonly UserCredentials User = new UserCredentials("admin", "changeit");
            private EventStorePersistentSubscriptionBase _subscription;

            public void Start()
            {
                var settings = ConnectionSettings.Create(); 

                using (_conn = EventStoreConnection.Create(settings, new IPEndPoint(IPAddress.Loopback, DEFAULTPORT)))
                {
                    _conn.ConnectAsync().Wait();

                    CreateSubscription(); 
                    ConnectToSubscription();

                    Console.WriteLine("waiting for events. press enter to exit");
                    Console.ReadLine();
                }
            }

            private void ConnectToSubscription()
            {
                var bufferSize = 10;
                var autoAck = true;

                Action<EventStorePersistentSubscriptionBase, ResolvedEvent> eventAppeared = EventAppeared; 
                _subscription = _conn.ConnectToPersistentSubscription(STREAM, GROUP, eventAppeared, SubscriptionDropped, User, bufferSize, autoAck);
            }

            private void SubscriptionDropped(EventStorePersistentSubscriptionBase eventStorePersistentSubscriptionBase,
                SubscriptionDropReason subscriptionDropReason, Exception ex)
            {
                ConnectToSubscription();
            }

            private static void EventAppeared(EventStorePersistentSubscriptionBase eventStorePersistentSubscriptionBase,
                ResolvedEvent resolvedEvent)
            {
                MemoryStream stream = new MemoryStream(resolvedEvent.Event.Data);
                IFormatter formatter = new BinaryFormatter();
                stream.Seek(0, SeekOrigin.Begin);
                try
                {
                    CustomerCreated customerCreated = (CustomerCreated)formatter.Deserialize(stream); 
                    Console.WriteLine(customerCreated);
                }
                catch (Exception e)
                {
                    var test = "test";
                }

            }

            private void CreateSubscription()
            {
                PersistentSubscriptionSettings settings = PersistentSubscriptionSettings.Create()
                    .DoNotResolveLinkTos()
                    .StartFromCurrent();

                try
                {
                    _conn.CreatePersistentSubscriptionAsync(STREAM, GROUP, settings, User).Wait();
                }
                catch (AggregateException ex)
                {
                    if (ex.InnerException.GetType() != typeof(InvalidOperationException)
                        && ex.InnerException?.Message != $"Subscription group {GROUP} on stream {STREAM} already exists")
                    {
                        throw;
                    }
                }
            }
        }
    }

и мой первый клиент ниже:

using System;
using System.IO;
using System.Net;
using System.Runtime.Serialization;
using System.Runtime.Serialization.Formatters.Binary;
using System.Text;
using EventStore.ClientAPI;

namespace WritingEvents
{
    class Program
    {
        static void Main(string[] args)
        {
            const int DEFAULTPORT = 1113;
            var settings = ConnectionSettings.Create();
            using (var conn = EventStoreConnection.Create(settings, new IPEndPoint(IPAddress.Loopback, DEFAULTPORT)))
            {
                conn.ConnectAsync().Wait();
                CustomerCreated c1 = new CustomerCreated { Id = Guid.NewGuid(), Name = "Maria" };
                EventData customerCreated1 = GetEventDataFor(c1);
                conn.AppendToStreamAsync("customer-100", ExpectedVersion.Any, customerCreated1).Wait();
            }
        }

        private static EventData GetEventDataFor(CustomerCreated customerCreated)
        {
            IFormatter formatter = new BinaryFormatter();
            MemoryStream stream = new MemoryStream();
            formatter.Serialize(stream, customerCreated);
            byte[] customerCreatedEventByteArray = stream.ToArray();



            return new EventData(
                Guid.NewGuid(),
                "eventType",
                true,
                customerCreatedEventByteArray,
                null
                );
        }
    }

    [Serializable]
    public class CustomerCreated
    {
        public Guid Id { get; set; }
        public string Name { get; set; }
    }
}

Я запускаю сервер, а затем клиент.Я вижу ошибку при десериализации события CustomerCreated на стороне сервера.Ошибка: «Конец потока был обнаружен до завершения анализа».

Если я изменю эту строку:

private const string STREAM = "$ce-customer";

на эту:

private const string STREAM = "customer-100";

Тогда десериализация будет работать правильно на стороне сервера.

Как сделатьЯ работаю со всеми событиями для клиентов - не только для клиентов 100?

У меня есть --run-projections=all при запуске Event Store.Я также включил все прогнозы:

enter image description here

1 Ответ

0 голосов
/ 22 сентября 2018

Этот вопрос помог мне: Используя API-интерфейс клиента хранилища событий (.NET), как записать в поток и связать одно событие с другим?

Мне просто пришлось изменитьthis:

PersistentSubscriptionSettings settings = PersistentSubscriptionSettings.Create()
                .DoNotResolveLinkTos() //Specifically this line
                .StartFromCurrent();

to this:

PersistentSubscriptionSettings settings = PersistentSubscriptionSettings.Create()
                .ResolveLinkTos() //Specifically this line
                .StartFromCurrent();

DoNotResolveLinkTos получает ссылку на исходное событие, тогда как ResolveLinkTos получает само фактическое событие.Поэтому я пытался десериализовать объект ссылки, который вызывал исключение.

...