использование потоков против актеров для периодических c задач - PullRequest
1 голос
/ 01 апреля 2020

Я работаю со стеком akka / scala / play.

Обычно я использую поток для выполнения определенных задач. Например, у меня есть поток, который просыпается каждую минуту, берет что-то из БД и вызывает другой сервис для обогащения своих данных с помощью API и сохранения обогащения в БД.

что-то вроде этого:

class FetcherAndSaveStream @Inject()(fetcherAndSaveGraph: FetcherAndSaveGraph, dbElementsSource: DbElementsSource)
                                     (implicit val mat: Materializer,
                                      implicit val exec: ExecutionContext) extends LazyLogging {

  def graph[M1, M2](source: Source[BDElement, M1],
                    sink: Sink[BDElement, M2],
                    switch: SharedKillSwitch): RunnableGraph[(M1, M2)] = {

    val fetchAndSaveDataFromExternalService: Flow[BDElement, BDElement, NotUsed] =
      fetcherAndSaveGraph.fetchEndSaveEnrichment

    source.viaMat(switch.flow)(Keep.left)
      .via(fetchAndSaveDataFromExternalService)
      .toMat(sink)(Keep.both).withAttributes(supervisionStrategy(resumingDecider))

  }


  def runGraph(switchSharedKill: SharedKillSwitch): (NotUsed, Future[Done]) = {
    logger.info("FetcherAndSaveStream is now running")
    graph(dbElementsSource.dbElements(), Sink.ignore, switchSharedKill).run()
  }
}

Интересно, это лучше, чем просто использовать актера, который тикает каждую минуту и ​​делает что-то подобное? Каково сравнение между использованием актеров для этого и потока?

пытается все еще выяснить, когда я должен выбрать какой метод (потоки / актеры). спасибо !!

1 Ответ

0 голосов
/ 01 апреля 2020

Вы можете использовать оба, в зависимости от требований, которые у вас есть для вашего решения, которые там не перечислены. Общее беспокойство, которое вы должны принять во внимание - актеры более низкоуровневые вещи, чем потоки, поэтому они требуют больше кода и отладки.

По сути, потоки хороши для задач, где у вас есть относительно большой объем данных, которые необходимо обработать с низким потреблением памяти. С потоками вам не нужно начинать потоковую передачу каждые n секунды, вы можете настроить этот поток для запуска вместе с приложением. Это может сделать ваш код более лаконичным, пропустив планировщик logi c. Я опущу ваш DI и архитектуру, напишу решение с псевдокодом:

val yourConsumer: Sink[YourDBRecord] = ???
val recordsSource: Source[YourDBRecord] = 
val runnableGraph = (Source repeat ())
  .throttle(1, n seconds)
  .mapAsync(yourParallelism){_ =>
    fetchReasonableAmountOfRecordsFromDB
  } mapConcat identity to yourConsumer

Этот поток сделает ваши вещи. Вы даже можете улучшить его с помощью более сложной логики c, чтобы адаптировать частоту опроса в соответствии с рабочими нагрузками, используя обратную связь l oop в графе api. Кроме того, вы можете добавить стратегию обработки ошибок, которую необходимо возобновить, в случае сбоя потока.

Более того, есть соединители alpakka для DBS, которые способны на это, вы можете увидеть, соответствуют ли решения там, где вы хотите, или проверить детали реализации.

Что вы можете получить, сделав это - противодавление, способность работать с потоками, чистым и лаконичным кодом без синхронизированных автоматов, управляемых непосредственно вами. https://doc.akka.io/docs/akka/current/stream/stream-rate.html

Вы также можете создать актера, но тогда вы должны делать все то, что потоки akka делают для вас вручную, то есть обратное давление в случае, если вы хотите взаимодействовать с потоками , планировщик, чанкинг и управление памятью (чтобы не загружать 100000 или около того записей в одном пакете в память), et c.

...