Я не очень знаком с CCR, но я попытаюсь ответить - насколько я понимаю, арбитр чередования ведет себя немного как ReaderWriterLock
.То есть вы можете указать некоторые операции, которые могут выполняться параллельно (чтение), и некоторые операции, которые являются эксклюзивными (запись).
Следующий агент является одним из способов его реализации (не проверено, но проверяется тип: -)).Агент выставляет две операции, предназначенные для публичного использования.Последний является внутренним:
type Message<'T> =
| PerformReadOperation of ('T -> Async<unit>)
| PerformWriteOperation of ('T -> Async<'T>)
| ReadOperationCompleted
Отправляя агенту PerformReadOperation
, вы даете ему операцию, которая должна быть запущена (один раз) с использованием состояния и, возможно, параллельнос другими операциями чтения.
Отправляя агенту PerformWriteOperation
, вы даете ему операцию, которая вычисляет новое состояние и должна выполняться после завершения всех операций чтения.(Если бы вы работали с неизменяемым состоянием, это упростило бы задачу - вам не пришлось бы ждать, пока читатели завершат работу! Но реализация ниже реализует ожидание).
Агент запускаетсяс некоторым начальным состоянием:
let initial = // initial state
А остальная часть агента реализована с использованием двух циклов:
let interleaver = MailboxProcessor.Start(fun mbox ->
// Asynchronously wait until all read operations complete
let rec waitUntilReadsComplete reads =
if reads = 0 then async { return () }
else mbox.Scan(fun msg ->
match msg with
| ReadOperationCompleted -> Some(waitUntilReadsComplete (reads - 1))
| _ -> None)
let rec readingLoop state reads = async {
let! msg = mbox.Receive()
match msg with
| ReadOperationCompleted ->
// Some read operation completed - decrement counter
return! readingLoop state (reads - 1)
| PerformWriteOperation(op) ->
do! waitUntilReadsComplete reads
let! newState = op state
return! readingLoop newState 0
| PerformReadOperation(op) ->
// Start the operation in background & increment counter
async { do! op state
mbox.Post(ReadOperationCompleted) }
|> Async.Start
return! readingLoop state (reads + 1) }
readingLoop initial 0)