Как запланировать scala Будущее от персистирующей кварцевой работы - PullRequest
0 голосов
/ 10 февраля 2020

Предположим, есть некоторый код scala, который нужно запланировать с помощью java библиотеки кварца . И нам нужно сохранить результат выполнения этого кода в контексте задания, чтобы иметь доступ к этому результату при следующем выполнении задания. Для примера syntheti c есть CounterService с функцией inc, которая должна быть запланирована:

trait CounterService {
  def inc(): Int
}

Следующее кварцевое задание вызывает inc и сохраняет свой результат в JobDataMap успешно:

@PersistJobDataAfterExecution
@DisallowConcurrentExecution
class CounterJob extends Job {
  val counterService: CounterService = ...

  override def execute(context: JobExecutionContext): Unit = {

    val newCounterValue: Int = counterService.inc()

    val map = context.getJobDetail.getJobDataMap
    map.put("counter", newCounterValue)  
  }
}

Мы можем получить результат работы в любое время в другом месте (если у нас есть ссылка на scheduler):

val scheduler: Scheduler = ...
// gets details of our CounterJob which was created and registered in the scheduler
// by the name "counter-job" (it is not shown in our example)
val job = scheduler.getJobDetail(JobKey.jobKey("counter-job")) 
// this map will contain the job result which was stored by the key "counter"
val map = job.getJobDataMap.asScala 

Но этот метод не позволяет работать, если мы хотим выполнить asyn c код из кварцевого задания. Например, предположим, что наш счетчик обслуживает следующее:

trait AsyncCounterService {
  def asyncInc(): Future[Int]
}

Мы можем попытаться реализовать нашу работу следующим образом. Но это не работает правильно, потому что метод CounterJob.execute может быть выполнен раньше, чем asyncCounterService.asyncInc. И мы не можем сохранить результат asyncInc в JobDataMap:

@PersistJobDataAfterExecution
@DisallowConcurrentExecution
class CounterJob extends Job {
  val counterService: AsyncCounterService = ...  
  val execContext: ExecutionContext = ...

  override def execute(context: JobExecutionContext): Unit = {

    // # 1: we can not influence on the execution flow of this future 
    //      from job scheduler.
    val counterFuture: Future[Int] = counterService.asyncInc() 

    counterFuture.map { counterValue: Int =>

      val map = context.getJobDetail.getJobDataMap  
      // #2: this action won't have any effect
      map.put("counter", counterValue)              
    }
  }
}

Это как минимум две проблемы этого решения, которые отмечены в приведенном выше коде как #1 ... и #2 ... комментарии .

Есть ли лучшие методы для решения этой проблемы? Другими словами, как запланировать scala Future из задания сохраняющегося кварца с сохранением Future's результатов в карте JobDetailData?

1 Ответ

3 голосов
/ 10 февраля 2020

Если все после CounterJob должно иметь значение counterService, тогда можно просто заблокировать и ожидать будущее в CounterJob. В любом случае, ничего не может быть выполнено в это время, потому что значение еще не было вычислено.

import scala.concurrent.{Await,Future}
...

 try {
      val counterValue  = Await.result(counterFuture, 5.seconds)
      map.put("counter", counterValue)       
    } catch {
      case t: TimeoutException => ...
      case t: Exception => ...
   }

Если у вас есть несколько асинхронных c фьючерсов в этом задании, вы можете объединить их либо с монадией c цепочка flatMap, map операций a for comprehension или с помощью вспомогательных методов stati c из Future сопутствующего объекта, например Future.sequence Тогда конечным результатом будет одно будущее, объединяющее все асинхронные c операции, которые вы можете ожидать с помощью Await.

Обычно ждать фьючерсов считается плохой практикой. Поскольку это блокирует поток исполнителя от выполнения любой другой операции в ожидании завершения будущего.

Однако здесь вы смешиваете в другой структуре планирования заданий с другой парадигмой параллелизма. Как указывалось выше, в конкретном примере хорошо блокировать, так как все позже полагается на первое вычисление.

Если бы другие задания могли выполняться одновременно, было бы несколько способов решить эту проблему:

  1. Есть способ вернуть будущее с работы. Затем можно дождаться завершения этого будущего, прежде чем планировать зависимые задания.
  2. Существует какой-то специальный механизм прослушивания событий из задания, который может быть запущен из задания. counterFuture.map {context.notify("computationReady")}
  3. Существует спецификация c AsyncJob, поддерживающая неблокирующую io, которая ожидает java Future в качестве возврата. Затем вы можете преобразовать Scala Future в Java Future
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...