Невозможно отправить в Kafka topi c (Confluent Cloud) из Azure Функция - PullRequest
0 голосов
/ 05 мая 2020

Моя проблема:

Код не проходит через вызов ProduceAsyn c () и не генерирует никаких исключений. На топи c в Кафке сообщение не появляется. Функция не дает сбоев, она просто никогда не завершается, что означает, что она никогда не регистрирует статус, откуда портал Azure извлекает данные, которые он отображает в разделе «Мониторинг». Это не вызывает никаких исключений из-за превышения максимального количества времени, в течение которого функция может работать. Однако функция выполняется, поскольку ApplicationInsights показывает мои операторы журнала отладки в своих сообщениях трассировки. ):

[FunctionName("MyFunction")]
public static async Task RunAsync(
    [TimerTrigger("0 */5 * * * *")]TimerInfo myTimer,
    ILogger log,
    ExecutionContext context,
    [HttpClientFactory]HttpClient httpClient
)
{
    try {
        // ...
        using (var kafkaProducer = initKafkaProducer(config))
        {
            var myHelper = new MyHelper(/*...*/, kafkaProducer);
            foreach (var obj in objects)
            {
                await myHelper.ProcessObject(obj);
            }
        }
    }
    catch(Exception ex) 
    {
        //...
        throw new Exception("My error message", ex);
    }
}

private static IProducer<Null, string> initKafkaProducer(IConfigurationRoot config)
{
    var pConfig = new ProducerConfig
    {
        BootstrapServers = config["BootstrapServers"],
        SaslMechanism = SaslMechanism.Plain,
        SecurityProtocol = SecurityProtocol.SaslSsl,
        SaslUsername = config["Username"],
        SaslPassword = config["Password"],
        MessageSendMaxRetries = 10,
        RetryBackoffMs = 250,
        Acks = Acks.All,
        LingerMs = 5
    };

    return new ProducerBuilder<Null, string>(pConfig).Build();
}


public class MyHelper
{
    public async Task ProcessObject(MyObject obj)
    {
        try
        {
            //...
            await sendToKafkaTopic(obj);
            _log.LogInformation($"DEBUG: Successfully sent to Kafka");
            //...
        }
        catch (HttpRequestException ex)
        {
            throw new Exception("My error message...", ex);
        }
    }

    private async Task sendToKafkaTopic(MyObject obj)
    {
        string topic = _config["KafkaTopic"];
        var message = new Message<Null, string> { Value = JsonConvert.SerializeObject(obj) };

        try
        {
            var deliveryResult = await _kafkaProducer.ProduceAsync(topic, message);
            _log.LogInformation($"DEBUG: Delivered the following to {deliveryResult.TopicPartitionOffset}:\n\n{deliveryResult.Value}");
        }
        catch (ProduceException<Null, string> e)
        {
            var error = e.Error;

            if (error.IsError && error.IsFatal)
            {
                string errorMessage = "FATAL Kafka error! ";
                if (error.IsBrokerError)
                {
                    errorMessage += "BrokerError. ";
                }
                else if (error.IsLocalError)
                {
                    errorMessage += "LocalError. ";
                }

                errorMessage += $"Error code: {error.Code}. Reason: {error.Reason}";
                throw new Exception(errorMessage);
            }
        }
    }
}

Другой подход, который я пробовал, - это использование вызова Produce (), отправка с ним обработчика для DeliveryReport и использование вызова _kafkaProducer.Flush(TimeSpan.FromSeconds(10)). Это выглядело так, как будто все прошло хорошо, но сообщения не отображались в Kafka. при случайном количестве вызовов функций он перестает работать, и мне нужно повторно развернуть приложение, чтобы оно снова заработало (может быть совпадением, что его перезапуск не сработал, а повторное развертывание сработало - но так было несколько раз).

Это приложение с функцией V2. NET Core 2.1. Я использую пакет NuGet Confluent.Kafka версии 1.4.0. Он работает по плану потребления, и я публикую его из Visual Studio.

1 Ответ

0 голосов
/ 15 мая 2020

Кажется, есть много вещей, которые могут вызывать эти симптомы. Как я упоминал в своем комментарии к OP, обновление Confluent.Kafka с 1.4.0 до 1.4.2 решило мою первоначальную проблему. Однако через несколько дней появились те же самые симптомы, и отправка на другой топи c не помогла.

Чтобы узнать, что происходит, я включил весь отладочный вывод от клиента, установив свойство конфигурации клиента Debug = "all". Это привело к следующему выводу из экземпляра работающего приложения-функции:

Level: Debug
Instance Name: rdkafka#producer-1
Facility: CERTROOT
Message: [thrd:app]: 38/38 certificate(s) successfully added from Windows Certificate Root store

И следующее из экземпляра приложения-функции, которое не сработало:

Level: Debug
Instance Name: rdkafka#producer-25
Facility: CERTROOT
Message: [thrd:app]: Failed to open Windows certificate Root store: Access is denied...: falling back to OpenSSL default CA paths

Level: Debug
Instance Name: rdkafka#producer-25
Facility: BROKERFAIL
Message: [thrd:sasl_ssl://redacted/boot]: sasl_ssl://redacted/bootstrap: failed: err: Local: SSL error: (errno: No such file or directory)

Нашел обходной путь из средства отслеживания проблем клиента , но изначально он не работал. Пришлось указать полный путь к файлу cacert.pem (D:\\home\\site\\wwwroot\\cacert.pem), а также убедиться, что приложение НЕ развертывается как пакет. Кажется, что на этот раз главным виновником была последняя часть, и команда Azure над этим занимается. Надеюсь, они решат эту проблему, и нам не придется использовать и управлять нашим собственным файлом сертификата ЦС.

По-видимому, довольно часто приходится указывать путь к root ЦС при использовании бессерверных архитектур , но в случае Azure функций это не должно быть (но может быть для некоторых).

...