Kotlin для достижения многопоточного хеджирования запросов? - PullRequest
2 голосов
/ 06 ноября 2019

Весенний реактор имеет интересную особенность: Хеджирование . Это означает создание множества запросов и получение первого возвращенного результата, а также автоматическую очистку других контекстов. Джош Лонг в последнее время активно продвигает эту функцию. Погуглив Хеджирование в весеннем реакторе показывает относительные результаты. Если кому-то интересно, здесь - это пример кода. Короче говоря, Flux.first() упрощает все возникающие проблемы, что очень впечатляет.

Интересно, как этого можно достичь с помощью сопрограмм Котлина и многопоточности (и, возможно, с Flow или Channel). Я подумал о простом сценарии: один сервис принимает longUrl и порождает longUrl для многих сервисов сокращения URL (таких как IsGd, TinyUrl ...) и возвращает первый возвращенный URL ... (и завершает / очищает другие ресурсы потока / сопрограммы)

Есть интерфейс UrlShorter, который определяет эту работу:

interface UrlShorter {
  fun getShortUrl(longUrl: String): String?
}

И есть три реализации, одна для is.gd , другая для tinyUrl , а третья - это реализация Dumb, которая блокирует 10 секунд и возвращает ноль:

class IsgdImpl : UrlShorter {
  override fun getShortUrl(longUrl: String): String? {
    logger.info("running : {}", Thread.currentThread().name)
    // isGd api url blocked by SO , it sucks . see the underlaying gist for full code
    val url = "https://is.gd/_create.php?format=simple&url=%s".format(URLEncoder.encode(longUrl, "UTF-8"))
    return Request.Get(url).execute().returnContent().asString().also {
      logger.info("returning {}", it)
    }
  }
}

class TinyImpl : UrlShorter {
  override fun getShortUrl(longUrl: String): String? {
    logger.info("running : {}", Thread.currentThread().name)
    val url = "http://tinyurl.com/_api-create.php?url=$longUrl" // sorry the URL is blocked by stackoverflow , see the underlaying gist for full code
    return Request.Get(url).execute().returnContent().asString().also {
      logger.info("returning {}", it)
    }
  }
}

class DumbImpl : UrlShorter {
  override fun getShortUrl(longUrl: String): String? {
    logger.info("running : {}", Thread.currentThread().name)
    TimeUnit.SECONDS.sleep(10)
    return null
  }
}

И есть UrlShorterService, который принимает все реализации UrlShorter и пытается порождатьсопрограммы и получить первый результат.

Вот то, о чем я подумал:

@ExperimentalCoroutinesApi
@FlowPreview
class UrlShorterService(private val impls: List<UrlShorter>) {


  private val es: ExecutorService = Executors.newFixedThreadPool(impls.size)
  private val esDispatcher = es.asCoroutineDispatcher()

  suspend fun getShortUrl(longUrl: String): String {
    return method1(longUrl) // there are other methods , with different ways...
  }

  private inline fun <T, R : Any> Iterable<T>.firstNotNullResult(transform: (T) -> R?): R? {
    for (element in this) {
      val result = transform(element)
      if (result != null) return result
    }
    return null
  }

Клиентская сторона тоже проста:

@ExperimentalCoroutinesApi
@FlowPreview
class UrlShorterServiceTest {

  @Test
  fun testHedging() {
    val impls = listOf(DumbImpl(), IsgdImpl(), TinyImpl()) // Dumb first
    val service = UrlShorterService(impls)
    runBlocking {
      service.getShortUrl("https://www.google.com").also {
        logger.info("result = {}", it)
      }
    }
  }
}

Обратите внимание, что сначала я поставил DumbImpl, потому чтоЯ надеюсь, что это может сначала появиться и блокировать в своей нити. И другие две реализации могут получить результат.

Хорошо, вот проблема, как добиться хеджирования в kotlin? Я пробую следующие методы:

  private suspend fun method1(longUrl: String): String {
    return impls.asSequence().asFlow().flatMapMerge(impls.size) { impl ->
      flow {
        impl.getShortUrl(longUrl)?.also {
          emit(it)
        }
      }.flowOn(esDispatcher)
    }.first()
      .also { esDispatcher.cancelChildren() } // doesn't impact the result
  }

Я надеюсь, что method1 должен работать, но он полностью выполняет 10 секунд:

00:56:09,253 INFO  TinyImpl - running : pool-1-thread-3
00:56:09,254 INFO  DumbImpl - running : pool-1-thread-1
00:56:09,253 INFO  IsgdImpl - running : pool-1-thread-2
00:56:11,150 INFO  TinyImpl - returning // tiny url blocked by SO , it sucks
00:56:13,604 INFO  IsgdImpl - returning // idGd url blocked by SO , it sucks
00:56:19,261 INFO  UrlShorterServiceTest$testHedging$1 - result = // tiny url blocked by SO , it sucks

Затем, я думал, что другой method2, method3, method4,method5 ... но все не работает:

