Управление состояниями в нескольких потоках - PullRequest
0 голосов
/ 05 мая 2018

У меня есть класс F # с кучей последовательностей. Класс содержит простой метод next(), который возвращает следующий элемент в текущей последовательности. Если все элементы текущей последовательности были возвращены, она перейдет к следующей последовательности. Класс содержит указатель, следующим элементом которого является последовательность и из какой последовательности он возвращается.

В настоящее время я ограничен только раскрытием метода next().

Некоторые из вышестоящих классов будут использовать мой класс (один и тот же экземпляр объекта) между разными потоками. Это приведет к потере синхронизации, поскольку все потоки должны начинаться с нуля. Я знаю, что это не идеально, но это то, с чем мне приходится работать в данный момент.

Пример:

Thread 1 next(): return elem. A Thread 1 next(): return elem. B Thread 2 next(): return elem. A Thread 1 next(): return elem. C Thread 2 next(): return elem. B

Есть ли способ отслеживания указателя для каждого потока?

Я думал об использовании Threading.Thread.CurrentThread.ManagedThreadId в качестве ключа на карте, а затем вернул указатель (и соответственно обновил его там). Я немного обеспокоен безопасностью потоков этой Карты, и если два потока обновляют свое состояние одновременно.

Я надеюсь, что Сомоне может дать мне несколько мыслей о том, как заставить это работать.

1 Ответ

0 голосов
/ 05 мая 2018

Этого можно достичь, используя MailboxProcessor для управления состоянием, затем используя класс для абстрагирования MailboxProcessor от потребителя. Если вы разделяете экземпляр между несколькими потоками, они будут видеть обновления друг друга в поточном режиме. Если вы используете выделенный экземпляр для каждого потока, они будут видеть только свои обновления. Код для этого хотел бы что-то вроде этого:

// Add whatever other commands you need
type private SequenceMessage = Next of AsyncReplyChannel<int>

type IntSequence() =
    let agent = MailboxProcessor<SequenceMessage>.Start
                <| fun inbox ->
                    let rec loop state =
                        async {
                            let! message = inbox.Receive()
                            // Add other matches as requried
                            match message with
                            | Next channel -> 
                                let newState = state + 1
                                channel.Reply(newState)
                                return! loop newState
                        }
                    loop 0

    let next () =
        agent.PostAndReply <| fun reply -> Next reply

    let asyncNext () =
        agent.PostAndAsyncReply <| fun reply -> Next reply

    member __.Next () = next ()
    member __.AsyncNext () = asyncNext ()

Затем, чтобы использовать его так, чтобы каждый поток видел обновления из всех других потоков, вы должны сделать что-то эквивалентное этому:

// To share state across multiple threads, use the same instance
let sequence = IntSequence()
[1..10]
|> List.map (fun _ -> sequence.AsyncNext())
|> Async.Parallel
|> Async.RunSynchronously
|> Array.iter (fun i -> printfn "%d" i)

Какие отпечатки:

1
2
3
4
5
6
7
8
9
10

И чтобы использовать его так, чтобы каждый поток видел только свои обновления, вы просто изменили бы предыдущий пример на что-то вроде этого:

// To use a dedicate state for each thread, create a new instance
[1..10]
|> List.map (fun _ -> IntSequence())
|> List.map (fun sequence -> sequence.AsyncNext())
|> Async.Parallel
|> Async.RunSynchronously
|> Array.iter (fun i -> printfn "%d" i)

Какие отпечатки:

1
1
1
1
1
1
1
1
1
1
...