У меня есть некоторый код, который использует Данные о событиях служебной шины , и я подозреваю, что мне нужно использовать свойство смещения, так как в настоящее время моя программа (или кажется) повторно запускает то же самоеEvent Hub data снова и снова.
Мой код выглядит следующим образом:
public class EventHubListener : IEventProcessor
{
private static EventHubClient _eventHubClient;
private const string EhConnectionStringNoPath = "Endpoint=...";
private const string EhConnectionString = EhConnectionStringNoPath + ";...";
private const string EhEntityPath = "...";
public void Start()
{
_eventHubClient = EventHubClient.CreateFromConnectionString(EhConnectionString);
EventHubConsumerGroup defaultConsumerGroup = _eventHubClient.GetDefaultConsumerGroup();
EventHubDescription eventHub = NamespaceManager.CreateFromConnectionString(EhConnectionStringNoPath).GetEventHub(EhEntityPath);
foreach (string partitionId in eventHub.PartitionIds)
{
defaultConsumerGroup.RegisterProcessor<EventHubListener>(new Lease
{
PartitionId = partitionId
}, new EventProcessorCheckpointManager());
Console.WriteLine("Processing : " + partitionId);
}
}
public Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
{
foreach (EventData eventData in messages)
{
string bytes = Encoding.UTF8.GetString(eventData.GetBytes());
MyData data = JsonConvert.DeserializeObject<MyData>(bytes);
Поскольку я получаю одни и те же сообщения снова и снова, я подозреваю, что мне нужно сделать что-то подобное:
string bytes = Encoding.UTF8.GetString(eventData.GetBytes(), eventData.Offset, eventData.SerializedSizeInBytes - eventData.Offset);
Тем не менее, Offset
является строкой, даже если она кажется числовым значением (например, «12345»).Документация по context.CheckPointAsync()
показала, что это может быть ответом;однако выдача этого в конце цикла, кажется, не имеет значения.
Итак, у меня есть вопрос из двух частей:
- Что такое смещение?Это то, что я думаю (то есть числовой маркер для точки в потоке) и, если да, то почему это строка?
- Зачем мне снова получать одни и те же сообщения?Насколько я понимаю, концентраторы событий хотя и гарантируют хотя бы один раз, после того, как возникла проблема с контрольной точкой, мне не следует возвращать те же сообщения.
РЕДАКТИРОВАТЬ:
Послепока я возился, я придумал что-то, что позволяет избежать этой проблемы;тем не менее, я, конечно, не стал бы утверждать, что это решение:
var filteredMessages =
messages.Where(a => a.EnqueuedTimeUtc >= _startDate)
.OrderBy(a => a.EnqueuedTimeUtc);
Использование EventProcessorHost
, казалось, фактически усугубило проблему;то есть не только исторические события воспроизводились, но они, казалось, воспроизводились в случайном порядке.
РЕДАКТИРОВАТЬ:
Я наткнулся на эту отличную статью @Михаил, который, кажется, решает мою проблему.Тем не мение;и, вероятно, корень моей проблемы (или одной из них, если она верна, тогда я не уверен, почему использование EventProcessorHost
не работает из коробки, как сам @Mikhail сказал в комментариях).Однако версия ServiceBus ICheckpointManager
имеет только один метод интерфейса:
namespace Microsoft.ServiceBus.Messaging
{
public interface ICheckpointManager
{
Task CheckpointAsync(Lease lease, string offset, long sequenceNumber);
}
}