Внедрить CCR Interleave Arbiter в F # - PullRequest
       29

Внедрить CCR Interleave Arbiter в F #

1 голос
/ 28 апреля 2011

Я хочу реализовать концепцию порта платформы CCR в F # (поскольку CCR официально не поддерживается для .Net 4.0).Я знаю, что для этого можно использовать класс MailboxProcessor в F #.Это прекрасно работает для простых Receive Arbiters, но мне нужна концепция Interleave Arbiter, т.е. я хочу контролировать, какие сообщения обрабатываются исключительно, а какие обрабатываются одновременно.Пока что я понятия не имею, реализовать это на F # и буду благодарен за вашу помощь.

Ответы [ 2 ]

1 голос
/ 29 апреля 2011

Просто для добавления к предложенному Томасом решению, в случае, если вы не хотите предоставлять сообщение «ReadOperationCompleted» потребителю почтового ящика (поскольку это сообщение является внутренним и в текущей реализации может быть отправлено любым потребителемпочтовый ящик) можно создать отдельный почтовый ящик внутри функции процессора основного почтового ящика, которая будет принимать два сообщения: ReadOperationCompleted и WaitForReadCompleted (это будет использоваться с PostAndAsyncReply основным почтовым ящиком), поскольку ответ на это сообщение будет приходить только тогдавсе операции чтения завершены.Также счетчик «read», представленный как «reads», будет перемещен в этот новый внутренний почтовый ящик, так как это состояние будет инкапсулировано этим внутренним почтовым ящиком.

1 голос
/ 28 апреля 2011

Я не очень знаком с CCR, но я попытаюсь ответить - насколько я понимаю, арбитр чередования ведет себя немного как ReaderWriterLock.То есть вы можете указать некоторые операции, которые могут выполняться параллельно (чтение), и некоторые операции, которые являются эксклюзивными (запись).

Следующий агент является одним из способов его реализации (не проверено, но проверяется тип: -)).Агент выставляет две операции, предназначенные для публичного использования.Последний является внутренним:

type Message<'T> =
  | PerformReadOperation of ('T -> Async<unit>)
  | PerformWriteOperation of ('T -> Async<'T>)
  | ReadOperationCompleted
  • Отправляя агенту PerformReadOperation, вы даете ему операцию, которая должна быть запущена (один раз) с использованием состояния и, возможно, параллельнос другими операциями чтения.

  • Отправляя агенту PerformWriteOperation, вы даете ему операцию, которая вычисляет новое состояние и должна выполняться после завершения всех операций чтения.(Если бы вы работали с неизменяемым состоянием, это упростило бы задачу - вам не пришлось бы ждать, пока читатели завершат работу! Но реализация ниже реализует ожидание).

Агент запускаетсяс некоторым начальным состоянием:

let initial = // initial state

А остальная часть агента реализована с использованием двух циклов:

let interleaver = MailboxProcessor.Start(fun mbox ->

  // Asynchronously wait until all read operations complete
  let rec waitUntilReadsComplete reads = 
    if reads = 0 then async { return () }
    else mbox.Scan(fun msg ->
      match msg with
      | ReadOperationCompleted -> Some(waitUntilReadsComplete (reads - 1))
      | _ -> None)

  let rec readingLoop state reads = async {
    let! msg = mbox.Receive()
    match msg with
    | ReadOperationCompleted ->
        // Some read operation completed - decrement counter
        return! readingLoop state (reads - 1) 
    | PerformWriteOperation(op) ->
        do! waitUntilReadsComplete reads
        let! newState = op state
        return! readingLoop newState 0
    | PerformReadOperation(op) ->
        // Start the operation in background & increment counter
        async { do! op state
                mbox.Post(ReadOperationCompleted) }
        |> Async.Start
        return! readingLoop state (reads + 1) }
  readingLoop initial 0)
...