Scala spark: NotSerializableException с гуавой RateLimiter - PullRequest
0 голосов
/ 07 ноября 2018

Я хотел ввести ограничитель скорости для модуля записи искры, чтобы ограничить число запросов http, выполняемых нижестоящему приложению и выполняющих ошибки сериализации искры.

Пример кода:

import org.spark_project.guava.util.concurrent.RateLimiter

@transient
object Baz {
    @transient var maybeRateLimiter: Option[RateLimiter] = createRateLimiter()
    final val DEFAULT_RATELIMITER_ACQUIRE_WAIT_TIME_IN_MS = 1000

    def rateLimitedFetch(someKey: String,
                         fooClient: FooClient)(implicit executionContext: ExecutionContext): EitherT[Future, String, Foo] = {
        maybeRateLimiter.fold {
          logger.info("No rate limiter, not gating requests")
          EitherT(
            fooClient.fetchFoo(someKey)
              .wrapEither(t => s"Error fetching $someKey due to ${t.getMessage}")
          )
        }
        {
          rateLimiter =>
            while (!rateLimiter.tryAcquire(DEFAULT_RATELIMITER_ACQUIRE_WAIT_TIME_IN_MS, TimeUnit.MILLISECONDS)) {
              logger.info(s"Not enough permits, requested: 1, current: {}", rateLimiter.getRate)
            }

            EitherT(
              fooClient.fetchFoo(someKey)
                .wrapEither(t => s"Error fetching $someKey due to ${t.getMessage}")
            )
        }
    }
}

Baz.rateLimitedFetch(someKey, fooClient)

Трассировка стека:

    Caused by: java.io.NotSerializableException: org.spark_project.guava.util.concurrent.RateLimiter$Bursty
Serialization stack:
    - object not serializable (class: org.spark_project.guava.util.concurrent.RateLimiter$Bursty, value: RateLimiter[stableRate=500.0qps])

Не уверен, можно ли использовать guava RateLimiter в этом контексте и есть ли лучший способ ограничения скорости нисходящих запросов от приложения spark

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...