Лучший способ обработки асинхронных вызовов при использовании AWS Java SDK V2 в Scala? - PullRequest
0 голосов
/ 27 ноября 2018

Фон

Несколько дней назад была официально выпущена версия 2.1 AWS Java SDK.Одним из основных преимуществ является то, как он обрабатывает асинхронные вызовы по сравнению с предыдущей версией SDK.

Я решил провести несколько экспериментов с использованием Scala и нового SDK, и мне было немного сложно найти идиоматический способ работы с фьючерсами, возвращаемыми SDK.

Вопрос

Есть ли способ сделать это лучше, более лаконично и с меньшим количеством шаблонного кода?

Цель

Работа с AWS SDKдля Java V2 с использованием Scala и возможность обрабатывать успехи и сбои идиоматическим способом.

Эксперимент

Создание асинхронного клиента SNS и отправка сообщений 500 асинхронно:

Эксперимент1 - Используйте CompletableFuture Возвращенный SDK

  (0 until 500).map { i =>
    val future = client.publish(PublishRequest.builder().topicArn(arn).message(messageJava + i.toString).build())
    future.whenComplete((response, ex) => {
      val responseOption = Option(response) // Response can be null
      responseOption match {
        case Some(r) => println(r.messageId())
        case None => println(s"There was an error ${ex.getMessage}")
      }
    })
  }.foreach(future => future.join())

Здесь я создаю уникальный запрос и публикую его.Функция whenComplete превращает ответ в параметр, так как это значение может быть нулевым.Это уродливо, потому что средства борьбы с успехом / неудачей связаны с проверкой на ноль в ответе.

Эксперимент 2 - Получить результат внутри Scala Future

(0 until 500).map { i =>
    val jf = client.publish(PublishRequest.builder().topicArn(arn).message(messageScala + i.toString).build())
    val sf: Future[PublishResponse] = Future { jf.get }
    sf.onComplete {
      case Success(response) => print(response.messageId)
      case Failure(ex) => println(s"There was an error ${ex.getMessage}")
    }
    sf
  }.foreach(Await.result(_, 5000.millis))

Здесь я используюметод .get() в CompletableFuture, таким образом, я могу иметь дело только с Scala Future.

Эксперимент 3 - Используйте библиотеку Scala - Java8 - Compat для преобразования CompletableFuture в Future

(0 until 500).map { i =>
    val f = client.publish(PublishRequest.builder().topicArn(arn).message(messageScala + i.toString).build()).toScala
    f.onComplete {
      case Success(response) =>
      case Failure(exception) => println(exception.getMessage)
    }
    f
  }.foreach(Await.result(_, 5000.millis))

Это моя любимая реализация, за исключением того, что мне нужно использовать третью сторону экспериментальная библиотека .

Наблюдения

  • В общем, все эти реализации выполнялись примерно одинаково, с future.join() чуть-чуть быстрее, чем другие.
  • Время, которое потребовалось этим функциям для инициализации клиента и публикации 500 сообщений, составило около 2 секунд
  • Последовательная версия этого кода занимает чуть менее 1 минуты (55 секунд)
  • Вы можете увидеть полный код здесь

1 Ответ

0 голосов
/ 27 ноября 2018

Вы упомянули, что вы довольны преобразованием Completetablefuture в scala.future, просто вы не любите зависеть от scala-java8-compat.

В этом случае вы можете просто бросить свойсобственный, и вы хотите, чтобы java8 только для scala:

object CompletableFutureOps {                                                                                                                                        

  implicit class CompletableFutureToScala[T](cf: CompletableFuture[T]) {                                                                                             
    def asScala: Future[T] = {                                                                                                                                       
      val p = Promise[T]()                                                                                                                                           
      cf.whenCompleteAsync{ (result, ex) =>                                                                                                                          
        if (result == null) p failure ex                                                                                                                             
        else                p success result                                                                                                                         
      }                                                                                                                                                              
      p.future                                                                                                                                                       
    }                                                                                                                                                                
  }                                                                                                                                                                  
}

def showByExample: Unit = {
  import CompletableFutureOps._   
  (0 until 500).map { i =>                                                                                                                                                                                                                                                                                     
     val f = CompletableFuture.supplyAsync(() => i).asScala                                                                                                             
     f.onComplete {                                                                                                                                                     
       case Success(response)  => println("Success: " + response)                                                                                                        
       case Failure(exception) => println(exception.getMessage)                                                                                                         
     }                                                                                                                                                                  
     f                                                                                                                                                                  
  }.foreach(Await.result(_, 5000.millis))    
}             
...