Я вижу, что ваша модель может быть улучшена с точки зрения согласованности, контроля состояния и масштабирования.
Одним из способов реализации этого является присоединение элемента к вашему состоянию, постановка в очередь и удаление этой пары из очереди и создание механизма дляобеспечить изменение состояния.
Мое предложение можно увидеть на рисунке ниже:
В соответствии с этой моделью и вашим примером мы можемсделать:
package stackoverflow;
import java.util.concurrent.LinkedBlockingQueue;
import stackoverflow.item.ItemState;
import stackoverflow.task.CreatingTask;
import stackoverflow.task.FirstMovingTask;
import stackoverflow.task.SecondMovingTask;
public class Main {
private static void startTask(String name, Runnable r){
Thread t = new Thread(r, name);
t.start();
}
public static void main(String[] args) {
//create queues
LinkedBlockingQueue<ItemState> firstQueue = new LinkedBlockingQueue<ItemState>();
LinkedBlockingQueue<ItemState> secondQueue = new LinkedBlockingQueue<ItemState>();
//start three threads
startTask("Thread#1", new CreatingTask(firstQueue));
startTask("Thread#2", new FirstMovingTask(firstQueue, secondQueue));
startTask("Thread#3", new SecondMovingTask(secondQueue));
}
}
Каждая задача выполняет операции op()
в соответствии с приведенным ниже подтверждением на ItemState :
, один из трех выделенных потоков перемещает еена secondQueue и, наконец, другой выделенный поток удаляет его.
ItemState
- это неизменный объект, который содержит Item
и ваш State
.Это обеспечивает согласованность между значениями Item и State.
ItemState имеет подтверждение о следующем состоянии, создающем механизм самоконтролируемого состояния:
public class FirstMovingTask {
//others codes
protected void op() {
try {
//dequeue
ItemState is0 = new ItemState(firstQueue.take());
System.out.println("Item " + is0.getItem().getValue() + ": " + is0.getState().getValue());
//process here
//enqueue
ItemState is1 = new ItemState(is0);
secondQueue.add(is1);
System.out.println("Item " + is1.getItem().getValue() + ": " + is1.getState().getValue());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//others codes
}
С реализацией ItemState:
public class ItemStateImpl implements ItemState {
private final Item item;
private final State state;
public ItemStateImpl(Item i){
this.item = i;
this.state = new State();
}
public ItemStateImpl(ItemState is) {
this.item = is.getItem();
this.state = is.getState().next();
}
// gets attrs
}
Таким образом, возможно создание решений более элегантных, гибких и масштабируемых.Масштабируемость, потому что вы можете контролировать больше состояний, только изменяя next()
и обобщая задачу перемещения для увеличения количества очередей.
Результаты:
Item 0: AFTER_FIRST
Item 0: IN_FIRST
Item 0: IN_SECOND
Item 0: AFTER_SECOND
Item 1: IN_FIRST
Item 1: AFTER_FIRST
Item 1: IN_SECOND
Item 1: AFTER_SECOND
Item 2: IN_FIRST
Item 2: AFTER_FIRST
Item 2: IN_SECOND
... others
ОБНОВЛЕНИЕ (06/07/ 2018): анализ использования карты для поиска Поиск по карте с использованием равных значений, таких как компаратор, может не работать, поскольку обычно отображение между значениями и тождеством (ключ / хэш) не является взаимно-однозначным (см. Рисунок ниже),Таким образом, необходимо создать отсортированный список для значений поиска, который приводит к O (n) (наихудший случай).
с Item.getValuesHashCode()
:
private int getValuesHashCode(){
return new HashCodeBuilder().append(value).hashCode();
}
В этом случае вы должны оставить Vector<ItemState>
вместо Item
и использовать ключ, как результат getValuesHashCode
.Измените механизм контроля состояния для сохранения первой ссылки на Элемент и текущего состояния.Смотрите ниже:
//Main.class
public static void main(String[] args) {
... others code ...
//references repository
ConcurrentHashMap<Integer, Vector<ItemState>> statesMap = new ConcurrentHashMap<Integer, Vector<ItemState>>();
//start three threads
startTask("Thread#1", new CreatingTask(firstQueue, statesMap));
... others code ...
}
//CreateTask.class
protected void op() throws InterruptedException {
//create item
ItemState is = new ItemStateImpl(new Item(i++, NameGenerator.name()));
//put in monitor and enqueue
int key = is.getHashValue();
Vector<ItemState> items = map.get(key);
if (items == null){
items = new Vector<>();
map.put(key, items);
}
items.add(is);
//enqueue
queue.put(is);
}
//FirstMovingTask.class
protected void op() throws InterruptedException{
//dequeue
ItemState is0 = firstQueue.take();
//process
ItemState is1 = process(is0.next());
//enqueue
secondQueue.put(is1.next());
}
//ItemState.class
public ItemState next() {
//required for consistent change state
synchronized (state) {
state = state.next();
return this;
}
}
Для поиска вы должны использовать concurrentMapRef.get (key).Результатом будет ссылка обновленного ItemState.
Результаты моих тестов для:
# key = hash("a")
# concurrentMapRef.get(key)
...
Item#7#0 : a - IN_FIRST
... many others lines
Item#7#0 : a - AFTER_FIRST
Item#12#1 : a - IN_FIRST
... many others lines
Item#7#0 : a - IN_SECOND
Item#12#1 : a - IN_FIRST
... many others lines
Item#7#0 : a - AFTER_SECOND
Item#12#1 : a - IN_FIRST
Более подробная информация в коде: https://github.com/ag-studies/stackoverflow-queue
ОБНОВЛЕНО в 06/09 /2018: редизайн
Обобщая этот проект, я могу понять, что конечный автомат выглядит примерно так:
Таким образом, я отделилсяработники очередей для улучшения концепций.Я использовал MemoryRep для сохранения уникальной ссылки на элемент в общей обработке.Конечно, вы можете использовать стратегии, основанные на событиях, если вам нужно хранить ItemState в физическом хранилище.
Это сохраняет предыдущую идею и обеспечивает большую разборчивость концепций.Смотрите это:
Я понимаю, что каждая работа будет иметь две очереди (вход / выход) и отношения с бизнес-моделью! Исследователь всегда найдет наиболее актуальное и непротиворечивое состояние предмета .
Итак, отвечая на ваш вопрос:
Я могу найти согласованное состояние Item где угодно, используя MemoryRep (в основном Map), состояние обтекания иэлемент в ItemState и управление состоянием изменения задания при постановке в очередь или его исключении из очереди.
Производительность сохраняется, за исключением запуска next ()
Состояние всегда непротиворечиво (для вашей проблемы)
В этой модели возможно использование любого типа очереди, любого количества заданий / очередей и любыхномер штата.
Кроме того, это прекрасно !!