Бит немного зависит от того, является ли вставщик для каждой очереди или для всех очередей.Если я понимаю вашу спецификацию, я думаю, что сработает что-то вроде следующего.
Writer добавляет элемент в одну из LinkedBlockingQueue
коллекций на вашей карте.Если размер очереди больше X (если вы хотите ее для каждой очереди), то это сигнализирует поток вставки MySQL.Примерно так должно работать:
queue.add(newItem);
// race conditions here that may cause multiple signals but that's ok
if (queue.size() > 1000) {
// this will work if there is 1 inserter per queue
synchronized (queue) {
queue.notify();
}
}
...
Затем вставщик ожидает в очереди и выполняет что-то вроде следующего цикла:
List insertList = new ArrayList();
while (!done) {
synchronized (queue) {
// typically this would be while but if we are notified or timeout we insert
if (queue.size() < 1000) {
queue.wait(MILLIS_TIME_INTERVAL);
}
}
queue.drainTo(insertList);
// insert them into the db
insertList.clear();
}
Это становится немного сложнее, если есть 1поток делает вставки во всех очередях.Наверное, вопрос в том, почему у вас вообще есть ConcurrentHashMap
?Если у вас есть 1 средство вставки, которое, например, вставляет в несколько таблиц или что-то еще, то вам понадобится механизм, чтобы сообщить вставке , какие очередь (и) должны быть опустошены.Это может просто пройти через все очереди на карте, но это может быть дорого.Вы бы синхронизировались на каком-либо объекте глобальной блокировки или, возможно, на объекте карты вместо очереди.
О, и, как упоминал @Peter Lawrey, вы быстро исчерпаете память, если ваша база данных будет медленнее, чем пишущие, поэтомуубедитесь, что для очередей задана правильная емкость, поэтому они ограничивают авторов и уменьшают рабочую память.
Надеюсь, это поможет.