Я хотел ввести ограничитель скорости для модуля записи искры, чтобы ограничить число запросов http, выполняемых нижестоящему приложению и выполняющих ошибки сериализации искры.
Пример кода:
import org.spark_project.guava.util.concurrent.RateLimiter
@transient
object Baz {
@transient var maybeRateLimiter: Option[RateLimiter] = createRateLimiter()
final val DEFAULT_RATELIMITER_ACQUIRE_WAIT_TIME_IN_MS = 1000
def rateLimitedFetch(someKey: String,
fooClient: FooClient)(implicit executionContext: ExecutionContext): EitherT[Future, String, Foo] = {
maybeRateLimiter.fold {
logger.info("No rate limiter, not gating requests")
EitherT(
fooClient.fetchFoo(someKey)
.wrapEither(t => s"Error fetching $someKey due to ${t.getMessage}")
)
}
{
rateLimiter =>
while (!rateLimiter.tryAcquire(DEFAULT_RATELIMITER_ACQUIRE_WAIT_TIME_IN_MS, TimeUnit.MILLISECONDS)) {
logger.info(s"Not enough permits, requested: 1, current: {}", rateLimiter.getRate)
}
EitherT(
fooClient.fetchFoo(someKey)
.wrapEither(t => s"Error fetching $someKey due to ${t.getMessage}")
)
}
}
}
Baz.rateLimitedFetch(someKey, fooClient)
Трассировка стека:
Caused by: java.io.NotSerializableException: org.spark_project.guava.util.concurrent.RateLimiter$Bursty
Serialization stack:
- object not serializable (class: org.spark_project.guava.util.concurrent.RateLimiter$Bursty, value: RateLimiter[stableRate=500.0qps])
Не уверен, можно ли использовать guava RateLimiter в этом контексте и есть ли лучший способ ограничения скорости нисходящих запросов от приложения spark