Фон
В ответ на вопрос я собрал и загрузил bched-tchan (для меня было бы неправильно загрузить версию jnb ). Если имени недостаточно, ограниченный канал (BTChan) - это канал STM с максимальной пропускной способностью (записывает блок, если канал заполнен).
Недавно я получил запрос на добавление функции дублирования, как в обычном TChan . И таким образом начинается проблема.
Как выглядит BTChan
Ниже приведено упрощенное (и фактически не функциональное) представление BTChan.
data BTChan a = BTChan
{ max :: Int
, count :: TVar Int
, channel :: TVar [(Int, a)]
, nrDups :: TVar Int
}
Каждый раз, когда вы пишете на канал, вы включаете количество дуплей (nrDups
) в кортеж - это «счетчик отдельных элементов», который указывает, сколько читателей получили этот элемент.
Каждый читатель будет уменьшать счетчик для элемента, который он читает, затем перемещать свой указатель чтения к следующему элементу в списке. Если считыватель уменьшает счетчик до нуля, тогда значение count
уменьшается, чтобы правильно отобразить доступную пропускную способность канала.
Для уточнения желаемой семантики: пропускная способность канала указывает максимальное количество элементов в очереди в канале. Любой данный элемент ставится в очередь, пока читатель каждого дубликата не получит элемент. Никакие элементы не должны оставаться в очереди для дублирования GCed (это главная проблема).
Например, пусть будет три дупла канала (c1, c2, c3) емкостью 2, где в канал записано 2 элемента, после чего все элементы были считаны из c1
и c2
. Канал все еще заполнен (0 оставшихся емкостей), поскольку c3
не использует его копии. В любой момент времени, если все ссылки на c3
отброшены (поэтому c3
является GCed), тогда емкость следует освободить (в этом случае восстановить до 2).
Вот проблема: допустим, у меня есть следующий код
c <- newBTChan 1
_ <- dupBTChan c -- This represents what would probably be a pathological bug or terminated reader
writeBTChan c "hello"
_ <- readBTChan c
В результате BTChan выглядит так:
BTChan 1 (TVar 0) (TVar []) (TVar 1) --> -- newBTChan
BTChan 1 (TVar 0) (TVar []) (TVar 2) --> -- dupBTChan
BTChan 1 (TVar 1) (TVar [(2, "hello")]) (TVar 2) --> -- readBTChan c
BTChan 1 (TVar 1) (TVar [(1, "hello")]) (TVar 2) -- OH NO!
Обратите внимание, что в конце счетчик чтения для "hello"
все еще 1
? Это означает, что сообщение не считается пропавшим (даже если оно получит GCed в реальной реализации), и наш count
никогда не будет уменьшаться. Поскольку канал заполнен (максимум 1 элемент), авторы всегда будут блокировать.
Я хочу, чтобы финализатор создавался каждый раз, когда вызывается dupBTChan
. Когда собранный (или исходный) канал собран, все элементы, остающиеся для чтения на этом канале, уменьшат количество элементов, также будет уменьшена переменная nrDups
. В результате будущие записи будут иметь правильный count
(count
, который не резервирует место для переменных, не считываемых каналами GCed).
Решение 1 - Ручное управление ресурсами (чего я хочу избежать)
По этой причине в bNed-tchan JNB есть ручное управление ресурсами. Смотрите cancelBTChan
. Я собираюсь сделать что-то более сложное для пользователя, чтобы ошибиться (во многих случаях ручное управление не является правильным способом).
Решение 2 - Используйте исключения, блокируя на TVars (GHC не может делать это так, как я хочу)
РЕДАКТИРОВАТЬ это решение, а решение 3, которое является дополнительным, не работает! Из-за ошибки 5055 (WONTFIX) компилятор GHC отправляет исключения обоим заблокированным потокам, даже если одного достаточно (что теоретически определимо, но не практично для GHC GC).
Если все способы получения BTChan
являются IO, мы можем forkIO
поток, который читает / повторяет попытку на дополнительном (фиктивном) поле TVar, уникальном для данного BTChan
. Новый поток поймает исключение, когда все другие ссылки на TVar будут отброшены, поэтому он будет знать, когда уменьшать счетчики nrDups
и отдельные элементы. Это должно работать, но заставляет всех моих пользователей использовать ввод-вывод, чтобы получить BTChan
s:
data BTChan = BTChan { ... as before ..., dummyTV :: TVar () }
dupBTChan :: BTChan a -> IO (BTChan a)
dupBTChan c = do
... as before ...
d <- newTVarIO ()
let chan = BTChan ... d
forkIO $ watchChan chan
return chan
watchBTChan :: BTChan a -> IO ()
watchBTChan b = do
catch (atomically (readTVar (dummyTV b) >> retry)) $ \e -> do
case fromException e of
BlockedIndefinitelyOnSTM -> atomically $ do -- the BTChan must have gotten collected
ls <- readTVar (channel b)
writeTVar (channel b) (map (\(a,b) -> (a-1,b)) ls)
readTVar (nrDup b) >>= writeTVar (nrDup b) . (-1)
_ -> watchBTChan b
РЕДАКТИРОВАТЬ: Да, это финализатор плохой человек, и у меня нет особых причин, чтобы избегать использования addFinalizer
.Это было бы тем же решением, которое все еще заставляло бы использовать IO afaict.
Решение 3: более чистый API, чем решение 2, но GHC все еще не поддерживает его
Пользователи запускают поток менеджера, вызывая initBTChanCollector
, который будет следить за набором этих фиктивных телевизионных программ (из решения 2) и выполнять необходимую очистку.По сути, он помещает IO в другой поток, который знает, что делать через глобальный (unsafePerformIO
ed) TVar
.Вещи работают в основном как решение 2, но создание BTChan может быть STM.Невозможность запустить initBTChanCollector
может привести к постоянно растущему списку (утечке пространства) задач по мере выполнения процесса.
Решение 4. Никогда не разрешать отбрасывать BTChan
s
Это похоже на игнорирование проблемы.Если пользователь никогда не удалит дублируемый BTChan
, тогда проблема исчезнет.
Решение 5 Я вижу ответ ezyang (полностью действительный и ценный), но на самом деле хотел бы сохранить текущий API простос функцией dup.
** Решение 6 ** Скажите, пожалуйста, есть лучший вариант.
РЕДАКТИРОВАТЬ: I внедрил решение 3 (полностью непроверенный альфа-релиз)) и обработал потенциальную утечку пространства, сделав сам глобал BTChan
- этот канал, вероятно, должен иметь емкость 1, так что забыв запустить init
обнаруживается очень быстро, но это незначительное изменение.Это работает в GHCi (7.0.3), но это кажется случайным.GHC генерирует исключения в оба заблокированных потока (действительный, читающий BTChan и наблюдающий поток), поэтому, если вы заблокированы, читая BTChan, когда другой поток отбрасывает его ссылку, вы умираете.