Какой правильный способ `подписаться` /` назначить` на топику Kafka c, используя. NET `Confluent.Kafka`? - PullRequest
2 голосов
/ 17 января 2020
Task.Run(
                        () =>
                        {
                            try
                            {
                                var (topic, partitionOrNull, offsetOrNull) = target;

                                if (partitionOrNull == null && offsetOrNull == null) consumer.Subscribe(topic);
                                else
                                {
                                    var partition = partitionOrNull ?? 0;
                                    if (offsetOrNull != null) consumer.Assign(new TopicPartitionOffset(topic, partition, offsetOrNull.Value));
                                    else consumer.Assign(new TopicPartition(topic, partition));
                                } 

                                while (!cancellationToken.IsCancellationRequested)
                                {
                                    var consumeResult = consumer.Consume(cancellationToken);

                                    if (consumeResult.IsPartitionEOF) continue;

                                    observer.OnNext((consumeResult.Offset.Value, consumeResult.Key, consumeResult.Value));
                                }
                            }
                            catch (Exception exception)
                            {
                                observer.OnError(exception);
                            }
                        },
                        cancellationToken);

                    return Task.FromResult<IDisposable>(consumer);
                });

Таким образом, есть потребитель Kafka (просто крошечная оболочка для последних Confluent.Kafka), которая охватывает оба сценария ios: subscribe с ребайм-балансом c и assign в указанный раздел и / или смещение. Проблема в том, что когда я указываю только topic (partition == null && offset == null), потребитель бесконечно зависает без какого-либо прогресса. Я неправильно использую структуру или что происходит?

PS Точное указание всех трех параметров прекрасно работает.

...