Хранение произвольных вызовов функций между потоками - PullRequest
4 голосов
/ 31 декабря 2011

Я пытаюсь написать библиотеку с целью воспроизвести семантику потоков Qt: сигналы могут быть подключены к слотам, и все слоты выполняются в известном потоке, так что слоты, связанные с одним и тем же потоком, являются поточно-ориентированными по отношению друг к другу.

У меня есть следующий API:

data Signal a = Signal Unique a
data Slot a = Slot Unique ThreadId (a -> IO ())

mkSignal :: IO (Signal a)
mkSlot   :: ThreadId -> (Slot a -> a -> IO ()) -> IO (Slot a)

connect :: Signal a -> Slot a -> IO ()

-- callable from any thread
emit :: Signal a -> a -> IO ()

-- runs in Slot's thread as a result of `emit`
execute :: Slot a -> a -> IO ()
execute (Slot _ _ f) arg = f arg

Проблема в получении от emit до execute. Аргумент должен быть каким-то образом сохранен во время выполнения, а затем выполнено действие ввода-вывода, но я не могу пройти мимо проверки типов.

Вещи, которые мне нужны:

  1. Безопасность типов: сигналы не должны подключаться к слотам, ожидающим другого типа.
  2. Независимость от типа: может быть больше одного слота для любого данного типа (возможно, это можно ослабить с помощью newtype и / или TH).
  3. Простота использования: поскольку это библиотека, сигналы и слоты должны легко создаваться.

Вещи, которые я пробовал:

  • Data.Dynamic : делает все это действительно хрупким, и я не нашел способа выполнить правильно введенное действие ввода-вывода на Dynamic. Есть dynApply , но он чистый.
  • Экзистенциальные типы : мне нужно выполнить функцию, переданную в mkSlot, а не произвольную функцию, основанную на типе.
  • Data.HList : Я недостаточно умен, чтобы понять это.

Чего мне не хватает?

1 Ответ

3 голосов
/ 31 декабря 2011

Во-первых, вы уверены, что слоты действительно хотят выполнять в определенном потоке? Легко написать потокобезопасный код на Haskell, а потоки очень легки в GHC, так что вы не добьетесь больших результатов, привязав все выполнение обработчика событий к определенному потоку на Haskell.

Кроме того, для обратного вызова mkSlot нет необходимости давать сам слот: вы можете использовать рекурсивную нотацию , чтобы связать слот в его обратном вызове, не добавляя беспокойства о завязывании узла. до mkSlot.

В любом случае, вам не нужно ничего более сложного, чем эти решения. Я ожидаю, что когда вы говорите об экзистенциальных типах, вы думаете о том, чтобы отправить что-то вроде (a -> IO (), a) через TChan (которое вы упомянули, используя в комментариях) и применить его на другом конце, но вы хотите, чтобы TChan принимать значения этого типа для любого a , а не только для одного конкретного a . Ключевым моментом здесь является то, что если у вас есть (a -> IO (), a) и вы не знаете, что такое a , единственное, что вы можете сделать, это применить функцию к значению, что даст вам IO () - так что мы можно просто отправить их через канал!

Вот пример:

import Data.Unique
import Control.Applicative
import Control.Monad
import Control.Concurrent
import Control.Concurrent.STM

newtype SlotGroup = SlotGroup (IO () -> IO ())

data Signal a = Signal Unique (TVar [Slot a])
data Slot a = Slot Unique SlotGroup (a -> IO ())

-- When executed, this produces a function taking an IO action and returning
-- an IO action that writes that action to the internal TChan. The advantage
-- of this approach is that it's impossible for clients of newSlotGroup to
-- misuse the internals by reading the TChan or similar, and the interface is
-- kept abstract.
newSlotGroup :: IO SlotGroup
newSlotGroup = do
  chan <- newTChanIO
  _ <- forkIO . forever . join . atomically . readTChan $ chan
  return $ SlotGroup (atomically . writeTChan chan)

mkSignal :: IO (Signal a)
mkSignal = Signal <$> newUnique <*> newTVarIO []

mkSlot :: SlotGroup -> (a -> IO ()) -> IO (Slot a)
mkSlot group f = Slot <$> newUnique <*> pure group <*> pure f

connect :: Signal a -> Slot a -> IO ()
connect (Signal _ v) slot = atomically $ do
  slots <- readTVar v
  writeTVar v (slot:slots)

emit :: Signal a -> a -> IO ()
emit (Signal _ v) a = atomically (readTVar v) >>= mapM_ (`execute` a)

execute :: Slot a -> a -> IO ()
execute (Slot _ (SlotGroup send) f) a = send (f a)

Используется TChan для отправки действий рабочему потоку, к которому привязан каждый слот.

Обратите внимание, что я не очень знаком с Qt, поэтому, возможно, я упустил некоторые тонкости модели. Вы также можете отключить слоты с помощью:

disconnect :: Signal a -> Slot a -> IO ()
disconnect (Signal _ v) (Slot u _ _) = atomically $ do
  slots <- readTVar v
  writeTVar v $ filter keep slots
  where keep (Slot u' _) = u' /= u

Возможно, вы захотите что-то вроде Map Unique (Slot a) вместо [Slot a], если это может быть узким местом.

Итак, решение здесь состоит в том, чтобы (а) распознать, что у вас есть нечто, что фундаментально основано на изменчивом состоянии, и использовать изменяемую переменную для его структурирования; (б) понять, что функции и действия ввода-вывода являются первоклассными, как и все остальное, поэтому вам не нужно делать ничего особенного, чтобы конструировать их во время выполнения:)

Кстати, я рекомендую сохранять реализации Signal и Slot абстрактными, не экспортируя их конструкторы из определяющего их модуля; в конце концов, есть много способов решить этот подход без изменения API.

...