F # asyn c: отмена родителя / ребенка? - PullRequest
1 голос
/ 04 февраля 2020

Итак, здесь мы go: с учетом Confluent.Kafka IConsumer<> он упаковывает его в выделенный async CE и потребляет до тех пор, пока отмена не была запрошена. Этот фрагмент кода также защищает себя от OperationCancelledException и запускает блок finally, чтобы обеспечить постепенное прекращение работы потребителя.

let private consumeUntiCancelled callback (consumer: IConsumer<'key, 'value>) =
    async {
            let! ct = Async.CancellationToken
            try
                try
                    while not ct.IsCancellationRequested do 
                        let consumeResult = consumer.Consume(ct)
                        if not consumeResult.IsPartitionEOF then do! (callback consumeResult) 
                with
                | :? OperationCanceledException -> return ()
            finally
                consumer.Close()
                consumer.Dispose()
    }

Вопрос № 1: является ли этот код правильным или я злоупотребляю async?

Пока все хорошо. В моем приложении мне приходится иметь дело со многими потребителями, которые должны d ie в целом. Итак, предполагая, что consumers: seq<Async<unit>> представляет их, я пришел к следующему коду:

async {
        for consumer in consumers do 
            do! (Async.StartChild consumer |> Async.Ignore).
    }

Я ожидаю, что этот код будет связывать дочерние элементы с контекстом отмены родительского элемента, и как только он будет отменен, дочерние элементы будут быть отмененным также.

Вопрос № 2: гарантированно ли будет запущен мой блок finally, даже если ребенок был отменен?

1 Ответ

1 голос
/ 05 февраля 2020

У меня есть два замечания по поводу вашего кода:

  • Вы правильно используете Async.StartChild - все дочерние вычисления будут наследовать один и тот же токен отмены, и все они будут отменены, когда основной токен отменен.

  • Рабочий процесс asyn c можно отменить после вызова consumer.Consume(ct) и до вызова callback. Я не уверен, что это означает для вашей конкретной проблемы c, но если он удалит некоторые данные из очереди, данные могут быть потеряны до их обработки. Если это проблема, то я думаю, что вам нужно сделать callback не асинхронным или вызывать его по-другому.

  • В вашей функции consumeUntilCancelled вам не нужна экспликация проверить, пока нет, если ct.IsCancellationRequested верно. Рабочий процесс asyn c делает это автоматически в каждом do! или let!, поэтому вы можете заменить его на while l oop.

Вот минимальная автономная демонстрация:

let consume s = async {
  try
    while true do 
      do! Async.Sleep 1000
      printfn "%s did work" s
  finally
    printfn "%s finalized" s }

let work = 
  async {
    for c in ["A"; "B"; "C"; "D"] do  
      do! Async.StartChild (consume c) |> Async.Ignore }

Теперь мы создадим вычисление с токеном отмены:

// Run this in F# interactive
let ct = new System.Threading.CancellationTokenSource()
Async.Start(work, ct.Token)

// Run this sometime later
ct.Cancel()

После вызова ct.Cancel будут вызваны все блоки finally. и все петли остановятся.

...