Я работаю над akka с Kafka и пишу контрольные примеры для моего Kafka Consumer. Я использовал Embedded Kafka для модульного тестирования.
Когда я пытаюсь запустить свой тестовый сценарий, все идет хорошо, но в последующем произошла исключительная ситуация:
akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://testActor/system/StreamSupervisor-0/flow-0-1-mapAsyncUnordered#-2130769]] after [1000 ms]. Message of type [akka.stream.impl.fusing.ActorGraphInterpreter$Snapshot$] was sent by [Actor[akka://testActor/system/StreamSupervisor-0#-867168141]]. A typical reason for `AskTimeoutException` is that the recipient actor didn't send a reply.
at akka.pattern.PromiseActorRef$.$anonfun$defaultOnTimeout$1(AskSupport.scala:675)
at akka.pattern.PromiseActorRef$.$anonfun$apply$1(AskSupport.scala:696)
at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:202)
at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:875)
at scala.concurrent.BatchingExecutor.execute(BatchingExecutor.scala:113)
at scala.concurrent.BatchingExecutor.execute$(BatchingExecutor.scala:107)
at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:873)
at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:334)
at akka.actor.LightArrayRevolverScheduler$$anon$3.executeBucket$1(LightArrayRevolverScheduler.scala:285)
at akka.actor.LightArrayRevolverScheduler$$anon$3.nextTick(LightArrayRevolverScheduler.scala:289)
at akka.actor.LightArrayRevolverScheduler$$anon$3.run(LightArrayRevolverScheduler.scala:241)
at java.lang.Thread.run(Thread.java:748)
Вот мой код.
Мой метод тестирования:
@Test
public void publishMessage() {
final TestKit probe = new TestKit(system);
final Config config = system.settings().config().getConfig("akka.kafka.producer");
ActorRef childMaker = probe.getTestActor();
final ProducerSettings<String, String> producerSettings =
ProducerSettings.create(config, new StringSerializer(), new StringSerializer())
.withBootstrapServers(bootstrapServers);
Source.range(1, 10)
.map(Object::toString)
.map(value -> new ProducerRecord<>(topic, 0, "key1", value))
.runWith(Producer.plainSink(producerSettings), materializer);
new EventFilter(Logging.Info.class, system)
.occurrences(1)
.matches("Starting up Consumer:")
.matches("Consumer Started:")
.intercept(() -> TestActorRef
.create(system, KafkaConsumerPlainExternalSource.props(new RequestRegisterConsumer(system,
config, bootstrapServers, groupId, topic, (byte) 0, childMaker))));
}
Мой класс KafkaConsumer выглядит следующим образом:
public class KafkaConsumerPlainExternalSource extends AbstractLoggingActor {
private static RequestRegisterConsumer consumerConf;
static Props props(RequestRegisterConsumer consumerConf) {
return Props.create(KafkaConsumerPlainExternalSource.class, consumerConf);
}
public KafkaConsumerPlainExternalSource(RequestRegisterConsumer consumerConf) {
KafkaConsumerPlainExternalSource.consumerConf = consumerConf;
}
@Override
public Receive createReceive() {
return receiveBuilder().build();
}
@Override
public void preStart() {
log().info("Starting up Consumer: " + self().path().toString());
//Update
akka.kafka.javadsl.Consumer.plainExternalSource(consumer, Subscriptions
.assignment(new TopicPartition(consumerConf.getTopic(), consumerConf.getTopicPartition())))
.mapAsync(10, Consumer :: consume)
.to(Sink.ignore())
.run(ActorMaterializer.create(consumerConf.getActorSystem()));
log().info("Consumer Started: " + self().path().toString());
}
}
Мой файл application.conf:
> akka {
> loggers = [akka.testkit.TestEventListener]
> test {
> timefactor = 1.0
> filter-leeway = 10s
> calling-thread-dispatcher {
> type = akka.testkit.CallingThreadDispatcherConfigurator
> } } kafka.producer {//producer conf} }
Когда я добавляю 10 секунд сна c в последнем моем тестовом случае, мой тест проходит нормально. Я не смог найти root причину этого исключения.