Актер не обрабатывает сообщения в течение достаточно долгого времени - PullRequest
0 голосов
/ 07 марта 2020

В моем приложении akka я использую главного субъекта в качестве контроллера, который получает команды и делегирует их субъекту процессора. Актер обработчика после завершения (что занимает около 2 минут до завершения sh каждой задачи), передает сообщение контроллеру а затем субъект-контроллер отправляет сообщение субъекту базы данных для сохранения. И субъект процессора, и субъект базы данных БД управляются с помощью маршрутизатора, каждый из которых имеет, скажем, 5 маршрутов. Я использую диспетчер по умолчанию, и все остальные настройки akka являются только настройками по умолчанию. Ниже приведен сценарий.

Актер контроллера получает около 100 сообщений, которые передаются субъекту процессора, из журнала видно, что процессор завершил обработку некоторых сообщений (около 5) и передал сообщение о завершении актеру контроллера. Но актер базы данных начинает выполняться через 5 минут. Однако в течение этих 5 минут обработчик обрабатывает ожидающие сообщения. Так что не похоже, что приложение бездействует.

Когда объем сообщения меньше, поток от контроллера -> процессор -> контроллер -> субъект БД почти мгновенно и почти нет задержки.

Мне не нужна эта задержка после обработки, выполнение БД должно произойти, как только обработка будет завершена. Но кажется, что потоки заняты выполнением задачи процессора. Как я могу преодолеть эту ситуацию, в идеале я хочу, чтобы время выполнения моей задачи было меньше, но из-за описанного выше поведения я не могу этого достичь.

1 Ответ

1 голос
/ 07 марта 2020

По умолчанию все актеры Akka используют одного исполнителя, который ограничен использованием максимум 64 потоков. С https://doc.akka.io/docs/akka/current/general/configuration-reference.html:

# This will be used if you have set "executor = "default-executor"".
      # If an ActorSystem is created with a given ExecutionContext, this
      # ExecutionContext will be used as the default executor for all
      # dispatchers in the ActorSystem configured with
      # executor = "default-executor". Note that "default-executor"
      # is the default value for executor, and therefore used if not
      # specified otherwise. If no ExecutionContext is given,
      # the executor configured in "fallback" will be used.
      default-executor {
        fallback = "fork-join-executor"
      }

и fork-join-executor config:

# This will be used if you have set "executor = "fork-join-executor""
      # Underlying thread pool implementation is java.util.concurrent.ForkJoinPool
      fork-join-executor {
        # Min number of threads to cap factor-based parallelism number to
        parallelism-min = 8

        # The parallelism factor is used to determine thread pool size using the
        # following formula: ceil(available processors * factor). Resulting size
        # is then bounded by the parallelism-min and parallelism-max values.
        parallelism-factor = 1.0

        # Max number of threads to cap factor-based parallelism number to
        parallelism-max = 64

        # Setting to "FIFO" to use queue like peeking mode which "poll" or "LIFO" to use stack
        # like peeking mode which "pop".
        task-peeking-mode = "FIFO"
      }

Проблема может быть связана с блокировкой вызовов в субъектах процессора. Akka назначает отдельные потоки из пула 64 для обработки этих блокирующих вызовов в субъектах процессора и ожидает, пока один из них завершит обработку сообщений, чтобы иметь возможность обрабатывать сообщения для других участников. Это может вызвать временную задержку между актерами.

Ключевой аспект, на котором основывается Akka, заключается в том, что системы должны постоянно реагировать. Если вы использовали тот же пул диспетчера / потока для блокирующих операций БД или обработки сообщений, что и основная инфраструктура маршрутизации Akka, вполне возможно, что все потоки Akka могут быть заняты субъектами обработки или операциями БД, и ваша система будет фактически заблокирована. пока одна из операций блокировки не будет завершена. Это может не быть проблемой для простой системы на одной JVM, которая выполняет только эту задачу, но когда она масштабируется, это может вызвать много проблем.

В таких случаях, как ваша, где вы не можете избежать блокировки, следует использовать выделенный диспетчер для операций блокировки. Эта ссылка говорит об этом аспекте (хотя он называется Akka-Http, его можно обобщить). Вы можете создать два типа диспетчеров для обработки двух разных операций блокировки. Я также считаю, что вам следует регулировать запросы на блокировку, чтобы не перегружать вашу систему (для регулирования используйте диспетчеры). Вы также можете использовать буферы внутри ваших акторов для работы с ситуациями обратного давления.

EDIT

Почтовый ящик контроллера содержит 100 сообщений, и 5 сообщений принимаются и делегируются субъектам процессора. Каждый субъект процессора занимает 2 минуты времени и отправляет ответ обратно контроллеру, а ответ помещается в очередь в почтовом ящике контроллера. Но перед обработкой этих сообщений контроллер должен обработать сообщения, которые были добавлены до того, как эти сообщения эффективно увеличат время обслуживания для обработки сообщений для контроллера. Лаг является кульминацией всего этого процесса. Как только контроллер получил ответное сообщение, он делегировался субъекту. Я думаю, что время обработки сообщений увеличивается с увеличением.

Дайте мне знать, если это поможет !!

...