Я пытаюсь использовать Rx в моем клиенте Kafka.
public static event EventHandler<ConsumeResult<string, string>> GenericEvent;
, тогда у меня есть следующий код
var observable = Observable.FromEventPattern<ConsumeResult<string, string>>(
x => GenericEvent += x,
x => GenericEvent -= x)
.Select(x => x.EventArgs);
while (!cancellationToken.IsCancellationRequested)
{
ConsumeResult<string, string> consumeResult = _consumer.Consume(cancellationToken);
GenericEvent(consumeResult.Topic, consumeResult);
}
, тогда где-то я использую его как
var subscription = observable.Subscribe(message =>
{
Console.WriteLine($"{Thread.CurrentThread.ManagedThreadId} ** {message.Topic}/{message.Partition} @{message.Offset}: '{message.Value}'");
//_kafkaConsumer.Commit(messages);
});
Есть ли возможность запустить отдельный поток по имени topi c (consumeResult.Topic
)? Когда потребитель получает сообщение, он должен перенаправить его в соответствующий поток по topi c