Отслеживание прогресса между очередями на карте - PullRequest
0 голосов
/ 31 мая 2018

В настоящее время у меня есть две очереди и предметы, путешествующие между ними.Сначала элемент помещается в firstQueue, затем один из трех выделенных потоков перемещает его в secondQueue и, наконец, другой выделенный поток удаляет его.Эти шаги, очевидно, включают некоторую обработку.Мне нужно иметь возможность получить статус любого элемента (IN_FIRST, AFTER_FIRST, IN_SECOND, AFTER_SECOND или ABSENT), и я реализовал его вручную, выполнив обновление statusMap, гдеочередь изменяется следующим образом:

while (true) {
    Item i = firstQueue.take();
    statusMap.put(i, AFTER_FIRST);
    process(i);
    secondQueue.add(i);
    statusMap.put(i, IN_SECOND);
}

Это работает, но уродливо и оставляет временное окно, в котором статус не согласован.Несоответствие не имеет большого значения, и его можно было бы решить с помощью синхронизации, но это может иметь неприятные последствия, поскольку очередь имеет ограниченную емкость и может блокироваться.Уродство беспокоит меня больше.

Эффективность вряд ли имеет значение, так как обработка занимает секунды.Выделенные потоки используются для контроля параллелизма.Ни один предмет не должен быть в нескольких состояниях (но это не очень важно и не гарантируется моим текущим подходом).Будет больше очередей (и состояний), и они будут разных типов (DelayQueue, ArrayBlockingQueue и, возможно, PriorityQueue).

Интересно, есть ли хорошее обобщаемое решение?в несколько очередей?

Ответы [ 4 ]

0 голосов
/ 07 июня 2018

Вот что-то отличное от того, что говорили другие.Используя мир служб и систем очередей, мы имеем концепцию подтверждения сообщений.Это хорошо, потому что это также дает вам некоторую встроенную логику повторов.

Я расскажу, как это будет работать с высокого уровня, и, если вам нужно, я могу добавить код.

По сути, у вас будет Set для каждой очереди.Вы завернете свои очереди в объект так, чтобы при снятии с очереди элемента происходили некоторые вещи

  1. Элемент удаляется из очереди
  2. Элемент добавляется в связанный набор
  3. Запланирована задача (лямбда, содержащая атомарный логический тип (по умолчанию false)).При запуске он удаляет элемент из набора, и если логическое значение равно false, помещает его обратно в очередь
  4. Элемент и обертка вокруг логического значения возвращаются вызывающей стороне

Как только process(i); завершится, ваш код будет указывать подтверждение получения для оболочки, а оболочка удалит элемент из набора и сделает логическое значение false.

Метод для возврата состояния просто проверяет, какая очередь или наборэлемент находится в.

Обратите внимание, что это дает «хотя бы один раз» доставки, то есть элемент будет обработан как минимум один раз, но, возможно, более одного раза, если время обработки слишком близко к тайм-ауту.

0 голосов
/ 03 июня 2018

Как ранее ответили, Оберните очереди, или элемент будет жизнеспособным решением или и тем, и другим.

public class ItemWrapper<E> {
   E item;
   Status status;
   public ItemWrapper(Item i, Status s){ ... }
   public setStatus(Status s){ ... }
   // not necessary if you use a queue wrapper (see queue wrapper)
   public boolean equals(Object obj) {
     if ( obj instanceof ItemWrapper)
       return item.equals(((ItemWrapper) obj).item) 
     return false;
   }
   public int hashCode(){
     return item;
   }
}
...
process(item) // process update status in the item
...

Вероятно, лучший способ, уже отвеченный, состоит в том, чтобы иметь QueueWrapper, который обновляет состояние очереди.Для забавы я не использую карту состояния, но я использую ранее обертку элемента, она кажется чище (карта состояния тоже работает).

public class QueueWrapper<E> implements Queue<E> {
  private Queue<ItemWrapper<E>> myQueue;
  static private Status inStatus; // FIRST
  static private Status outStatus; // AFTER_FIRST
  public QueueWrapper(Queue<E> myQueue, Status inStatus, Status outStatus) {...}
  @Override
  public boolean add(E e) {
    return myQueue.add(new ItemWrapper(e, inStatus));
  }
  @Override
  public E remove(){
    ItemWrapper<E> result = myQueue.remove();
    result.setStatus(outStatus)
    return result.item;
  }
  ...  
  }

