Я пишу консольное приложение в .net Core 2.1, и я намерен прослушивать сообщения в теме в ServiceBus и обрабатывать новые сообщения, поступающие в Elasticsearch с использованием API NEST (NEST, вероятно, не имеет значения для моего вопроса, но хотелбыть прозрачным).
Моя сущность Topic в ServiceBus называется «test», и у меня есть подписка, также называемая «test» (полный путь будет «test / subscription / test»).
В моем консольном приложении .net Core у меня есть следующие ссылки на NuGet:
<PackageReference Include="Microsoft.Azure.ServiceBus" Version="3.2.1" />
<PackageReference Include="NEST" Version="6.4.1" />
<PackageReference Include="Newtonsoft.Json" Version="12.0.1" />
У меня очень странная проблема при использовании .net Standard ServiceBus Api, когда я регулярно получаю ошибку возобновления блокировки:
Обработчик сообщений обнаружил исключение Microsoft.Azure.ServiceBus.MessageLockLostException
Я переместил свой код обратно в очень воспроизводимый образец здесь:
using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Elasticsearch.Net;
using Microsoft.Azure.ServiceBus;
using Nest;
using Newtonsoft.Json;
namespace SampleApp
{
public class Program
{
private static SubscriptionClient _subscriptionClient;
private static IElasticClient _elasticClient;
private static string ServiceBusConnectionString = "[connectionString]";
private static string TopicName = "test";
private static string SubscriptionName = "test";
public static void Main(string[] args)
{
var elasticsearchSettings = new ConnectionSettings(new SingleNodeConnectionPool(new Uri("http://does.not.exist:9200"))).DefaultIndex("DoesNotExistIndex");
_elasticClient = new ElasticClient(elasticsearchSettings);
_subscriptionClient = new SubscriptionClient(ServiceBusConnectionString, TopicName, SubscriptionName);
// Configure the message handler options in terms of exception handling, number of concurrent messages to deliver, etc.
var messageHandlerOptions = new MessageHandlerOptions(ExceptionReceivedHandler)
{
// Maximum number of concurrent calls to the callback ProcessMessagesAsync(), set to 1 for simplicity.
// Set it according to how many messages the application wants to process in parallel.
MaxConcurrentCalls = 1,
MaxAutoRenewDuration = TimeSpan.FromSeconds(400),
// Indicates whether the message pump should automatically complete the messages after returning from user callback.
// False below indicates the complete operation is handled by the user callback as in ProcessMessagesAsync().
AutoComplete = false
};
// Register the function that processes messages.
_subscriptionClient.RegisterMessageHandler(ProcessMessagesAsync, messageHandlerOptions);
Console.WriteLine("INFO: Process message handler registered, listening for messages");
Console.Read();
}
private static async Task ProcessMessagesAsync(Message message, CancellationToken token)
{
// Message received.
var content = Encoding.UTF8.GetString(message.Body);
var messageBody = JsonConvert.DeserializeObject<string[]>(content);
Console.WriteLine($"INFO: Message arrived: {message}");
Console.WriteLine($"INFO: Message body: \"{string.Join(",", messageBody)}\"");
try
{
var response = _elasticClient.Ping();
if (!response.IsValid && response.OriginalException != null)
Console.WriteLine($"ERROR: ElasticSearch could not be reached, error was \"{response.OriginalException.Message}\"");
else
Console.WriteLine("INFO: ElasticSearch was contacted successfully");
}
catch (Exception e)
{
Console.WriteLine("!ERROR!: " + e);
}
await _subscriptionClient.CompleteAsync(message.SystemProperties.LockToken);
Console.WriteLine("INFO: Message completed");
}
// Use this handler to examine the exceptions received on the message pump.
private static Task ExceptionReceivedHandler(ExceptionReceivedEventArgs exceptionReceivedEventArgs)
{
Console.WriteLine($"Message handler encountered an exception {exceptionReceivedEventArgs.Exception}: " +
$"{exceptionReceivedEventArgs.ExceptionReceivedContext.Action}: " +
$"{exceptionReceivedEventArgs.ExceptionReceivedContext.EntityPath}");
return Task.CompletedTask;
}
}
Этот код почти идентичен примеру, взятому здесь: https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-dotnet-how-to-use-topics-subscriptions
Я намеренно "пингую" экземпляр ElasticsearchОн не существует, чтобы создать исключение сокета, которое помогает мне воспроизвести проблему.
Одна вещь, которую я заметил, это то, что когда я создаю новую тему и имею EnabledPartioning = false ,проблема не возникает.
Кто-нибудь видел это раньше?Кажется, проблема глубоко в самом коде ServiceBus.
Примечание. Я пытался использовать Receiver для чтения сообщений, используя «ReceiveAsync», и я также получаю эту ошибку в этом сценарии.Кроме того, мой драйвер для тестирования - это перемещение клиента .NET Framework ServiceBus (который работает с разделами) и на версию .net Core.
Заранее спасибо за любые указатели !!