Task.Run(
() =>
{
try
{
var (topic, partitionOrNull, offsetOrNull) = target;
if (partitionOrNull == null && offsetOrNull == null) consumer.Subscribe(topic);
else
{
var partition = partitionOrNull ?? 0;
if (offsetOrNull != null) consumer.Assign(new TopicPartitionOffset(topic, partition, offsetOrNull.Value));
else consumer.Assign(new TopicPartition(topic, partition));
}
while (!cancellationToken.IsCancellationRequested)
{
var consumeResult = consumer.Consume(cancellationToken);
if (consumeResult.IsPartitionEOF) continue;
observer.OnNext((consumeResult.Offset.Value, consumeResult.Key, consumeResult.Value));
}
}
catch (Exception exception)
{
observer.OnError(exception);
}
},
cancellationToken);
return Task.FromResult<IDisposable>(consumer);
});
Таким образом, есть потребитель Kafka (просто крошечная оболочка для последних Confluent.Kafka
), которая охватывает оба сценария ios: subscribe
с ребайм-балансом c и assign
в указанный раздел и / или смещение. Проблема в том, что когда я указываю только topic
(partition == null && offset == null
), потребитель бесконечно зависает без какого-либо прогресса. Я неправильно использую структуру или что происходит?
PS Точное указание всех трех параметров прекрасно работает.