Как использовать значение смещения ServiceBus EventData - PullRequest
0 голосов
/ 05 июня 2018

У меня есть некоторый код, который использует Данные о событиях служебной шины , и я подозреваю, что мне нужно использовать свойство смещения, так как в настоящее время моя программа (или кажется) повторно запускает то же самоеEvent Hub data снова и снова.

Мой код выглядит следующим образом:

public class EventHubListener : IEventProcessor
{
    private static EventHubClient _eventHubClient;        
    private const string EhConnectionStringNoPath = "Endpoint=...";
    private const string EhConnectionString = EhConnectionStringNoPath + ";...";
    private const string EhEntityPath = "...";        

    public void Start()
    {
        _eventHubClient = EventHubClient.CreateFromConnectionString(EhConnectionString);
        EventHubConsumerGroup defaultConsumerGroup = _eventHubClient.GetDefaultConsumerGroup();            
        EventHubDescription eventHub = NamespaceManager.CreateFromConnectionString(EhConnectionStringNoPath).GetEventHub(EhEntityPath);

        foreach (string partitionId in eventHub.PartitionIds)
        {
            defaultConsumerGroup.RegisterProcessor<EventHubListener>(new Lease
            {
                PartitionId = partitionId
            }, new EventProcessorCheckpointManager());

            Console.WriteLine("Processing : " + partitionId);
        }
    }

    public Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
    {
        foreach (EventData eventData in messages)
        {                
            string bytes = Encoding.UTF8.GetString(eventData.GetBytes());
            MyData data = JsonConvert.DeserializeObject<MyData>(bytes);

Поскольку я получаю одни и те же сообщения снова и снова, я подозреваю, что мне нужно сделать что-то подобное:

string bytes = Encoding.UTF8.GetString(eventData.GetBytes(), eventData.Offset, eventData.SerializedSizeInBytes - eventData.Offset);

Тем не менее, Offset является строкой, даже если она кажется числовым значением (например, «12345»).Документация по context.CheckPointAsync() показала, что это может быть ответом;однако выдача этого в конце цикла, кажется, не имеет значения.

Итак, у меня есть вопрос из двух частей:

  1. Что такое смещение?Это то, что я думаю (то есть числовой маркер для точки в потоке) и, если да, то почему это строка?
  2. Зачем мне снова получать одни и те же сообщения?Насколько я понимаю, концентраторы событий хотя и гарантируют хотя бы один раз, после того, как возникла проблема с контрольной точкой, мне не следует возвращать те же сообщения.

РЕДАКТИРОВАТЬ:

Послепока я возился, я придумал что-то, что позволяет избежать этой проблемы;тем не менее, я, конечно, не стал бы утверждать, что это решение:

var filteredMessages =
            messages.Where(a => a.EnqueuedTimeUtc >= _startDate)
            .OrderBy(a => a.EnqueuedTimeUtc);

Использование EventProcessorHost, казалось, фактически усугубило проблему;то есть не только исторические события воспроизводились, но они, казалось, воспроизводились в случайном порядке.

РЕДАКТИРОВАТЬ:

Я наткнулся на эту отличную статью @Михаил, который, кажется, решает мою проблему.Тем не мение;и, вероятно, корень моей проблемы (или одной из них, если она верна, тогда я не уверен, почему использование EventProcessorHost не работает из коробки, как сам @Mikhail сказал в комментариях).Однако версия ServiceBus ICheckpointManager имеет только один метод интерфейса:

namespace Microsoft.ServiceBus.Messaging
{

    public interface ICheckpointManager
    {
        Task CheckpointAsync(Lease lease, string offset, long sequenceNumber);
    }
}

1 Ответ

0 голосов
/ 11 октября 2018

Ваш заголовок должен быть центром событий, а не служебной шиной.На ваш вопрос:

  1. Хотя концентратор событий похож на Kafka, но одно большое отличие состоит в том, что вы должны сами управлять смещениями.Event Hub Broker не имеет ни малейшего представления о смещении вашей группы потребителей.
  2. Таким образом, Event Hub SDK предоставляет некоторый класс справки для хранения смещения в учетной записи хранения, но вам все равно необходимо вызвать контрольную точку вручную после обработки сообщения.
...