Исключение тайм-аута Akka, но сообщения действительно отправлены - PullRequest
1 голос
/ 16 июня 2020

Я работаю со стеком Scala 2.13 со следующими технологиями:

  • play! framework 2.8
  • набрано akka 2.6.3
  • alpakka kafka 2.0.3

Задание Akka-stream считывает событие из Kafka, просит актера что-то вычислить, и на основе данного ответа генерирует новые события обратно в Kafka.

Проблема в том, что сообщения, отправленные с использованием шаблона запроса, кажутся потребляемыми QuestionActor (ниже) только когда не менее двух сообщения собираются его почтовым ящиком, и только одно на каждое полученное сообщение .

Странное поведение:

t1

ref ? Question("tr1", 1, None, actorRef)
> AskTimeoutException(tr1)

t2

ref ? Question("tr2", 1, None, actorRef)
> [INFO] - Question request for tr1-1. Processing.
> AskTimeoutException(tr2)

t3

ref ? Question("tr3", 1, None, actorRef)
> [INFO] - Question request for tr2-1. Processing.
> AskTimeoutException(tr3)

Тогда я пытаюсь понять почему я наблюдаю такое поведение и что делаю неправильно .

akka-stream Kafka конвейер:

Consumer
  .plainSource(consumerSettings, subscription)
  .map(DeserializeEvents.fromService)
  .filter(_.eventType == classOf[Item].getName)
  .via(askFlowExplicit)
  .withAttributes(ActorAttributes.supervisionStrategy(decider()))
  .map(
    response =>
      new ProducerRecord[String, OutputItem](
        topics,
        OutputItem(response.getClass.getName, response)
      )
  )
  .log("Kafka Pipeline")
  .runWith(Producer.plainSink(producerSettings))

Решение - это стратегия контроля, которая возобновляет работу в исключительных случаях Serialisation и Timeout; askFlowExplicit объявляет запрос на запрос к внешнему субъекту и - настоящим - я столкнулся с моей проблемой.

val askFlowExplicit =
  ActorFlow.ask[OutputItem, Question, Answer](askTarget) {
    case (envelope, replyTo) =>
      val item = Serdes.deserialize[Item](envelope.payload)
      Question(item.trID, item.id, item.user, replyTo)
  }

Конвейер запускается в Play! application bootstrap

@Singleton
class ApplicationStart @Inject()(
    configuration: Configuration,
    questionActor: ActorRef[QuestionActor.Question]
) {
  private implicit val logger = Logger.apply(getClass)
  implicit val mat            = context
  AlpakkaPipeline.run(configuration, questionActor)
}

Актер - это простой типизированный актер, принадлежащий той же системе акторов, и - прямо сейчас - он только пересылает запрос, поступающий из потока, в другую службу.

class QuestionActor(
    configuration: Configuration,
    context: ActorContext[Question],
    itemService: ItemService
) extends AbstractBehavior[Question](context) {
  import QuestionActor._

  implicit val ec: ExecutionContextExecutor = context.executionContext
  private implicit val timeout: Timeout = ...

  override def onMessage(msg: Question): Behavior[Question] = Behaviors.receive[Question] {
    case (context, Question(trID, id, user, sender)) =>
      log.info(s"Question request for ${msg.trID}-${msg.id}. Processing.")
        itemService
          .action(id, user)
          .onComplete {
            case Success(result) if result.isEmpty =>
              log.info("Action executed")
              msg.replyTo ! NothingHappened(trID, id)
            case Failure(e) =>
              log.error("Action failed.", e)
              msg.replyTo ! FailedAction(trID, id, user, e.getMessage)
          }
      Behaviors.same
  }
}

object QuestionActor {
  final case class Question(
      trID: String,
      id: Int,
      user: Option[UUID],
      replyTo: ActorRef[Answer]
  )

  def apply(itemService: ItemService, configuration: Configuration): Behavior[Question] =
    Behaviors.setup { context =>
      context.setLoggerName(classOf[QuestionActor])
      implicit val log: Logger = context.log
      new QuestionActor(configuration, context)
    }
}

Он построен с использованием времени выполнения DI и Play!

class BootstrapModule(environment: Environment, configuration: Configuration)
    extends AbstractModule
    with AkkaGuiceSupport {

  override def configure(): Unit = {
    bind(new TypeLiteral[ActorRef[CloneWithSender]]() {})
      .toProvider(classOf[QuestionActorProvider])
      .asEagerSingleton()
    bind(classOf[ApplicationStart]).asEagerSingleton()
  }
}

private class Question @Inject()(
    actorSystem: ActorSystem,
    itemService: ItemService,
    configuration: Configuration
) extends Provider[ActorRef[Question]] {
  def get(): ActorRef[Question] = {
    val behavior = QuestionActor(itemService, configuration)
    actorSystem.spawn(behavior, "question-actor")
  }
}

Что я пробовал

  • изменение диспетчера на QuestionActor
  • изменение почтового ящика на QuestionActor
  • запуск конвейера изнутри QuestionActor
  • отправка того же сообщения из конструктора актора (самому себе), наблюдается то же поведение: еще одно сообщение заставит актера использовать первое, запросить таймаут для второго.

Чего я не делал

  • изменение диспетчера на конвейер потока Akka

Мне это кажется проблема с потоком прямо сейчас, но я не знаю, где отсюда go. Любая помощь очень ценится. Заранее спасибо.

1 Ответ

3 голосов
/ 16 июня 2020

Проблема в том, что вы объединяете AbstractBehavior, который предоставляет onMessage, и определяете новое поведение Behaviors.receive[Question]. Вы должны использовать либо то, либо другое.

Удалить Behaviors.receive как показано ниже

  override def onMessage(msg: Question): Behavior[Question] = {
      log.info(s"Question request for ${msg.trID}-${msg.id}. Processing.")
        itemService
          .action(id, user)
          .onComplete {
            case Success(result) if result.isEmpty =>
              log.info("Action executed")
              msg.replyTo ! NothingHappened(trID, id)
            case Failure(e) =>
              log.error("Action failed.", e)
              msg.replyTo ! FailedAction(trID, id, user, e.getMessage)
          }
      Behaviors.same
  }
}

AbstractBehavior.onMessage - реализация поведения. Итак, вы получаете сообщение через аргумент метода, вы должны обработать его и вернуть новые Behaviour, Behaviours.same в вашем случае.

Но вместо обработки сообщения вы создаете новый Behaviour с Behaviors.receive и регистрируете обратный вызов Future для исходного первого сообщения. Таким образом, вы видите оператор журнала, когда приходит второе сообщение, которое запускает новое поведение.

Если вы хотите использовать определения стиля FP, вы должны придерживаться только Behaviors.xxx вспомогательных методов. Если вы выбираете стиль OOP, вы расширяете AbstractBehavior. Но вам не следует делать и то, и другое.

...