04 октября 2019

Я пытаюсь написать клиент на C # (.Net Core), чтобы обернуть связь с кроликом. У меня есть случай, когда мне нужно предварительно сформировать вызов RPC, который ограничен IO. Я хочу иметь возможность обрабатывать несколько сообщений одновременно, и я не могу заставить его работать. Похоже, что независимо от того, что я пытаюсь, я не могу обработать другое сообщение, пока я не подтвердил предыдущее, и я не могу подтвердить предыдущую обработку, потому чтоЭто вызов RPC, и я должен ответить сообщением.

В моем примере с упрощенным кодом я использую .net core 2.2 и RabbitMQ.Client 5.10.

Интерфейс службы:

public interface IRabbitMQService : IDisposable
    Task SetupCommunication();
    Task<string> SendMessage(string queueName, string request);
    void Disconnect();


internal class RabbitMQService : IRabbitMQService
    private readonly IConnectionFactory _connectionFactory;
    private readonly IServiceProvider _serviceProvider;
    private IConnection _connection;
    private bool _disposed;
    private List<IModel> activeChannels;
    private readonly Dictionary<string, Func<IServiceScope, string, Task<string>>> _queueActionMapper;
    private readonly MessagingQueueConfig _config;
    public RabbitMQService(IConnectionFactory connectionFactory, IServiceProvider serviceProvider,
        Dictionary<string, Func<IServiceScope, string, Task<string>>> queueActionMapper, MessagingQueueConfig config)
        _connectionFactory = connectionFactory;
        _serviceProvider = serviceProvider;
        _queueActionMapper = queueActionMapper;
        _disposed = false;
        activeChannels = new List<IModel>();
        _config = config;

    public async Task SetupCommunication()
        if (_queueActionMapper != null && _queueActionMapper.Any())
            if (await EnsureConnection())
                foreach (var queue in _queueActionMapper.Keys)
                    activeChannels.Add(await SetupChannel(queue));

    public async Task<string> SendMessage(string queueName, string request)
        string response = null;
        if (await EnsureConnection())
                using (var channel = _connection.CreateModel())
                using (var signal = new ManualResetEvent(false))
                    channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
                    var body = Encoding.UTF8.GetBytes(request);
                    var props = channel.CreateBasicProperties();
                    props.Persistent = true;
                    props.DeliveryMode = 2;
                    var consumer = new EventingBasicConsumer(channel);      
                    string replyQueue = $"{queueName}_feedback";
                    channel.QueueDeclare(queue: replyQueue, durable: false, exclusive: false, autoDelete: false, arguments: null);
                    props.ReplyTo = replyQueue;
                    props.Expiration = $"{(Math.Max(1, _config.ResponseTimeoutInSeconds) * 1000)}";
                    var correlationId = Guid.NewGuid().ToString();
                    props.CorrelationId = correlationId;
                    consumer.Received += async (model, ea) =>
                        if (ea.BasicProperties.CorrelationId == correlationId)
                            var responseBody = ea.Body;
                            var responseString = Encoding.UTF8.GetString(responseBody);
                            // The response
                            response = responseString;                          channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                            catch (Exception ex)
                                // May throw disposed object exception
                        consumer: consumer,
                        queue: replyQueue,
                        autoAck: false);
                        exchange: "",
                        routingKey: queueName,
                        basicProperties: props,
                        body: body);                    
                    var timeout = await Task.Run(() => {
                        return !signal.WaitOne(TimeSpan.FromSeconds(Math.Max(1, _config.ResponseTimeoutInSeconds)));
                    });                 channel.BasicCancel(consumer.ConsumerTag);
                    if (timeout)
                        // timeout reached
                        response = "ERROR";
                    return response;
            catch (Exception ex)
                response = "ERROR";
            response = "CONNECTION ERROR";

        return response;

    public void Disconnect()

    public void Dispose()
        if (_disposed)
        _disposed = true;
            activeChannels.ForEach(channel => channel.Dispose());
        catch (Exception ex)


    #region Private helpers
    private async Task<IModel> SetupChannel(string queueName)
        IModel channel = null;
        if (await EnsureConnection())
                new Thread(() =>
                    Thread.CurrentThread.IsBackground = true;
                    channel = _connection.CreateModel();
                    channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
                    var consumer = new EventingBasicConsumer(channel);
                    consumer.Received += async (mode, e) =>
                        await HandleReceived(mode, e, channel, queueName);

                    channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
                    channel.CallbackException += async (sender, ea) =>
                        channel = await SetupChannel(queueName);
            catch (Exception ex)


        return channel;

    private async Task HandleReceived(object sender, BasicDeliverEventArgs e, IModel channel, string queueName)
            if (e.RoutingKey == queueName)
                var message = Encoding.UTF8.GetString(e.Body);
                using (var scope = _serviceProvider.CreateScope())
                    await HandleReceivedScoped(e, channel, queueName, message, scope);
        catch (Exception ex)


    private async Task HandleReceivedScoped(BasicDeliverEventArgs e, IModel channel, string queueName, string message, IServiceScope scope)
        channel.BasicAck(deliveryTag: e.DeliveryTag,
            multiple: false);
        // Execute the method
        var response = await _queueActionMapper[queueName](scope, message);
        var props = e.BasicProperties;
        var replyProps = channel.CreateBasicProperties();
        replyProps.CorrelationId = props.CorrelationId;
        replyProps.Expiration = $"{(Math.Max(1, _config.ResponseTimeoutInSeconds) * 1000)}";
        string replyTo = props.ReplyTo;
        await HandleCallback(channel, response, e.DeliveryTag, replyTo, replyProps);

    private async Task HandleCallback(IModel channel, string response, ulong deliveryTag, string queueName, IBasicProperties properties)
        if (await EnsureConnection())
                channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
                string message = response ?? "No response had been sent";
                var body = Encoding.UTF8.GetBytes(message);
                channel.BasicPublish(exchange: "", routingKey: queueName, mandatory: true, basicProperties: properties, body: body);
                channel.BasicAck(deliveryTag: deliveryTag,
                                multiple: false);

            catch (Exception ex)


    private bool Connected => _connection != null && _connection.IsOpen;
    // Make sure the connection is alive. Create a new one if not
    private async Task<bool> EnsureConnection()
            if (!Connected)
                if (_connection != null)
                    // Dispose of dead hanging connection
                    _connection = _connectionFactory.CreateConnection();
                catch (BrokerUnreachableException brockerUnreachedException)
                    await Task.Delay(2000);

                // Second attempt
                if (!Connected)
                    // If exception is thrown again, bubble it
                    _connection = _connectionFactory.CreateConnection();
                if (Connected)
                    // Connection error handlers
                    _connection.ConnectionShutdown += async (object sender, ShutdownEventArgs reason) => await OnConnectionShutdown(sender, reason);
                    _connection.CallbackException += async (object sender, CallbackExceptionEventArgs ex) => await OnCallbackException(sender, ex);
                    _connection.ConnectionBlocked += async (object sender, ConnectionBlockedEventArgs ex) => await OnConnectionBlocked(sender, ex);

        catch (Exception ex)


        return Connected;

    private async Task OnConnectionBlocked(object sender, ConnectionBlockedEventArgs ex)
        await HandleConnectionProblem("A RabbitMQ connection is shutdown. Trying to re-connect...", nameof(OnConnectionShutdown));

    private async Task OnCallbackException(object sender, CallbackExceptionEventArgs ex)
        await HandleConnectionProblem("A RabbitMQ connection throw exception. Trying to re-connect...", nameof(OnCallbackException));

    private async Task OnConnectionShutdown(object sender, ShutdownEventArgs reason)
        await HandleConnectionProblem("A RabbitMQ connection is on shutdown. Trying to re-connect...", nameof(OnConnectionShutdown));

    private async Task HandleConnectionProblem(string message, string functionName)
        if (_disposed)
        await EnsureConnection();

В классе запуска:

public void ConfigureServices(IServiceCollection services)
{   services.Configure<MessagingQueueConfig>(Configuration.GetSection("MessagingQueueConfig"));
    var config = Configuration.GetSection("MessagingQueueConfig").Get<MessagingQueueConfig>();
    Dictionary<string, Func<IServiceScope, string, Task<string>>> queueActionMapper =
        new Dictionary<string, Func<IServiceScope, string, Task<string>>>();
    // TEST
        async (scope, request) =>
            var testService = scope.ServiceProvider.GetRequiredService<ITestRabbit>();
            var result = await testService.DoSomeIOBoundedOperationThatTakesAbout3Seconds(request);
            return result;
    services.AddSingleton<IRabbitMQService>(serviceProvider =>
        var factory = new ConnectionFactory()
            HostName = config.EventBusConnection,
            UserName = config.EventBusUserName,
            Password = config.EventBusPassword
        return new RabbitMQService(factory, serviceProvider, queueActionMapper, config);

public void Configure(IApplicationBuilder app, IHostingEnvironment env)
    var listener = app.ApplicationServices.GetService<IRabbitMQService>();
    var life = app.ApplicationServices.GetService<IApplicationLifetime>();
    life.ApplicationStarted.Register(async () =>
        await listener.SetupCommunication();
    life.ApplicationStopping.Register(() =>

Может кто-нибудь мне помочь? Как сделать так, чтобы сервис обрабатывал майские запросы? Я попытался установить параметр prefetch или даже настроить несколько каналов в разных потоках, но, похоже, ничего не работает