Вы также можете использовать AOP для добавления обновления статуса в свои очереди безизменение ваших очередей (карта состояния должна быть более подходящей, чем itemwrapper).

Возможно, я не очень хорошо ответил на ваш вопрос, потому что простой способ узнать, где находится ваш элемент, можно проверить в каждой очереди с помощью «содержит"функция.

0 голосов
/ 07 июня 2018

Я вижу, что ваша модель может быть улучшена с точки зрения согласованности, контроля состояния и масштабирования.

Одним из способов реализации этого является присоединение элемента к вашему состоянию, постановка в очередь и удаление этой пары из очереди и создание механизма дляобеспечить изменение состояния.

Мое предложение можно увидеть на рисунке ниже:

enter image description here

В соответствии с этой моделью и вашим примером мы можемсделать:

package stackoverflow;

import java.util.concurrent.LinkedBlockingQueue;

import stackoverflow.item.ItemState;
import stackoverflow.task.CreatingTask;
import stackoverflow.task.FirstMovingTask;
import stackoverflow.task.SecondMovingTask;

public class Main {

    private static void startTask(String name, Runnable r){
        Thread t = new Thread(r, name);
        t.start();
    }

    public static void main(String[] args) {
        //create queues
        LinkedBlockingQueue<ItemState> firstQueue = new LinkedBlockingQueue<ItemState>();
        LinkedBlockingQueue<ItemState> secondQueue = new LinkedBlockingQueue<ItemState>();
        //start three threads
        startTask("Thread#1", new CreatingTask(firstQueue));
        startTask("Thread#2", new FirstMovingTask(firstQueue, secondQueue));
        startTask("Thread#3", new SecondMovingTask(secondQueue));
    }
}

Каждая задача выполняет операции op() в соответствии с приведенным ниже подтверждением на ItemState :

, один из трех выделенных потоков перемещает еена secondQueue и, наконец, другой выделенный поток удаляет его.

ItemState - это неизменный объект, который содержит Item и ваш State.Это обеспечивает согласованность между значениями Item и State.

ItemState имеет подтверждение о следующем состоянии, создающем механизм самоконтролируемого состояния:

