Java сохранить список с многопоточностью - PullRequest
0 голосов
/ 08 февраля 2019

Мне нужно создать список для выполнения следующей операции:

  1. Я получаю объект из внешней очереди / темы каждую микросекунду.
  2. После выполнения некоторых операций над объектомМне нужно сохранить эти объекты в базе данных.
  3. Я делаю персистирование в пакетах по 100 или 1000. Единственная проблема в том, что частота персистирования ниже, чем частота входящих сообщений.Теперь я не хочу хранить это в одном потоке, поскольку постоянное хранение замедлит потребление сообщений.
  4. Моя идея состоит в том, чтобы продолжать принимать объекты сообщений и добавлять их в коллекцию (например, связанный список)
  5. И продолжайте удалять с другого конца коллекции партиями по 100 или 1000 и сохранять в базе данных.
  6. Какую коллекцию использовать?Как синхронизировать это и избежать исключений одновременной модификации?

Ниже приведен код, который я пытаюсь реализовать с помощью ArrayList, который очищает список каждые несколько секунд при сохранении.

class myclass{
List persistList;
ScheduledExecutorService persistExecutor;
ScheduledFuture scheduledFuture;
PersistOperation persistOperation;
//Initialize delay, interval
void init(){
scheduledFuture=persistExecutor.scheduleAtFixedRate(new persistOperation(persistList), delay, interval, TimeUnit.SECONDS);
}
void execute(msg){
//process the message and add to the persist list
}
class PersistOperation implements Runnable{
List persistList
PersistOperation(List persistList){
//Parameterized constructor
}
run(){
//Copy persistList to new ArrayList and clear persistList
//entity manager persist/update/merge
}
}
}

Ответы [ 3 ]

0 голосов
/ 08 февраля 2019

Я думаю, что вы захотите использовать LMAX Disruptor framework здесь.Я представляю два RingBuffers.Вы будете использовать первый, чтобы принимать входящие сообщения.Ваш рабочий (и) будет читать из RingBuffer.Вы должны установить размер RingBuffer равным размеру вашего постоянного блока (например, 100 или 1000).После того, как работник берет событие из RingBuffer и обрабатывает его, он помещает ссылку на сохраненный объект в коллекцию очередей.Каждый раз, когда первый RingBuffer обводится один раз, вы выделяете новую очередь и помещаете старую очередь во второй RingBuffer.Рабочие (ие) для второго RingBuffer получают объект Queue из RingBuffer, сохраняют все объекты в очереди, а затем переходят в следующую очередь.Вы можете настроить размер второго RingBuffer и рабочих потоков, чтобы обеспечить скорость, с которой база данных может сохранять ваши чанки.

0 голосов
/ 08 февраля 2019

При таком подходе вы рискуете потерять сообщения, если у вас есть 100 сообщений, которые были получены, но не сохранены, а ваше приложение умирает, можете ли вы позволить себе потерять эти сообщения?Здесь важен тип темы / очереди, у тем есть преимущество в управлении этим противодавлением, очереди обычно есть, потому что требуется упорядоченная обработка.Если вы ставите в очередь / тема kafka, и вы извлекаете сообщения, kafka может извлекать пакеты, и вы, вероятно, также можете сохранять партии в базе данных, а только сохранять сообщения для kafka после сохранения.Если ваша обработка должна быть упорядочена, вы, вероятно, можете справиться с каким-то королем реактивного подхода и настроить БД.Система очередей может контролировать поток, обычно.

0 голосов
/ 08 февраля 2019

И продолжайте удаление с другого конца коллекции партиями по 100 или 1000 и сохраняйте в базе данных.

Это разумно, если несколько потоков опрашивают коллекцию.

Ниже приведен код, который я пытаюсь реализовать с помощью ArrayList

. ArrayList - плохой выбор, поскольку он не является поточно-ориентированным и, когдаПри удалении элемента с индексом 0 каждый элемент справа от него должен быть сдвинут (операция O(n)).

Нужная коллекция называется Deque, в противном случаеизвестный как двусторонняя очередь.Однако, поскольку вам нужно, чтобы коллекция была поточно-ориентированной, я рекомендую использовать ConcurrentLinkedDeque.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...