Событие отправлено несколько раз на AsyncEventbus - PullRequest
0 голосов
/ 28 декабря 2018

В моей настройке я настроил Guava s (версия 24.1) AsyncEventBus слегка:

public class PausableAsyncEventBus extends AsyncEventBus implements IPausableEventBus{
    private boolean paused = false;
    private LinkedList<Object> queuedEvents = new LinkedList<>();

    public PausableAsyncEventBus(Executor executor, SubscriberExceptionHandler subscriberExceptionHandler) {
        super(executor, subscriberExceptionHandler);
    }
    public void pause() {
        paused = true;
    }
    public void resume() {
        paused = false;
        while (!paused && !queuedEvents.isEmpty()) {
            super.post(queuedEvents.removeFirst());
        }
    }

    @Override
    public void post(Object event) {
        if (!paused) {
            super.post(event);
        } else {
            queuedEvents.add(event);
        }
    }
}

Затем у меня есть задача, которая выполняется один раз, которая публикует событие.AsyncEventBus внедряется посредством Spring внедрения зависимостей:

@Autowired
private AsyncEventBus clientServerEventBus;

public void run() {
    log.info("Run CelebrationTask for "+ this, new Exception("Called from"));
    // calculate number of guests
    CelebrationState state = calculateCelebrationState();
    TargetedDialogStateWrapper wrapper = new TargetedDialogStateWrapper(player, state);
    clientServerEventBus.post(wrapper);
    // update reputation
    updateReputation(state);
}

У меня есть аспект AOP, определенный для метода post, поэтому я могу регистрировать, какие события публикуются.Эта запись приводит к следующим строкам:

2018-12-27 23:52:53,783 [pool-2-thread-4] INFO  c.s.g.o.c.l.EventBusAspect : Posted event on event bus 'PausableAsyncEventBus (default)' ch.sahits.game.openpatrician.model.ui.TargetedDialogStateWrapper 35645f14-e0d9-40f0-aa1e-e1c6f99e8d43: Thomas Pfeffersack from ch.sahits.game.openpatrician.clientserverinterface.model.task.CelebrationTask.run
2018-12-27 23:55:03,212 [pool-2-thread-3] INFO  c.s.g.o.c.l.EventBusAspect : Posted event on event bus 'PausableAsyncEventBus (default)' ch.sahits.game.openpatrician.model.ui.TargetedDialogStateWrapper 35645f14-e0d9-40f0-aa1e-e1c6f99e8d43: Thomas Pfeffersack from ch.sahits.game.openpatrician.clientserverinterface.model.task.CelebrationTask.run
2018-12-27 23:55:08,215 [pool-2-thread-3] INFO  c.s.g.o.c.l.EventBusAspect : Posted event on event bus 'PausableAsyncEventBus (default)' ch.sahits.game.openpatrician.model.ui.TargetedDialogStateWrapper 35645f14-e0d9-40f0-aa1e-e1c6f99e8d43: Thomas Pfeffersack from ch.sahits.game.openpatrician.clientserverinterface.model.task.CelebrationTask.run
2018-12-27 23:55:13,218 [pool-2-thread-3] INFO  c.s.g.o.c.l.EventBusAspect : Posted event on event bus 'PausableAsyncEventBus (default)' ch.sahits.game.openpatrician.model.ui.TargetedDialogStateWrapper 35645f14-e0d9-40f0-aa1e-e1c6f99e8d43: Thomas Pfeffersack from ch.sahits.game.openpatrician.clientserverinterface.model.task.CelebrationTask.run
2018-12-27 23:55:18,303 [pool-2-thread-3] INFO  c.s.g.o.c.l.EventBusAspect : Posted event on event bus 'PausableAsyncEventBus (default)' ch.sahits.game.openpatrician.model.ui.TargetedDialogStateWrapper 35645f14-e0d9-40f0-aa1e-e1c6f99e8d43: Thomas Pfeffersack from ch.sahits.game.openpatrician.clientserverinterface.model.task.CelebrationTask.run

Строка журнала в методе run появляется только один раз, поэтому я знаю, что метод run вызывается только один раз, но событие публикуется несколько раз в нескольких потоках.,Это, в свою очередь, вызывает многократную обработку одного и того же события.

Основная часть моего аспекта выглядит следующим образом:

@Pointcut("execution(public void com.google.common.eventbus.EventBus.post(Object))")
public void syncEventBus(){}

@Pointcut("execution(public void com.google.common.eventbus.EventBus.post(Object))")
public void asyncEventBus(){}

@Before("syncEventBus() || asyncEventBus()")
public void logEvent(JoinPoint joinPoint) {
    Object event = joinPoint.getArgs()[0];
    if (!ignore(event)) {
        String name = ((EventBus) joinPoint.getTarget()).identifier();
        StringBuilder sb = ...
        log.info(sb.toString());
    }
}

Я просто не вижу, где это идет не так.Это может быть что-то связанное с аспектом или фактом, что Eventbus прокси-сервер Spring, или что-то совершенно иное, я не могу придумать.Любые идеи?

[РЕДАКТИРОВАТЬ:] Обратите внимание, что первые две записи журнала находятся в разных потоках и с интервалом в несколько минут, все последующие находятся в том же потоке, что и второй, и с интервалом в несколько секунд.Однако между этими строками журнала есть и другие зарегистрированные события, которые были вырезаны для простоты.

[EDIT2:]

Я нашел причину.В методе-обработчике, который @Subscribe d для события, у меня есть:

executor.schedule(() -> clientServerEventBus.post(new TargetedEvent((IHumanPlayer) player, wrappedDialogState)), 5, TimeUnit.SECONDS);

, где wrappedDialogState является объектом события.Затем у меня есть другой метод, который подписывается на TargetedEvent:

@Subscribe
public void propagate(TargetedEvent message) {
    clientServerEventBus.post(message.getEvent());
}

Это, конечно, вызовет первый обработчик событий, который отправит новое сообщение через 5 секунд.

Есть лиКак я могу создать архитектурный тест, который проверяет такие циклы событий?

...