  /**
   * 00:54:29,035 INFO  IsgdImpl - running : pool-1-thread-3
   * 00:54:29,036 INFO  DumbImpl - running : pool-1-thread-2
   * 00:54:29,035 INFO  TinyImpl - running : pool-1-thread-1
   * 00:54:30,228 INFO  TinyImpl - returning // tiny url blocked by SO , it sucks
   * 00:54:30,797 INFO  IsgdImpl - returning // idGd url blocked by SO , it sucks
   * 00:54:39,046 INFO  UrlShorterServiceTest$testHedging$1 - result = // idGd url blocked by SO , it sucks
   */
  private suspend fun method2(longUrl: String): String {
    return withContext(esDispatcher) {
      impls.map { impl ->
        async(esDispatcher) {
          impl.getShortUrl(longUrl)
        }
      }.firstNotNullResult { it.await() } ?: longUrl
    }
  }
  /**
   * 00:52:30,681 INFO  IsgdImpl - running : pool-1-thread-2
   * 00:52:30,682 INFO  DumbImpl - running : pool-1-thread-1
   * 00:52:30,681 INFO  TinyImpl - running : pool-1-thread-3
   * 00:52:31,838 INFO  TinyImpl - returning // tiny url blocked by SO , it sucks
   * 00:52:33,721 INFO  IsgdImpl - returning // idGd url blocked by SO , it sucks
   * 00:52:40,691 INFO  UrlShorterServiceTest$testHedging$1 - result = // idGd url blocked by SO , it sucks
   */
  private suspend fun method3(longUrl: String): String {
    return coroutineScope {
      impls.map { impl ->
        async(esDispatcher) {
          impl.getShortUrl(longUrl)
        }
      }.firstNotNullResult { it.await() } ?: longUrl
    }
  }
  /**
   * 01:58:56,930 INFO  TinyImpl - running : pool-1-thread-1
   * 01:58:56,933 INFO  DumbImpl - running : pool-1-thread-2
   * 01:58:56,930 INFO  IsgdImpl - running : pool-1-thread-3
   * 01:58:58,411 INFO  TinyImpl - returning // tiny url blocked by SO , it sucks
   * 01:58:59,026 INFO  IsgdImpl - returning // idGd url blocked by SO , it sucks
   * 01:59:06,942 INFO  UrlShorterServiceTest$testHedging$1 - result =  // idGd url blocked by SO , it sucks
   */
  private suspend fun method4(longUrl: String): String {
    return withContext(esDispatcher) {
      impls.map { impl ->
        async {
          impl.getShortUrl(longUrl)
        }
      }.firstNotNullResult { it.await() } ?: longUrl
    }
  }

Я не знаком с Channel, извините за исключение ↓