public class FirstMovingTask {
    //others codes
    protected void op() {
            try {
                //dequeue
                ItemState is0 = new ItemState(firstQueue.take());
                System.out.println("Item " + is0.getItem().getValue() + ": " + is0.getState().getValue());
                //process here
                //enqueue
                ItemState is1 = new ItemState(is0);
                secondQueue.add(is1);
                System.out.println("Item " + is1.getItem().getValue() + ": " + is1.getState().getValue());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    //others codes
}

С реализацией ItemState:

public class ItemStateImpl implements ItemState {
    private final Item item;
    private final State state;

    public ItemStateImpl(Item i){
        this.item = i;
        this.state = new State();
    }

    public ItemStateImpl(ItemState is) {
        this.item = is.getItem();
        this.state = is.getState().next();
    }

    // gets attrs
}

Таким образом, возможно создание решений более элегантных, гибких и масштабируемых.Масштабируемость, потому что вы можете контролировать больше состояний, только изменяя next() и обобщая задачу перемещения для увеличения количества очередей.

Результаты:

Item 0: AFTER_FIRST
Item 0: IN_FIRST
Item 0: IN_SECOND
Item 0: AFTER_SECOND
Item 1: IN_FIRST
Item 1: AFTER_FIRST
Item 1: IN_SECOND
Item 1: AFTER_SECOND
Item 2: IN_FIRST
Item 2: AFTER_FIRST
Item 2: IN_SECOND
... others

ОБНОВЛЕНИЕ (06/07/ 2018): анализ использования карты для поиска Поиск по карте с использованием равных значений, таких как компаратор, может не работать, поскольку обычно отображение между значениями и тождеством (ключ / хэш) не является взаимно-однозначным (см. Рисунок ниже),Таким образом, необходимо создать отсортированный список для значений поиска, который приводит к O (n) (наихудший случай).

enter image description here

с Item.getValuesHashCode():

private int getValuesHashCode(){
  return new HashCodeBuilder().append(value).hashCode();
}

В этом случае вы должны оставить Vector<ItemState> вместо Item и использовать ключ, как результат getValuesHashCode.Измените механизм контроля состояния для сохранения первой ссылки на Элемент и текущего состояния.Смотрите ниже:

//Main.class
public static void main(String[] args) {
    ... others code ...

    //references repository
    ConcurrentHashMap<Integer, Vector<ItemState>> statesMap = new ConcurrentHashMap<Integer, Vector<ItemState>>();
    //start three threads
    startTask("Thread#1", new CreatingTask(firstQueue, statesMap));

    ... others code ...
}

//CreateTask.class
protected void op() throws InterruptedException {
    //create item
    ItemState is = new ItemStateImpl(new Item(i++, NameGenerator.name()));
    //put in monitor and enqueue
    int key = is.getHashValue();
    Vector<ItemState> items = map.get(key);
    if (items == null){
        items = new Vector<>();
        map.put(key, items);
    }
    items.add(is);
    //enqueue
    queue.put(is);
}

//FirstMovingTask.class
protected void op() throws InterruptedException{
    //dequeue
    ItemState is0 = firstQueue.take();
    //process
    ItemState is1 = process(is0.next());
    //enqueue 
    secondQueue.put(is1.next());
}

//ItemState.class
public ItemState next() {
    //required for consistent change state
    synchronized (state) {
        state = state.next();
        return this;
    }
}

Для поиска вы должны использовать concurrentMapRef.get (key).Результатом будет ссылка обновленного ItemState.

Результаты моих тестов для:

# key = hash("a")
# concurrentMapRef.get(key)
...
Item#7#0    : a - IN_FIRST 
... many others lines
Item#7#0    : a - AFTER_FIRST 
Item#12#1   : a - IN_FIRST 
... many others lines
Item#7#0    : a - IN_SECOND 
Item#12#1   : a - IN_FIRST 
... many others lines
Item#7#0    : a - AFTER_SECOND 
Item#12#1   : a - IN_FIRST 

Более подробная информация в коде: https://github.com/ag-studies/stackoverflow-queue

ОБНОВЛЕНО в 06/09 /2018: редизайн

Обобщая этот проект, я могу понять, что конечный автомат выглядит примерно так:

enter image description here

Таким образом, я отделилсяработники очередей для улучшения концепций.Я использовал MemoryRep для сохранения уникальной ссылки на элемент в общей обработке.Конечно, вы можете использовать стратегии, основанные на событиях, если вам нужно хранить ItemState в физическом хранилище.

Это сохраняет предыдущую идею и обеспечивает большую разборчивость концепций.Смотрите это:

enter image description here

Я понимаю, что каждая работа будет иметь две очереди (вход / выход) и отношения с бизнес-моделью! Исследователь всегда найдет наиболее актуальное и непротиворечивое состояние предмета .

Итак, отвечая на ваш вопрос:

  • Я могу найти согласованное состояние Item где угодно, используя MemoryRep (в основном Map), состояние обтекания иэлемент в ItemState и управление состоянием изменения задания при постановке в очередь или его исключении из очереди.

  • Производительность сохраняется, за исключением запуска next ()

  • Состояние всегда непротиворечиво (для вашей проблемы)

  • В этой модели возможно использование любого типа очереди, любого количества заданий / очередей и любыхномер штата.

  • Кроме того, это прекрасно !!

0 голосов
/ 31 мая 2018

Имеет ли смысл обернуть очереди логикой для управления состоянием элемента?

public class QueueWrapper<E> implements BlockingQueue<E> {
    private Queue<E> myQueue = new LinkedBlockingQueue<>();
    private Map<E, Status> statusMap;

    public QueueWrapper(Map<E, Status> statusMap) {
        this.statusMap = statusMap;
    }

    [...]
    @Override
    public E take() throws InterruptedException {
        E result = myQueue.take();
        statusMap.put(result, Status.AFTER_FIRST);
        return result;
    }

Таким образом, управление состоянием всегда связано с операциями очереди (и содержится в них) ...

Очевидно, что statusMap нужно синхронизировать, но это все равно будет проблемой.

...