Akka ActorSystem обрабатывает одно и то же событие дважды - PullRequest
0 голосов
/ 31 мая 2018

У нас есть настройка Scala Akka ActorSystem, которая обрабатывает миллионы событий в день, исторически мы редко находим 1 или 2 события, которые обрабатывались дважды, но в последнее время количество дублированных событий увеличилось до 100 в некоторые дни.

Наша настройка упрощена следующим образом:

// EventJob runs once every 10 seconds
class EventJob extends Actor {
  val EventListnerPoolOfActors = ActorSystem().actorOf(
    RoundRobinPool(10)
      .props(Props(classOf[EventHandler])),
    "InjectorActorID"
  )

  override def preStart(): Unit = {
    self ! ReceivedJobStart()
  }

  def receive: Actor.Receive = {
    case ReceivedJobStart() =>
      doWork()
      context.system.scheduler.scheduleOnce(10, self, ReceivedJobStart())
  }

  def doWork(): Future[Unit] = {
    // returns Future[Seq[Event]]
    getUnprocessEvents().map { x =>
      {
        // pass each Event to an EventHandler Actor to process
        for (a <- 0 to x.size) {
          EventListnerPoolOfActors ! x(a)
        }
      }
    }
  }
}

class EventHandler extends Actor {
  def receive = {
    ...
  }
}

Каждое событие имеет уникальный идентификатор, в наших журналах показано, что какое-то событие было обработано дважды (переходит к EventHandler.receive) в течение миллисекунд друг от друга.Все действующие лица являются локальными.

AFAIK. Надежность доставки сообщений по умолчанию не более одного раза, что может быть причиной того, что увеличивающееся количество сообщений доставляется более одного раза, и как уменьшить эту проблему?

Наша система настроена на обработку дубликатов, мы просто не знаем, почему она в последнее время увеличивается, и хотели бы уменьшить ее.

1 Ответ

0 голосов
/ 31 мая 2018

Предполагая, что ваша система не производит дублирующихся единиц работы с одним и тем же идентификатором, возможная причина, по которой система иногда обрабатывает сообщение более одного раза, связана с тем, как события распределяются по EventHandler субъектам;это не имеет ничего общего с гарантиями доставки сообщений Akka.

Рассмотрим ваш getUnprocessEvents() метод.Он возвращает Future[Seq[Event]] и выходит за пределы обычной обработки сообщения субъекта, и нет никакой гарантии, что сообщение m было удалено из Seq до повторного вызова getUnprocessEvents().Сообщения передаются рабочим без учета того, доступны ли они для дополнительной работы.Работник все еще может обрабатывать сообщение m во время последующего вызова getUnprocessEvents(), и в этом случае m снова отправляется в почтовый ящик работника.Использование планировщика для периодического вызова этого метода, т. Е. Указание временного окна, чтобы дать работникам-акторам достаточно времени для обработки их сообщений, является некорректным подходом к координации работы.

Лучший подходсостоит в том, чтобы сделать рабочую очередь частью состояния актера координатора работы (т. е. сделать очередь внутренней переменной в акторе и изменить очередь через обмен сообщениями актера) и использовать шаблон рабочего вытягивания .Кроме того, рассмотрите возможность использования Akka Streams .

. В качестве дополнительного примечания EventJob создает новый ActorSystem:

val EventListnerPoolOfActors = ActorSystem().actorOf(...)

Должен быть только один ActorSystem за заявкуИспользуйте context вместо:

val EventListnerPoolOfActors = context.actorOf(...)
...