Перенаправить сообщение в соответствующий рабочий поток / задачу - PullRequest
1 голос
/ 03 февраля 2020

Я работаю с Kafka и пытаюсь внедрить единого потребителя, который подписывается на все нужные темы. Допустим, у меня есть 3 темы (A, B, C). Как обрабатывать сообщения от каждой топи c синхронно, но между темами параллельно. Итак, для A topi c мне нужно обрабатывать сообщения одно за другим, но в то же время мне нужно обрабатывать сообщения одно за другим из других тем.

Похоже, мне нужна отдельная тема для каждой топи c. Подскажите, пожалуйста, как это можно реализовать? Есть ли готовые решения для этого? Мой потребитель выглядит как

while (!cancellationToken.IsCancellationRequested)
{
    ConsumeResult<string, string> consumeResult = _consumer.Consume(cancellationToken);

    ... processing here

    _consumer.Commit(consumeResult);
}

Я понятия не имею, как его реализовать, потому что пока нет опыта работы с asny c в C#. Найдена такая вещь как Реактивные Расширения https://gist.github.com/omnibs/6b2cbdba2685693448ee6779736a00c2.

Когда я получаю сообщение, как перенаправить сообщение в соответствующий рабочий поток / задачу?

Использование пакета Confluent.Kafka 1.3.0 для работы с Kafka

Ответы [ 3 ]

1 голос
/ 03 февраля 2020

Посмотрите на TPL DataFlow , а именно ActionBlock или Каналы .

1 голос
/ 04 февраля 2020
var channels = new Dictionary<string, ActionBlock<ConsumeResult<string, string>>>();
foreach (var topic in _consumer.Subscription)
{
    channels.Add(topic, new ActionBlock<ConsumeResult<string, string>>(async consumeResult =>
    {
        ... processing here
        _consumer.Commit(consumeResult);
    }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 }));
}

этот код создаст ActionBlock для всех тем, на которые вы подписаны.

затем, когда вы получите сообщение, перенаправьте его на соответствующий канал

while (!cancellationToken.IsCancellationRequested)
{
   ConsumeResult<string, string> consumeResult = _consumer.Consume();
   await channels[consumeResult.Topic].SendAsync(consumeResult);
}

Прочитайте статьи из @PauloMorgado ответ

0 голосов
/ 03 февраля 2020

Вы должны создать 1 заметный объект для каждой топи c, на которой будут публиковаться sh сообщения. Затем подпишитесь на каждую доступную топику c на обозревателе.

Создать список IObservableList из списка IObservable

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...