@ Асьер Аранбарри прав, говоря, что вы не позволите актеру закончить его работу.
Актеры имеют асинхронную природу и, хотя они не реализуют Runnable
, они выполняются отдельно от потока, из которого отправляется сообщение.
Вы отправляете сообщение актеру, а затем сразу утверждаете, что сообщение было изменено.Поскольку субъект работает в асинхронном контексте, то есть в другом потоке, он все еще не обработал входящее сообщение.Таким образом, установка Threed.sleep
позволяет субъекту обрабатывать сообщение, и только после этого выполняется утверждение.
Я могу предложить некоторые изменения в вашем первоначальном дизайне, которые бы хорошо сочетались с природой акка.
Прежде всего, akka не предлагает использовать сообщения с изменчивостью.Они должны быть неизменными.В вашем случае это нарушается методом SomeMessage#setAnotherNum
.Удалите его:
public class SomeMessage {
private int anotherNum;
public SomeMessage(int anotherNum) {
this.anotherNum = anotherNum;
}
public int getAnotherNum() {
return anotherNum;
}
}
После этого создайте новый экземпляр SomeMessage
вместо изменения входящего сообщения в TestActor
и отправьте его обратно на context.sender()
.Как определено здесь
static public class TestActor extends AbstractActor {
private Integer number;
public TestActor(Integer number) {
this.number = number;
}
@Override
public Receive createReceive() {
return receiveBuilder()
.matchAny(message -> {
if (message instanceof SomeMessage) {
SomeMessage someMessage = (SomeMessage) message;
System.out.println("someMessage contains = " + someMessage.getAnotherNum());
context().sender().tell(new SomeMessage(number + someMessage.getAnotherNum()), context().self());
}
}).build();
}
}
Теперь вместо изменения внутреннего состояния сообщения создается новое сообщение с новым состоянием, а более позднее сообщение возвращается обратно к sender()
.Это правильное использование Акки.
Это позволяет тесту использовать TestProbe
и быть переопределенным следующим образом:
@Test
public void should_alter_some_message() {
// given
ActorRef testActor = actorSystem.actorOf(Props.create(TestActor.class,10));
TestJavaActor.SomeMessage someMessage = new SomeMessage(5);
TestProbe testProbe = TestProbe.apply(actorSystem);
// when
testActor.tell(someMessage, testProbe.ref());
// then
testProbe.expectMsg(new SomeMessage(15));
}
TestProbe
эмулирует отправителя и захватывает все входящие сообщения / ответы от TestActor
.Обратите внимание, что expectMsg(new SomeMessage(15))
используется вместо утверждения.Он имеет механизм внутренней блокировки, который ожидает получения сообщения, прежде чем будет выполнено утверждение.Это то, что происходит в примере с актерами тестирования .
Чтобы сделать правильное утверждение expectMsg
, вы должны переопределить equals
метод в своем классе SomeMessage
Редактировать:
Почему Акка хмурится при изменении внутреннего состояния SomeMessage?
Одна из возможностей Акки заключается в том, что она не требует синхронизации или ожидания / уведомления для контроля доступа к общему ресурсу.данные.Но это может быть достигнуто только с неизменяемостью сообщения.Представьте, что вы отправляете изменяемое сообщение, которое вы изменяете в то время, когда субъект его обрабатывает.Это может вызвать условия гонки.Прочитайте это для более подробной информации.
И (2) относится ли это к изменению внутреннего состояния актеров?Разве это нормально для ActorRefs иметь свойства, которые можно изменять, или сообщество тоже не одобряет это (и если да, то почему!)?
Нет, здесь это не касается.Если какое-либо состояние инкапсулировано в субъекте и только оно может изменить его , вполне нормально иметь изменчивость.