В моей настройке я настроил Guava
s (версия 24.1) AsyncEventBus
слегка:
public class PausableAsyncEventBus extends AsyncEventBus implements IPausableEventBus{
private boolean paused = false;
private LinkedList<Object> queuedEvents = new LinkedList<>();
public PausableAsyncEventBus(Executor executor, SubscriberExceptionHandler subscriberExceptionHandler) {
super(executor, subscriberExceptionHandler);
}
public void pause() {
paused = true;
}
public void resume() {
paused = false;
while (!paused && !queuedEvents.isEmpty()) {
super.post(queuedEvents.removeFirst());
}
}
@Override
public void post(Object event) {
if (!paused) {
super.post(event);
} else {
queuedEvents.add(event);
}
}
}
Затем у меня есть задача, которая выполняется один раз, которая публикует событие.AsyncEventBus
внедряется посредством Spring
внедрения зависимостей:
@Autowired
private AsyncEventBus clientServerEventBus;
public void run() {
log.info("Run CelebrationTask for "+ this, new Exception("Called from"));
// calculate number of guests
CelebrationState state = calculateCelebrationState();
TargetedDialogStateWrapper wrapper = new TargetedDialogStateWrapper(player, state);
clientServerEventBus.post(wrapper);
// update reputation
updateReputation(state);
}
У меня есть аспект AOP, определенный для метода post, поэтому я могу регистрировать, какие события публикуются.Эта запись приводит к следующим строкам:
2018-12-27 23:52:53,783 [pool-2-thread-4] INFO c.s.g.o.c.l.EventBusAspect : Posted event on event bus 'PausableAsyncEventBus (default)' ch.sahits.game.openpatrician.model.ui.TargetedDialogStateWrapper 35645f14-e0d9-40f0-aa1e-e1c6f99e8d43: Thomas Pfeffersack from ch.sahits.game.openpatrician.clientserverinterface.model.task.CelebrationTask.run
2018-12-27 23:55:03,212 [pool-2-thread-3] INFO c.s.g.o.c.l.EventBusAspect : Posted event on event bus 'PausableAsyncEventBus (default)' ch.sahits.game.openpatrician.model.ui.TargetedDialogStateWrapper 35645f14-e0d9-40f0-aa1e-e1c6f99e8d43: Thomas Pfeffersack from ch.sahits.game.openpatrician.clientserverinterface.model.task.CelebrationTask.run
2018-12-27 23:55:08,215 [pool-2-thread-3] INFO c.s.g.o.c.l.EventBusAspect : Posted event on event bus 'PausableAsyncEventBus (default)' ch.sahits.game.openpatrician.model.ui.TargetedDialogStateWrapper 35645f14-e0d9-40f0-aa1e-e1c6f99e8d43: Thomas Pfeffersack from ch.sahits.game.openpatrician.clientserverinterface.model.task.CelebrationTask.run
2018-12-27 23:55:13,218 [pool-2-thread-3] INFO c.s.g.o.c.l.EventBusAspect : Posted event on event bus 'PausableAsyncEventBus (default)' ch.sahits.game.openpatrician.model.ui.TargetedDialogStateWrapper 35645f14-e0d9-40f0-aa1e-e1c6f99e8d43: Thomas Pfeffersack from ch.sahits.game.openpatrician.clientserverinterface.model.task.CelebrationTask.run
2018-12-27 23:55:18,303 [pool-2-thread-3] INFO c.s.g.o.c.l.EventBusAspect : Posted event on event bus 'PausableAsyncEventBus (default)' ch.sahits.game.openpatrician.model.ui.TargetedDialogStateWrapper 35645f14-e0d9-40f0-aa1e-e1c6f99e8d43: Thomas Pfeffersack from ch.sahits.game.openpatrician.clientserverinterface.model.task.CelebrationTask.run
Строка журнала в методе run появляется только один раз, поэтому я знаю, что метод run вызывается только один раз, но событие публикуется несколько раз в нескольких потоках.,Это, в свою очередь, вызывает многократную обработку одного и того же события.
Основная часть моего аспекта выглядит следующим образом:
@Pointcut("execution(public void com.google.common.eventbus.EventBus.post(Object))")
public void syncEventBus(){}
@Pointcut("execution(public void com.google.common.eventbus.EventBus.post(Object))")
public void asyncEventBus(){}
@Before("syncEventBus() || asyncEventBus()")
public void logEvent(JoinPoint joinPoint) {
Object event = joinPoint.getArgs()[0];
if (!ignore(event)) {
String name = ((EventBus) joinPoint.getTarget()).identifier();
StringBuilder sb = ...
log.info(sb.toString());
}
}
Я просто не вижу, где это идет не так.Это может быть что-то связанное с аспектом или фактом, что Eventbus прокси-сервер Spring, или что-то совершенно иное, я не могу придумать.Любые идеи?
[РЕДАКТИРОВАТЬ:] Обратите внимание, что первые две записи журнала находятся в разных потоках и с интервалом в несколько минут, все последующие находятся в том же потоке, что и второй, и с интервалом в несколько секунд.Однако между этими строками журнала есть и другие зарегистрированные события, которые были вырезаны для простоты.
[EDIT2:]
Я нашел причину.В методе-обработчике, который @Subscribe
d для события, у меня есть:
executor.schedule(() -> clientServerEventBus.post(new TargetedEvent((IHumanPlayer) player, wrappedDialogState)), 5, TimeUnit.SECONDS);
, где wrappedDialogState
является объектом события.Затем у меня есть другой метод, который подписывается на TargetedEvent
:
@Subscribe
public void propagate(TargetedEvent message) {
clientServerEventBus.post(message.getEvent());
}
Это, конечно, вызовет первый обработчик событий, который отправит новое сообщение через 5 секунд.
Есть лиКак я могу создать архитектурный тест, который проверяет такие циклы событий?