Я пытался реализовать Haskell Control.Concurrent.MVar
, который находится в разделяемой памяти и позволяет осуществлять связь между несколькими независимыми процессами / программами с использованием функциональности POSIX.Но я потерпел неудачу с множеством тупиков.
Проблема в том, что pthread_cond_timedwait
иногда не возвращает вызов в рамках GHC FFI (хотя interruptible
или unsafe
).После нескольких дней отчаянных попыток решить проблему, я решил минимизировать код и попросить сообщество помочь.К сожалению, я не смог сжать проблему в несколько строк кода, которые можно вставить здесь.Поэтому я сохранил (как можно меньше) код на github вместе с инструкциями о том, как воспроизвести проблему , здесь есть постоянная ссылка на ее текущее состояние (ветка mvar-fail
).
По сути, функции для взятия и установки mvar выглядят так:
int mvar_take(MVar *mvar, ...) {
pthread_mutex_timedlock(&(mvar->statePtr->mvMut), &timeToWait);
while ( !(mvar->statePtr->isFull) ) {
pthread_cond_signal(&(mvar->statePtr->canPutC));
pthread_cond_timedwait(&(mvar->statePtr->canTakeC), &(mvar->statePtr->mvMut), &timeToWait);
}
memcpy(localDataPtr, mvar->dataPtr, mvar->statePtr->dataSize);
mvar->statePtr->isFull = 0;
pthread_mutex_unlock(&(mvar->statePtr->mvMut));
}
int mvar_put(MVar *mvar, ...) {
pthread_mutex_timedlock(&(mvar->statePtr->mvMut), &timeToWait);
while ( mvar->statePtr->isFull ) {
pthread_cond_signal(&(mvar->statePtr->canTakeC));
pthread_cond_timedwait(&(mvar->statePtr->canPutC), &(mvar->statePtr->mvMut), &timeToWait);
}
memcpy(mvar->dataPtr, localDataPtr, mvar->statePtr->dataSize);
mvar->statePtr->isFull = 1;
pthread_mutex_unlock(&(mvar->statePtr->mvMut));
}
(плюс проверка ошибок и printfs после каждой команды). Полный код для mvar_take
. Инициализация происходит следующим образом:
pthread_mutexattr_init(&(s.mvMAttr));
pthread_mutexattr_settype(&(s.mvMAttr), PTHREAD_MUTEX_ERRORCHECK);
pthread_mutexattr_setpshared(&(s.mvMAttr), PTHREAD_PROCESS_SHARED);
pthread_mutex_init(&(s.mvMut), &(s.mvMAttr));
pthread_condattr_init(&(s.condAttr));
pthread_condattr_setpshared(&(s.condAttr), PTHREAD_PROCESS_SHARED);
pthread_cond_init(&(s.canPutC), &(s.condAttr));
pthread_cond_init(&(s.canTakeC), &(s.condAttr));
Полный код. Часть Haskell выглядит следующим образом:
foreign import ccall interruptible "mvar_take"
mvar_take :: Ptr StoredMVarT -> Ptr a -> CInt -> IO CInt
foreign import ccall interruptible "mvar_put"
mvar_put :: Ptr StoredMVarT -> Ptr a -> CInt -> IO CInt
takeMVar :: Storable a => StoredMVar a -> IO a
takeMVar (StoredMVar _ fp) = withForeignPtr fp $ \p -> alloca $ \lp -> do
r <- mvar_take p lp
if r == 0
then peek lp
else throwErrno $ "takeMVar failed with code " ++ show r
putMVar :: Storable a => StoredMVar a -> a -> IO ()
putMVar (StoredMVar _ fp) x = withForeignPtr fp $ \p -> alloca $ \lp -> do
poke lp x
r <- mvar_put p lp
unless (r == 0)
$ throwErrno $ "putMVar failed with code " ++ show r
Полный код. Изменение FFI с interruptible
на unsafe
не предотвращает взаимоблокировку.Иногда взаимная блокировка происходит при каждом втором запуске, иногда это происходит только после 50 запусков (а остальные выполняются, как и ожидалось).
Я предполагаю, что GHC может помешать работе мьютексов POSIX с некоторой обработкой сигналов ОС, ноЯ недостаточно знаю внутренности GHC, чтобы это проверить.
Это я что-то делаю глупо или мне нужно добавить какие-то специальные приемы, чтобы заставить его работать внутри GHC FFI?
PS: последняя версия README с моими исследованиями доступна по адресу interprocess mvar-fail
.
ОБНОВЛЕНИЕ 13.06.2018 : я пытался временно заблокироватьвсе сигналы ОС окружают код функции следующим образом:
sigset_t mask, omask;
sigfillset(&mask);
sigprocmask(SIG_SETMASK, &mask, &omask);
...
sigprocmask(SIG_SETMASK, &omask, NULL);
Это не помогло.