Одновременное чтение и запись из коллекции в разных потоках в Java - PullRequest
0 голосов
/ 16 февраля 2019

Вот сценарий:
У меня есть 3 полосы, с которых я хочу переместить свои ящики.У каждой коробки есть идентификатор группы.Таким образом, полоса будет назначена группе.
Следовательно, когда я получу первый запрос на перемещение с уникальным groupId, я создам список и добавлю этот запрос в этот список, а затем приступлю к обработке списка.Если я получу другой запрос с тем же groupId, я должен добавить в тот же список
(этот список используется для обработки запросов в другом потоке),
еще создать новый список и назначить новую полосу и начатьобработка.
Пожалуйста, предложите коллекцию в Java, которая может помочь в эффективной реализации этого.
Любая помощь будет оценена !!

Ответы [ 2 ]

0 голосов
/ 16 февраля 2019

Я сомневаюсь, List - хорошая структура данных для вашего варианта использования.Поскольку вы передаете данные между потоками, BlockingQueue выглядит намного более естественным.

В качестве примера приведена реализация со следующими допущениями:

  • существует 1 box продюсер, который:

    1) генерирует box со случайным groupId в диапазоне от 1 до 3

    2) помещает box в один lane

  • есть 3 box потребителей, и каждый из них:

    1) получает box от lane

    2) потреблять коробку, если groupId потребителя совпадает с рамкой groupId

  • дополнительные рамки не используются (только ядро ​​Java)

import java.util.*;
import java.util.concurrent.*;

class Answer {
  public static void main(String[] args) {
     LinkedBlockingQueue<Box> lane = new LinkedBlockingQueue<>();

     Producer p = new Producer(lane);
     Consumer c1 = new Consumer(1, lane);
     Consumer c2 = new Consumer(2, lane);
     Consumer c3 = new Consumer(3, lane);

     new Thread(p).start();
     new Thread(c1).start();
     new Thread(c2).start();
     new Thread(c3).start();
  }
}

class Producer implements Runnable {
   private final LinkedBlockingQueue<Box> lane;
   Producer(LinkedBlockingQueue<Box> lane) { 
     this.lane = lane;
   }
   public void run() {
     try {
       while (true) { 
         lane.put(new Box(produceGroupId())); 
       }
     } catch (InterruptedException ex) {
       Thread.currentThread().interrupt();  // set interrupt flag
     }
   }
   int produceGroupId() {
     // generate random int between 1 and 3
     return ThreadLocalRandom.current().nextInt(1, 4);
   }
}

class Consumer implements Runnable {
   private final int groupId;
   private final BlockingQueue<Box> lane;
   Consumer(int groupId, BlockingQueue<Box> lane) { 
     this.groupId = groupId;
     this.lane = lane;
   }
   public void run() {
     while (true) {
       Box box = lane.peek();
       if (box != null && box.getGroupId() == this.groupId) {
         consume(lane.poll()); 
       }
     }
   }
   void consume(Box box) { 
     System.out.println("Consumer " + groupId + " received " + box + " for proxessing.");
   }
}

class Box {
  private final int groupId;
  public Box(int groupId) {
    this.groupId = groupId;
  }

  public int getGroupId() {
    return this.groupId;
  }

  @Override
  public String toString() {
    return "<Box " + groupId + ">";
  }
}

Если цель состоит в том, чтобы иметь 3 отдельных полосы , то реализация будет немного другой:

import java.util.*;
import java.util.concurrent.*;

class Answer {
  public static void main(String[] args) {
     BlockingQueue<Box> lane1 = new LinkedBlockingQueue<Box>();
     BlockingQueue<Box> lane2 = new LinkedBlockingQueue<Box>();
     BlockingQueue<Box> lane3 = new LinkedBlockingQueue<Box>();
     Map<Integer, BlockingQueue<Box>> lanes = new ConcurrentHashMap<Integer, BlockingQueue<Box>>();
     lanes.put(1, lane1);
     lanes.put(2, lane2);
     lanes.put(3, lane3);

     Producer p = new Producer(lanes);
     Consumer c1 = new Consumer(1, lane1);
     Consumer c2 = new Consumer(2, lane2);
     Consumer c3 = new Consumer(3, lane3);

     new Thread(p).start();
     new Thread(c1).start();
     new Thread(c2).start();
     new Thread(c3).start();
  }
}

class Producer implements Runnable {
   private final Map<Integer, BlockingQueue<Box>> lanes;
   Producer(Map<Integer, BlockingQueue<Box>> lanes) { 
     this.lanes = lanes;
   }
   public void run() {
     try {
       while (true) { 
         int groupId = produceGroupId();
         BlockingQueue<Box> lane = lanes.get(groupId);
         lane.put(new Box(groupId)); 
       }
     } catch (InterruptedException ex) {
       Thread.currentThread().interrupt();
     }
   }
   int produceGroupId() {
     // generate random int between 1 and 3
     return ThreadLocalRandom.current().nextInt(1, 4);
   }
}

class Consumer implements Runnable {
   private final int consumerId;
   private final BlockingQueue<Box> lane;
   Consumer(int consumerId, BlockingQueue<Box> lane) { 
     this.consumerId = consumerId;
     this.lane = lane;
   }
   public void run() {
     try {
       while (true) {
         consume(lane.take()); 
       }
     } catch (InterruptedException ex) {}
   }
   void consume(Box box) { 
     System.out.println("Consumer " + consumerId + " received " + box + " for proxessing.");
   }
}

class Box {
  private final int groupId;
  public Box(int groupId) {
    this.groupId = groupId;
  }

  public int getGroupId() {
    return this.groupId;
  }

  @Override
  public String toString() {
    return "<Box " + groupId + ">";
  }
}
0 голосов
/ 16 февраля 2019

Если List<T> необходимо преобразовать и прочитать из нескольких потоков, CopyOnWriteArrayList может быть тем, что вы ищете.В любом случае, я бы реализовал мою стратегию блокировки, если сценарий использования достаточно сложный.

, когда я получу первый запрос на перемещение с уникальным groupId, я создам список и добавлю этозапросить этот список, а затем начать обработку списка. Если я получу другой запрос с тем же groupId, я должен добавить в тот же список

Я думаю, что вам лучше использовать собственную стратегию блокировкис synchronized блоками.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...