Я пытаюсь создать API-интерфейс производителя, который должен просто захватить тело запроса и отправить его в kafka topi c в кластере MSK. (проблема почти такая же для потребительской части)
Я создал кластер MSK в VP C с 3 выделенными подсетями. Бессерверное приложение работает с тем же VP C, но другим su bnet, добавляющим VpcConfig в файл 'serverless.template'. Я также прилагаю к этому файлу политики «AWSLambdaFullAccess» и «AmazonMSKFullAccess». Группы безопасности с обеих сторон разрешают все трафик c во всех портах.
Я создаю экземпляр EC2 в том же su bnet, и я могу успешно подключиться с помощью команды 'bin / kafka-console -producer. sh 'и создавать сообщения.
Ядро do tnet использует пакет "Confluent.Kafka" версии 1.3.0.
Код выглядит так:
var config = new ProducerConfig { BootstrapServers = this.BootstrapServers };
using (var p = new ProducerBuilder<Null, string>(config).Build())
{
try
{
this.Logger.LogInformation($"Producer name: {p.Name}");
var dr = await p.ProduceAsync(this.TopicName, new Message<Null, string> { Value = message });
this.Logger.LogInformation($"Delivered '{dr.Value}' to '{dr.TopicPartitionOffset}'");
}
catch (ProduceException<Null, string> e)
{
var msg = $"Delivery failed: {e.Error.Reason}";
Console.WriteLine(msg);
this.Logger.LogCritical(msg);
throw new Exception(msg);
}
}
Я пытаюсь установить серверы bootstrap с DNS, предоставленным непосредственно из MSK, а также добавляю префикс "plaintext: //"
Любая идея, почему я получаю TimeOut в строка «ProduceAsyn c»?
Я полагаю, это связано с тем, что библиотека Confluence.Kafka не совместима с MSK, но на самом деле я не могу определить, правда это или нет.
Заранее спасибо.
С уважением,