  /**
   * 01:29:44,460 INFO  UrlShorterService$method5$2 - channel closed
   * 01:29:44,461 INFO  DumbImpl - running : pool-1-thread-2
   * 01:29:44,460 INFO  IsgdImpl - running : pool-1-thread-3
   * 01:29:44,466 INFO  TinyImpl - running : pool-1-thread-1
   * 01:29:45,765 INFO  TinyImpl - returning // tiny url blocked by SO , it sucks
   * 01:29:46,339 INFO  IsgdImpl - returning // idGd url blocked by SO , it sucks
   *
   * kotlinx.coroutines.channels.ClosedSendChannelException: Channel was closed
   *
   */
  private suspend fun method5(longUrl: String): String {
    val channel = Channel<String>()

    withContext(esDispatcher) {
      impls.forEach { impl ->
        launch {
          impl.getShortUrl(longUrl)?.also {
            channel.send(it)
          }
        }
      }
      channel.close()
      logger.info("channel closed")
    }

    return channel.consumeAsFlow().first()
  }

ОК, яне знаю, есть ли другие способы ... но все вышеперечисленное не работает ... Все блоки не менее 10 секунд (заблокировано DumbImpl).

Весь исходный код можно найти на github gist .

Как можно добиться хеджирования в котлине? По Deferred или Flow или Channel или любым другим лучшим идеям? Спасибо.

После отправки вопроса я обнаружил, что все tinyurl, isGd url заблокированы SO. Это действительно отстой!

Ответы [ 2 ]

1 голос
/ 07 ноября 2019

Если реальная работа, которую вы хотите выполнять параллельно, состоит из сетевых извлечений, вам следует выбрать асинхронную сетевую библиотеку, чтобы вы могли правильно использовать с ней неблокирующие сопрограммы. Например, начиная с версии 11 JDK предоставляет асинхронный HTTP-клиент, который вы можете использовать следующим образом:

val httpClient: HttpClient = HttpClient.newHttpClient()

suspend fun httpGet(url: String): String = httpClient
        .sendAsync(
                HttpRequest.newBuilder().uri(URI.create(url)).build(),
                BodyHandlers.ofString())
        .await()
        .body()

Вот функция, которая выполняет хеджирование запросов с учетом приостановленной реализации, как указано выше:

class UrlShortenerService(
        private val impls: List<UrlShortener>
) {
    suspend fun getShortUrl(longUrl: String): String? = impls
            .asFlow()
            .flatMapMerge(impls.size) { impl ->
                flow<String?> {
                    try {
                        impl.getShortUrl(longUrl)?.also { emit(it) }
                    }
                    catch (e: Exception) { 
                        // maybe log it, but don't let it propagate
                    }
                }
            }
            .onCompletion { emit(null) }
            .first()
}

Обратите внимание на отсутствие каких-либо пользовательских диспетчеров, они вам не нужны для приостановленной работы. Это сделает любой диспетчер, и вся работа может выполняться в одном потоке.

Детали onCompletion вступают в действие, когда все ваши сокращатели URL выходят из строя. В этом случае стадия flatMapMerge ничего не излучает, и first() зашел бы в тупик без добавления null в поток.

Для проверки я использовал следующий код:

class Shortener(
        private val delay: Long
) : UrlShortener {
    override suspend fun getShortUrl(longUrl: String): String? {
        delay(delay * 1000)
        println("Shortener $delay completing")
        if (delay == 1L) {
            throw Exception("failed service")
        }
        if (delay == 2L) {
            return null
        }
        return "shortened after $delay seconds"
    }
}

suspend fun main() {
    val shorteners = listOf(
            Shortener(4),
            Shortener(3),
            Shortener(2),
            Shortener(1)
    )
    measureTimeMillis {
        UrlShortenerService(shorteners).getShortUrl("bla").also {
            println(it)
        }
    }.also {
        println("Took $it ms")
    }
}

Здесь используются различные случаи сбоя, такие как возврат нуля или сбой с исключением. Для этого кода я получаю следующий вывод:

Shortener 1 completing
Shortener 2 completing
Shortener 3 completing
shortened after 3 seconds
Took 3080 ms

Мы видим, что укорочители 1 и 2 завершены, но с ошибкой, укорочитель 3 вернул правильный ответ, а укорочитель 4 был отменен перед завершением. Я думаю, что это соответствует требованиям.


Если вы не можете отойти от блокировки запросов, ваша реализация должна запустить num_impls * num_concurrent_requests потоков, что не очень хорошо. Однако, если это лучшее, что вы можете иметь, вот реализация, которая хеджирует блокирующие запросы, но ожидает их приостановки и отмены. Он отправит сигнал прерывания рабочим потокам, выполняющим запросы, но если код ввода-вывода вашей библиотеки не прерывается, эти потоки будут зависать в ожидании завершения своих запросов или истечения времени ожидания.

val es = Executors.newCachedThreadPool()

interface UrlShortener {
    fun getShortUrl(longUrl: String): String? // not suspendable!
}

class UrlShortenerService(
        private val impls: List<UrlShortener>
) {
    suspend fun getShortUrl(longUrl: String): String {
        val chan = Channel<String?>()
        val futures = impls.map { impl -> es.submit {
            try {
                impl.getShortUrl(longUrl)
            } catch (e: Exception) {
                null
            }.also { runBlocking { chan.send(it) } }
        } }
        try {
            (1..impls.size).forEach { _ ->
                chan.receive()?.also { return it }
            }
            throw Exception("All services failed")
        } finally {
            chan.close()
            futures.forEach { it.cancel(true) }
        }
    }
}
0 голосов
/ 06 ноября 2019

По сути, это то, для чего был разработан select APi:

coroutineScope {
    select {
        impls.forEach { impl ->
            async {
               impl.getShortUrl(longUrl)
            }.onAwait { it }
        }
    }
    coroutineContext[Job].cancelChildren() // Cancel any requests that are still going.
}

Обратите внимание, что это не будет обрабатывать исключения, выдаваемые реализациями службы, вам нужно будет использовать supervisorScope спользовательский обработчик исключений и фильтрующий цикл выбора, если вы хотите обработать их.

...