Я задал вопрос об этом вчера и получил много полезных отзывов (спасибо!), Но я не думаю, что дал достаточно информации в вопросе - следовательно, еще один.
У меня есть две темы, которые одновременнопрочитайте два файла.Они помещают информацию из этих файлов в два ConcurrentQueues.Затем идут еще два потока, снимают с очереди элементы из ConcurrentQueues и помещают элементы в один ConcurrentDictionary.При обновлении элемента в Словаре потокам может потребоваться создать новый объект или просто сообщить текущему объекту, что поступило больше информации. В последнем случае иногда требуется длительное сканирование.Иногда, после этого сканирования, объект говорит, что все в порядке, чтобы удалить его (как попытку сохранить память), и поток удаляет его из словаря.
Мой текущий (сломанный) код ниже:
string dictionaryKey = myMessage.someValue;
Monitor.Enter(GetDictionaryLocker);
DictionaryObject currentObject = myConcurrentDictionary.GetOrAdd(dictionaryKey, new DictionaryObject());
// we can be interrupted here
lock (currentObject)
{
Monitor.Exit(GetDictionaryLocker);
//KeyNotFoundException is possible on line below
if (myConcurrentDictionary[dictonaryKey].scan(myMessage)) // Scans the message - returns true if the object says its OK to remove it from the dictionary
{
DictionaryObject temp; // It's OK to delete it
if (!queuedMessages.TryRemove(ric, out temp)) // Did delete work?
throw new Exception("Was unable to delete a DictionaryObject that just reported it was ok to delete it");
}
}
Что происходит следующим образом:
Между поиском нужного мне объекта в Словаре:
DictionaryObject currentObject = myConcurrentDictionary.GetOrAdd(dictionaryKey, new DictionaryObject());
и последующей блокировкой этого объекта:
lock (currentObject)
поток может быть перехвачен, так что есть шанс, что другой поток удалил объект из словаря к тому времени, когда я попытаюсь получить к нему доступ здесь:
if (myConcurrentDictionary[dictonaryKey].scan(myMessage))
Это затем приводит к KeyNotFoundException .Мне нужен какой-то способ атомной блокировки объекта.
Как я уже говорил, вчера я получил несколько предложений, но я не понимаю, как их можно использовать
- Один из авторов упомянул, чтоЯ должен сначала попытаться удалить элементы из словаря, так как это атомарная операция с ConcurrentDictionary, а затем заново добавить их.Тем не менее, я не уверен, как я мог бы тогда указать другому потоку, что он должен ждать повторного добавления элемента, а не просто думать о пропущенном значении и создавать его.
- Другой постер принесup
Threading.Interlocked.CompareExchange
, который я мог бы использовать, чтобы отметить, что объект «используется».Но я не знаю, как справиться со случаем использования объекта - как мне его ждать?
У меня есть некоторые ограничения: мне нужно обработать ConcurrentQueues по порядку, поэтомуЯ не могу отказаться от размещения объекта в Словаре или вернуться позже - мне нужно заблокировать.Словарь может содержать 500 000 или более элементов, поэтому мне действительно нужно время поиска O (1) в ConcurrentDictionary.
Есть идеи?Извините за длинный пост
Спасибо,
Фредерик