GroupBy потока по agreggateId (Haskell / параллельный поток) - PullRequest
0 голосов
/ 17 октября 2018

Контекст : я внедряю приложение в CQRS и пытаюсь оптимизировать обработку команд (в основном 1 поток по совокупному идентификатору) ...

Проблема : Я хотел бы иметь первый поток, который получает все команды и отправляет их по их совокупному идентификатору в разных потоках:

1)Команды в агрегате обрабатываются последовательным способом
2) Агрегаты обрабатывают свои команды независимо (параллельно).

Решение : Я пытаюсь выполнить groupBy для потоков с помощью агрегатного идентификатора в основном ... Чтобы немного помочь, я упростил пример следующим образом:

module Sandbox where

import Streamly
import qualified Streamly.Prelude as S
import Control.Concurrent
import Control.Monad.IO.Class (MonadIO(..))

main :: IO ()
main = do
         runStream $ parallely $ S.fromList getAggregateIds |& S.mapM (\x -> do
            threadId <- myThreadId
            liftIO $ putStrLn $ (show threadId) ++ "  value " ++ (show x))


getAggregateIds :: [Integer]
getAggregateIds = [1..3] <> [1..3]

, поэтому этот скрипт отображает следующий результат:

ThreadId 17  value 1
ThreadId 15  value 2
ThreadId 19  value 3
ThreadId 13  value 1
ThreadId 16  value 3
ThreadId 18  value 2

То, что я ожидаю, выглядит примерно так (без специального порядка, только x всегда обрабатывается в одном потоке x1):

ThreadId X1  value X
ThreadId Y1  value Y
ThreadId Z1  value Z
ThreadId X1  value X
ThreadId Y1  value Y
ThreadId Z1  value Z

Спасибо !!

1 Ответ

0 голосов
/ 17 октября 2018

В приведенном выше коде parallely решил создать по одному потоку на Haskell для каждого элемента в списке getAggregateIds, то есть [1,2,3,1,2,3].parallely не заботится о наличии дублирующих элементов в списке: он просто запускает поток для каждого.

В принципе, parallely может выделить только небольшое количество потоков Haskell и использовать их позже(возможно, для того же дубликата или другого идентификатора), но при этом не будет никакого увеличения производительности.Действительно, решающая часть здесь заключается в том, что выделяется поток Haskell, а не поток ОС,

Потоки Haskell очень легкие, они используют очень мало памяти, и поэтому они очень дешевы в создании и удалении.Попытка их повторного использования может привести к снижению производительности.

Кроме того, среда выполнения Haskell может выполнять множество потоков Haskell в потоках одной ОС.Обычно небольшой пул потоков ОС хранится во время выполнения, и потоки Haskell сопоставляются с ними.Поскольку потоки ОС не так легки, потоки ОС действительно повторно используются между потоками Haskell.

Наконец, обратите внимание, что ThreadId - это имя потока Haskell, а не ОС, поэтому нормально не видеть повторного использования.из этих идентификаторов.

...