AskTimeoutException при модульном тестировании Embedded Kafka Akka Java - PullRequest
0 голосов
/ 07 февраля 2020

Я работаю над akka с Kafka и пишу контрольные примеры для моего Kafka Consumer. Я использовал Embedded Kafka для модульного тестирования.

Когда я пытаюсь запустить свой тестовый сценарий, все идет хорошо, но в последующем произошла исключительная ситуация:

akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://testActor/system/StreamSupervisor-0/flow-0-1-mapAsyncUnordered#-2130769]] after [1000 ms]. Message of type [akka.stream.impl.fusing.ActorGraphInterpreter$Snapshot$] was sent by [Actor[akka://testActor/system/StreamSupervisor-0#-867168141]]. A typical reason for `AskTimeoutException` is that the recipient actor didn't send a reply.

at akka.pattern.PromiseActorRef$.$anonfun$defaultOnTimeout$1(AskSupport.scala:675)
at akka.pattern.PromiseActorRef$.$anonfun$apply$1(AskSupport.scala:696)
at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:202)
at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:875)
at scala.concurrent.BatchingExecutor.execute(BatchingExecutor.scala:113)
at scala.concurrent.BatchingExecutor.execute$(BatchingExecutor.scala:107)
at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:873)
at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:334)
at akka.actor.LightArrayRevolverScheduler$$anon$3.executeBucket$1(LightArrayRevolverScheduler.scala:285)
at akka.actor.LightArrayRevolverScheduler$$anon$3.nextTick(LightArrayRevolverScheduler.scala:289)
at akka.actor.LightArrayRevolverScheduler$$anon$3.run(LightArrayRevolverScheduler.scala:241)
at java.lang.Thread.run(Thread.java:748)

Вот мой код.

Мой метод тестирования:

@Test
public void publishMessage() {

    final TestKit probe = new TestKit(system);
    final Config config = system.settings().config().getConfig("akka.kafka.producer");
    ActorRef childMaker = probe.getTestActor();

    final ProducerSettings<String, String> producerSettings =
            ProducerSettings.create(config, new StringSerializer(), new StringSerializer())
                    .withBootstrapServers(bootstrapServers);

    Source.range(1, 10)
            .map(Object::toString)
            .map(value -> new ProducerRecord<>(topic, 0, "key1", value))
            .runWith(Producer.plainSink(producerSettings), materializer);

    new EventFilter(Logging.Info.class, system)
            .occurrences(1)
            .matches("Starting up Consumer:")
            .matches("Consumer Started:")
            .intercept(() -> TestActorRef
                    .create(system, KafkaConsumerPlainExternalSource.props(new RequestRegisterConsumer(system,
                            config, bootstrapServers, groupId, topic, (byte) 0, childMaker))));

}

Мой класс KafkaConsumer выглядит следующим образом:

public class KafkaConsumerPlainExternalSource extends AbstractLoggingActor {

private static RequestRegisterConsumer consumerConf;

static Props props(RequestRegisterConsumer consumerConf) {
    return Props.create(KafkaConsumerPlainExternalSource.class, consumerConf);
}

public KafkaConsumerPlainExternalSource(RequestRegisterConsumer consumerConf) {
    KafkaConsumerPlainExternalSource.consumerConf = consumerConf;
}

@Override
public Receive createReceive() {
    return receiveBuilder().build();
}

@Override
public void preStart() {

    log().info("Starting up Consumer: " + self().path().toString());

    //Update
akka.kafka.javadsl.Consumer.plainExternalSource(consumer, Subscriptions
                .assignment(new TopicPartition(consumerConf.getTopic(), consumerConf.getTopicPartition())))
                .mapAsync(10, Consumer :: consume)
                .to(Sink.ignore())
                .run(ActorMaterializer.create(consumerConf.getActorSystem()));
    log().info("Consumer Started: " + self().path().toString());
}

}

Мой файл application.conf:

>    akka { 
>   loggers = [akka.testkit.TestEventListener] 
>     test {
>     timefactor = 1.0
>     filter-leeway = 10s
>     calling-thread-dispatcher {
>       type = akka.testkit.CallingThreadDispatcherConfigurator
>     }   }   kafka.producer {//producer conf} }

Когда я добавляю 10 секунд сна c в последнем моем тестовом случае, мой тест проходит нормально. Я не смог найти root причину этого исключения.

...