Моя проблема:
Код не проходит через вызов ProduceAsyn c () и не генерирует никаких исключений. На топи c в Кафке сообщение не появляется. Функция не дает сбоев, она просто никогда не завершается, что означает, что она никогда не регистрирует статус, откуда портал Azure извлекает данные, которые он отображает в разделе «Мониторинг». Это не вызывает никаких исключений из-за превышения максимального количества времени, в течение которого функция может работать. Однако функция выполняется, поскольку ApplicationInsights показывает мои операторы журнала отладки в своих сообщениях трассировки. ):
[FunctionName("MyFunction")]
public static async Task RunAsync(
[TimerTrigger("0 */5 * * * *")]TimerInfo myTimer,
ILogger log,
ExecutionContext context,
[HttpClientFactory]HttpClient httpClient
)
{
try {
// ...
using (var kafkaProducer = initKafkaProducer(config))
{
var myHelper = new MyHelper(/*...*/, kafkaProducer);
foreach (var obj in objects)
{
await myHelper.ProcessObject(obj);
}
}
}
catch(Exception ex)
{
//...
throw new Exception("My error message", ex);
}
}
private static IProducer<Null, string> initKafkaProducer(IConfigurationRoot config)
{
var pConfig = new ProducerConfig
{
BootstrapServers = config["BootstrapServers"],
SaslMechanism = SaslMechanism.Plain,
SecurityProtocol = SecurityProtocol.SaslSsl,
SaslUsername = config["Username"],
SaslPassword = config["Password"],
MessageSendMaxRetries = 10,
RetryBackoffMs = 250,
Acks = Acks.All,
LingerMs = 5
};
return new ProducerBuilder<Null, string>(pConfig).Build();
}
public class MyHelper
{
public async Task ProcessObject(MyObject obj)
{
try
{
//...
await sendToKafkaTopic(obj);
_log.LogInformation($"DEBUG: Successfully sent to Kafka");
//...
}
catch (HttpRequestException ex)
{
throw new Exception("My error message...", ex);
}
}
private async Task sendToKafkaTopic(MyObject obj)
{
string topic = _config["KafkaTopic"];
var message = new Message<Null, string> { Value = JsonConvert.SerializeObject(obj) };
try
{
var deliveryResult = await _kafkaProducer.ProduceAsync(topic, message);
_log.LogInformation($"DEBUG: Delivered the following to {deliveryResult.TopicPartitionOffset}:\n\n{deliveryResult.Value}");
}
catch (ProduceException<Null, string> e)
{
var error = e.Error;
if (error.IsError && error.IsFatal)
{
string errorMessage = "FATAL Kafka error! ";
if (error.IsBrokerError)
{
errorMessage += "BrokerError. ";
}
else if (error.IsLocalError)
{
errorMessage += "LocalError. ";
}
errorMessage += $"Error code: {error.Code}. Reason: {error.Reason}";
throw new Exception(errorMessage);
}
}
}
}
Другой подход, который я пробовал, - это использование вызова Produce (), отправка с ним обработчика для DeliveryReport и использование вызова _kafkaProducer.Flush(TimeSpan.FromSeconds(10))
. Это выглядело так, как будто все прошло хорошо, но сообщения не отображались в Kafka. при случайном количестве вызовов функций он перестает работать, и мне нужно повторно развернуть приложение, чтобы оно снова заработало (может быть совпадением, что его перезапуск не сработал, а повторное развертывание сработало - но так было несколько раз).
Это приложение с функцией V2. NET Core 2.1. Я использую пакет NuGet Confluent.Kafka версии 1.4.0. Он работает по плану потребления, и я публикую его из Visual Studio.