Хороший метод для объединения записей в очереди - PullRequest
3 голосов
/ 13 апреля 2009

У меня есть пул потоков, которые подают задачи из очереди. Как правило, небольшое количество потоков может поддерживать очередь пустой. Иногда, особенно большой пакет событий будет держать размер очереди выше нуля в течение некоторого времени, но ненадолго.

Мое беспокойство касается событий, которые являются дубликатами или несут данные, которые устаревают предыдущие события. В периоды большого объема такие события могут сосуществовать в очереди в течение короткого периода времени. Я хотел бы иметь возможность сопоставить их, чтобы сократить время, затрачиваемое на бесполезную работу.

Какой хороший способ сопоставить такую ​​очередь? Я мог бы сопоставить во время вставки, перебирая с ног на голову и ища кандидата для замены, однако это кажется слишком грубой силой. Если у вас есть рекомендации по коду или библиотеке, имейте в виду, что я использую Java.

Ответы [ 3 ]

5 голосов
/ 13 апреля 2009

Почему бы просто не реализовать hashCode () и equals () в зависимости от ваших задач. Затем просто удалите задачу. Например.

queue.remove(task);
queue.offer(task);

Тогда у вас не будет дубликатов. Или альтернативно.

if(!queue.contains(task)) {
   queue.offer(task);
}

Это позволит избежать постановки задачи в очередь, если она уже находится в очереди.

2 голосов
/ 14 октября 2016

Кажется, этот конфлятор делает то, что вы ищете: https://github.com/GuillaumeArnaud/conflator

В зависимости от ваших требований, реализация может быть изменена, чтобы объединить ИЛИ заменить последнее событие существующим событием, если оно существует в очереди соединения.

Например. для следующего каждое событие реализовано как «Тик», который определяет поведение слияния.

public class Tick implements Message<Tick> {

    private final String ticker;

    public long getInitialQuantity() {
        return initialQuantity;
    }

    private final long initialQuantity;

    public long getCurrentQuantity() {
        return currentQuantity;
    }

    private long currentQuantity;
    private int numberOfMerges;

    public String getTicker() {
        return ticker;
    }

    public Tick(String ticker, long quantity) {
        this.ticker = ticker;
        this.initialQuantity = quantity;
        this.currentQuantity = quantity;
    }

    @Override
    public String key() {
        return this.ticker;
    }

    @Override
    public String body() {
        return String.valueOf(currentQuantity);
    }

    @Override
    public boolean isMerged() {
        return this.initialQuantity != this.currentQuantity;
    }

    @Override
    public int mergesCount() {
        return numberOfMerges;
    }

    @Override
    public boolean isValid() {
        return false;
    }

    @Override
    public boolean merge(Tick message) {
        if (this.equals(message)) {
            this.currentQuantity += message.currentQuantity;
            numberOfMerges++;
            return true;
        }
        return false;
    }

    @Override
    public int hashCode() {
        return ticker.hashCode();
    }

    @Override
    public boolean equals(Object obj) {
        if (obj != null && obj instanceof Tick) {
            Tick other = (Tick) obj;
            return this.ticker.equals(other.getTicker());
        }
        return false;
    }

Контрольный пример:

public class TickMergeTest {
    MultiValuedMapConflator conflator;

    @Test
    public void two_unmergeable_ticks_should_be_remain_unmergeable() {
        Tick tick1 = new Tick("GOOG", 100L);
        Tick tick2 = new Tick("AAPL", 120L);

        List<Tick> messages = conflator.merge(Lists.newArrayList(tick1, tick2));

        assertNotNull(messages);
        assertEquals(messages.size(), 2);
        assertEquals(Long.valueOf(messages.get(0).body()).longValue(), tick1.getCurrentQuantity());
        assertEquals(Long.valueOf(messages.get(1).body()).longValue(), tick2.getCurrentQuantity());
    }

    @Test(timeout = 1000)
    public void two_mergeable_ticks_should_be_merged() {
        Tick tick1 = new Tick("GOOG", 100L);
        Tick tick2 = new Tick("GOOG", 120L);

        List<Tick> messages = conflator.merge(Lists.newArrayList(tick1, tick2));

        assertNotNull(messages);
        assertEquals(messages.size(), 1);
        assertEquals(Long.valueOf(messages.get(0).body()).longValue(), tick1.getInitialQuantity() + tick2.getInitialQuantity());
    }

    @Test(timeout = 1000)
    public void should_merge_messages_on_same_key() throws InterruptedException {
        // given
        conflator.put(new Tick("GOOG", 100L));
        conflator.put(new Tick("GOOG", 120L));

        // test
        Thread.sleep(300); // waiting the conflation
        Message message = conflator.take();

        // check
        assertNotNull(message);
        assertEquals(Long.valueOf(message.body()).longValue(), 220L);
        assertTrue(message.isMerged());
    }

    @Test(timeout = 1000)
    public void should_not_merge_messages_on_diff_key() throws InterruptedException {
        // given
        conflator.put(new Tick("GOOG", 100L));
        conflator.put(new Tick("AAPL", 120L));

        // test
        Thread.sleep(300); // waiting the conflation
        Message message1 = conflator.take();
        Message message2 = conflator.take();

        // check
        assertNotNull(message1);
        assertNotNull(message2);

        assertEquals(Long.valueOf(message1.body()).longValue(), 100L);
        assertFalse(message1.isMerged());

        assertEquals(Long.valueOf(message2.body()).longValue(), 120L);
        assertFalse(message2.isMerged());

    }
    @Before
    public void setUp() {
        conflator = new MultiValuedMapConflator<Tick>(true);
    }

    @After
    public void tearDown() {
        conflator.stop();
    }
}
2 голосов
/ 14 апреля 2009

Если вы используете LinkedHashMap, вы можете сохранить порядок, в котором записи были добавлены в очередь.

Когда приходит соответствующая запись, я понимаю, что вы хотите добавить некоторые из ее данных в исходную запись очереди. В этом случае вы можете либо обновить хешированный объект на месте, либо использовать HashMap.put(key, value), чтобы заменить помещенный в очередь элемент новым объектом. (Я думаю , что это сохраняет порядок оригинального предмета, но я не проверял это.)

Обратите внимание, что ваш код должен будет явно синхронизировать доступ на чтение и запись к LinkedHashMap и данным внутри него. Вы не хотите обновлять элемент в очереди в то время, когда другой поток захватывает его для обработки. Вероятно, самый простой способ синхронизации - доступ с LinkedHashMap по Collections.synchronizedMap().

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...