Выполнить обработку в отдельном потоке по группам - PullRequest
0 голосов
/ 03 февраля 2020

Я пытаюсь использовать Rx в моем клиенте Kafka.

public static event EventHandler<ConsumeResult<string, string>> GenericEvent;

, тогда у меня есть следующий код

var observable = Observable.FromEventPattern<ConsumeResult<string, string>>(
     x => GenericEvent += x,
     x => GenericEvent -= x)
  .Select(x => x.EventArgs);

while (!cancellationToken.IsCancellationRequested)
{
    ConsumeResult<string, string> consumeResult = _consumer.Consume(cancellationToken);
    GenericEvent(consumeResult.Topic, consumeResult);
}

, тогда где-то я использую его как

var subscription = observable.Subscribe(message =>
{
    Console.WriteLine($"{Thread.CurrentThread.ManagedThreadId} ** {message.Topic}/{message.Partition} @{message.Offset}: '{message.Value}'");

    //_kafkaConsumer.Commit(messages);
});

Есть ли возможность запустить отдельный поток по имени topi c (consumeResult.Topic)? Когда потребитель получает сообщение, он должен перенаправить его в соответствующий поток по topi c

enter image description here

1 Ответ

5 голосов
/ 04 февраля 2020

Дайте этому go:

Observable
    .Interval(TimeSpan.FromSeconds(0.1))
    .Take(20)
    .GroupBy(x => x % 3)
    .SelectMany(xs => Observable.Using(() => new EventLoopScheduler(), els => xs.ObserveOn(els)))
    .Subscribe(x => Console.WriteLine($"{x} {Thread.CurrentThread.ManagedThreadId}"));

Это обеспечивает создание потока в планировщике new EventLoopScheduler() для каждой внутренней наблюдаемой, созданной оператором GroupBy. SelectMany выравнивает группу, но сохраняет для каждой группы EventLoopScheduler.

В вашем случае вы GroupBy свойство consumeResult.Topic.

Убедитесь, что ваше Наблюдаемый источник заканчивается, поскольку потоки живут вечно, пока они не делают. Вызов Dispose() для подписки достаточен для прекращения наблюдаемого.

...