Совместное использование контекста между обратными вызовами AMQP - PullRequest
0 голосов
/ 30 ноября 2018

Существует простое руководство по использованию RabbitMQ для Haskell, где я взял этот кусок кода

main :: IO ()
main = do
     conn <- openConnection "127.0.0.1" "/" "guest" "guest"
     ch   <- openChannel conn

     declareQueue ch newQueue {queueName       = "hello",
                               queueAutoDelete = False,
                               queueDurable    = False}

     putStrLn " [*] Waiting for messages. To exit press CTRL+C"
     consumeMsgs ch "hello" NoAck deliveryHandler

     -- waits for keypresses
     getLine
     closeConnection conn

deliveryHandler :: (Message, Envelope) -> IO ()
deliveryHandler (msg, metadata) =
  BL.putStrLn $ " [x] Received " <> msgBody msg

. Оно просто объясняет, как получить сообщение из очереди и обработать его с помощью обратного вызова.

Одна вещь может быть проста для решения, но я изо всех сил пытаюсь понять, как добавить некоторый изменяемый контекст в обратный вызов, поэтому каждый раз, когда функция запускается, она может изменить ее.Просто, как рассчитать номер сообщения в порядке очереди.Я обнаружил, что возможным решением является государственная монада, не так ли?

И второй вопрос - все эти обратные вызовы обрабатываются параллельно или нет?Если нет, то как обрабатывать их параллельно и сохранять изменяемый контекст без гонки данных?

Ответы [ 2 ]

0 голосов
/ 30 ноября 2018

Опираясь на ответ @ bergey - вы можете создать изменяемую ссылку, такую ​​как IORef или MVar.Эти ссылки могут быть переданы вашему обработчику с помощью частичного применения функции.Типизированный, но не проверенный код:ref для deliveryHandler приложением функции.

     -- waits for keypresses
     getLine
     closeConnection conn

deliveryHandler :: MVar Int -> (Message, Envelope) -> IO ()
deliveryHandler ref (msg, metadata) =
  BL.putStrLn $ " [x] Received " <> msgBody msg
  withMVar' ref $ \val ->
       do print val
          pure (val + 1)

И, наконец, мы можем работать с ref, используя функцию из Control.Concurrent.MVar, получая старое значение и заменяя его новым желаемым значением.

0 голосов
/ 30 ноября 2018

Если вы планируете обрабатывать несколько сообщений параллельно (в одном и том же процессе на Haskell), я бы начал с MVar для хранения общего состояния.

MVar в основном является общей переменнойс замком и разумным интерфейсом.В простых случаях (например, счетчик) этого достаточно, чтобы предотвратить скачки данных.Это золотая середина между низкоуровневыми (IORef) и высокоуровневыми (STM) абстракциями над общей памятью.Я думаю, что это проще всего понять, и я использую его для всех начальных прототипов.

Я не знаю библиотеку RabbitMQ, поэтому не могу ответить на ваш второй вопрос о том, обрабатываются ли сообщения параллельно.

...