Я недавно начал работать над потоковым приложением Kafka с использованием ядра .NET. Я следовал за учебником: https://medium.com/@srigumm/building-realtime-streaming-applications-using-net-core-and-kafka-ad45ed081b31.
Я создал базовое приложение «производитель-потребитель», в котором производитель берет входные данные и помещает их в кафку-тему. Потребитель может подписаться на тему и получать данные из нее. Я также могу перенести эти данные в другую тему с помощью нового производителя. Но я не могу инициализировать несколько потребителей из одной и той же темы.
В appsettings.json:
"consumer": {
"bootstrapservers": "localhost:9092", //specify your kafka broker address
"groupid": "csharp-consumer",
"enableautocommit": true,
"statisticsintervalms": 5000,
"sessiontimeoutms": 6000,
"autooffsetreset": 0,
"enablepartitioneof": true,
"SaslMechanism": 0, //0 for GSSAPI
//"SaslKerberosKeytab":"filename.keytab", //specify your keytab file here
"SaslKerberosPrincipal": "youralias@DOMAIN.COM", //specify your alias here
"SaslKerberosServiceName": "kafka"
//"SaslKerberosKinitCmd":"kinit -k -t %{sasl.kerberos.keytab} %{sasl.kerberos.principal}"
},
processOrderServices.cs:
namespace Api.Services
{
public class ProcessOrdersService : BackgroundService
{
private readonly ConsumerConfig consumerConfig;
private readonly ProducerConfig producerConfig;
//----------------------
//private readonly ConsumerConfig consumerConfig2;
public ProcessOrdersService(ConsumerConfig consumerConfig, ProducerConfig producerConfig)
{
this.producerConfig = producerConfig;
this.consumerConfig = consumerConfig;
//----------------------
//this.consumerConfig2 = consumerConfig;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
Console.WriteLine("OrderProcessing Service Started\n\n");
while (!stoppingToken.IsCancellationRequested)
{
var consumerHelper = new ConsumerWrapper(consumerConfig, "orderrequests");
//var consumerHelper2 = new ConsumerWrapper(consumerConfig, "orderrequests");
string orderRequest = consumerHelper.readMessage();
//consumerHelper2.DisplayMessage();
//Deserilaize
OrderRequest order = JsonConvert.DeserializeObject<OrderRequest>(orderRequest);
//TODO:: Process Order
Console.WriteLine($"Info: OrderHandler => Processing the order for {order.productname}\n\n");
order.status = OrderStatus.COMPLETED;
//Write to ReadyToShip Queue
var producerWrapper = new ProducerWrapper(producerConfig,"readytoship");
await producerWrapper.writeMessage(JsonConvert.SerializeObject(order));
//--------------------------
// var consumerHelper2 = new ConsumerWrapper(consumerConfig2, "orderrequests");
//string processedOrder = consumerHelper2.readMessage();
//OrderRequest order2 = JsonConvert.DeserializeObject<OrderRequest>(processedOrder);
//Console.WriteLine($"Info: OrderHandler => Delivered the order for {order2.productname}\n\n");
//order2.status = OrderStatus.DELIVERED;
//----------------------------
}
}
}
}
ConsumerWrapper.cs:
namespace Api
{
public class ConsumerWrapper
{
private string _topicName;
private ConsumerConfig _consumerConfig;
private Consumer<string,string> _consumer;
private static readonly Random rand = new Random();
public ConsumerWrapper(ConsumerConfig config,string topicName)
{
this._topicName = topicName;
this._consumerConfig = config;
this._consumer = new Consumer<string,string>(this._consumerConfig);
this._consumer.Subscribe(topicName);
}
public string readMessage(){
var consumeResult = this._consumer.Consume();
return consumeResult.Value;
}
public void DisplayMessage()
{
var consumeResult = this._consumer.Consume();
Console.WriteLine(consumeResult.Value);
Console.WriteLine($"Info: OrderHandler => Delivered the order for {consumeResult.Value}\n\n");
return;
}
}
}
Я хочу иметь возможность вызывать класс Consumer несколько раз и иметь возможность читать из одной и той же темы. Я понял, что существует необходимость создать несколько разделов / идентификаторов групп, чтобы это можно было сделать. Но я не могу понять, где и как это сделать.