Итак, здесь мы go: с учетом Confluent.Kafka
IConsumer<>
он упаковывает его в выделенный async
CE и потребляет до тех пор, пока отмена не была запрошена. Этот фрагмент кода также защищает себя от OperationCancelledException
и запускает блок finally
, чтобы обеспечить постепенное прекращение работы потребителя.
let private consumeUntiCancelled callback (consumer: IConsumer<'key, 'value>) =
async {
let! ct = Async.CancellationToken
try
try
while not ct.IsCancellationRequested do
let consumeResult = consumer.Consume(ct)
if not consumeResult.IsPartitionEOF then do! (callback consumeResult)
with
| :? OperationCanceledException -> return ()
finally
consumer.Close()
consumer.Dispose()
}
Вопрос № 1: является ли этот код правильным или я злоупотребляю async
?
Пока все хорошо. В моем приложении мне приходится иметь дело со многими потребителями, которые должны d ie в целом. Итак, предполагая, что consumers: seq<Async<unit>>
представляет их, я пришел к следующему коду:
async {
for consumer in consumers do
do! (Async.StartChild consumer |> Async.Ignore).
}
Я ожидаю, что этот код будет связывать дочерние элементы с контекстом отмены родительского элемента, и как только он будет отменен, дочерние элементы будут быть отмененным также.
Вопрос № 2: гарантированно ли будет запущен мой блок finally, даже если ребенок был отменен?