Как заблокировать метод другим методом void - PullRequest
0 голосов
/ 13 сентября 2018

У меня есть метод прослушивателя, который обрабатывает сообщения, реализованные в облачном потоке Spring следующим образом:

@StreamListener(value = MyInterface.INPUT)
public void handleMsg(@Payload Foo foo) {
    // if (concurrentHashMap.containsKey(foo.getId())) concurrentHashMap.remove(foo.getId());
}

вот мой второй метод, который должен быть заблокирован предыдущим:

public Foo getFoo(long fooId) {
    // here I need block method with some mechanism until handleMsg remove received object from map and return this foo from there

    return fooFromStream;
}

Моя цель - вызвать getFoo метод из класса обслуживания следующим образом:

// some logic

Foo foo = service.getFoo(fooId);

// some logic which required received foo;

У меня была идея обернуть метод Foo в getFoo в метод AsyncResult и вызвать следующий метод get в отношении будущего результата того, что вызывает блок, но я понятия не имею, как передать foo из потока в метод getFoo

Вариант использования должен быть таким:

Я вызываю метод getFoo, который отправляет foo в брокер сообщений и регистрирует foo на карте, и выполняет некоторую логику, а затем, когда команда выполнена, я получаю сообщение в StreamListenner, удаляю foo из карты, и затем мне нужно вернуть этот foo из метода getFoo.

Можете ли вы сказать мне, как это сделать или как лучше всего это решить? Спасибо за совет.

Ответы [ 2 ]

0 голосов
/ 13 сентября 2018

Для этого можно использовать параллельную карту длинных и блокирующих очередей блокировки:

ConcurrentMap<Long, BlockingQueue<Foo>> fooMap = new ConcurrentHashMap<>();
...
private BlockingQueue<Foo> getFooQueue(long fooId) {
    return fooMap.computeIfAbsent(fooId, l -> new ArrayBlockingQueue<>(1));
}
...
@StreamListener(value = MyInterface.INPUT)
public void handleMsg(@Payload Foo foo) {
    BlockingQueue<Foo> fq = getFooQueue(foo.getId());
    synchronized(fq) {
        fq.clear();
        fq.add(foo);
    }
}
...
public Foo getFoo(long fooId) throws InterruptedException {
    BlockingQueue<Foo> fq = getFooQueue(fooId);
    synchronized(fq) {
        return fq.take();
    }
}

Эти 2 synchronized блоки нужны только в том случае, если возможно, что ваш handleMsg может быть вызван несколько раз, когда текущий доступный foo должен быть переопределен с новым foo.

0 голосов
/ 13 сентября 2018

Не совсем понятно, что вы пытаетесь сделать, но Map<Long, BlockingQueue<Foo> позволит вам заблокировать take (или poll с таймаутом, вероятно, лучше), пока слушатель offer s не Foo;затем удалите запись карты.

Имейте в виду, что, как только Foo будет помещен в очередь, запись будет подтверждена и, если произойдет сбой сервера, будет потеряна.

...