Implementation of the PID rate controller in Spark Streaming - PullRequest
December 5, 2018

I am reading the Spark Streaming source code for the Kafka integration. It contains a rate-control algorithm that is a PID controller. I have read a few web pages and have a basic idea of how PID works, but I still don't understand how the new rate is calculated.

Why is the new rate the latest rate minus a combination of the P, I and D terms? I thought the output of a PID controller is simply the combination of P, I and D?

The PID combination:

[image: combinations_pid]
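For reference, the textbook continuous-time PID law (presumably what the image showed) computes the controller output u(t) from the error e(t) as in the first line below. The remaining lines rewrite the update from the Spark code quoted below in the same symbols; the names r_latest, r_proc, e_hist and Δt are my own shorthand for `latestRate`, `processingRate`, `historicalError` and `delaySinceUpdate`, chosen only for this comparison:

```latex
u(t) = K_p\, e(t) + K_i \int_0^{t} e(\tau)\,d\tau + K_d\,\frac{de(t)}{dt}

e = r_{\text{latest}} - r_{\text{proc}}, \qquad \dot{e} \approx \frac{e - e_{\text{prev}}}{\Delta t}

r_{\text{new}} = \max\!\Big(r_{\min},\; r_{\text{latest}} - \big(K_p\, e + K_i\, e_{\text{hist}} + K_d\, \dot{e}\big)\Big)
```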

But in the Kafka part of Spark Streaming, the new rate is computed as follows:

```scala
// Excerpt from PIDRateEstimator.compute(time, numElements, processingDelay, schedulingDelay).
// time and both delays are in milliseconds. latestTime, latestRate and latestError are the
// estimator's state from the previous batch; batchIntervalMillis, proportional, integral,
// derivative and minRate are its constructor parameters.

// in seconds, should be close to batchDuration
val delaySinceUpdate = (time - latestTime).toDouble / 1000

// in elements/second
val processingRate = numElements.toDouble / processingDelay * 1000

// In our system `error` is the difference between the desired rate and the measured rate
// based on the latest batch information. We consider the desired rate to be latest rate,
// which is what this estimator calculated for the previous batch.
// in elements/second
val error = latestRate - processingRate

// The error integral, based on schedulingDelay as an indicator for accumulated errors.
// A scheduling delay s corresponds to s * processingRate overflowing elements. Those
// are elements that couldn't be processed in previous batches, leading to this delay.
// In the following, we assume the processingRate didn't change too much.
// From the number of overflowing elements we can calculate the rate at which they would be
// processed by dividing it by the batch interval. This rate is our "historical" error,
// or integral part, since if we subtracted this rate from the previous "calculated rate",
// there wouldn't have been any overflowing elements, and the scheduling delay would have
// been zero.
// (in elements/second)
val historicalError = schedulingDelay.toDouble * processingRate / batchIntervalMillis

// in elements/(second ^ 2)
val dError = (error - latestError) / delaySinceUpdate

val newRate = (latestRate - proportional * error -
               integral * historicalError -
               derivative * dError).max(minRate)
```
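To make the excerpt runnable in isolation, here is a minimal standalone sketch of one update step. Everything named here (`PidStepSketch`, `Config`, `State`, `step`, `kp`/`ki`/`kd`) is hypothetical and not Spark's API; the arithmetic is copied from the excerpt, with the P + I + D sum pulled out into a variable `u` so the structure is visible. As far as I can tell, Spark's estimator returns `None` for batches with no elements or a non-increasing timestamp and also has a first-batch initialisation path; this sketch rejects such batches and leaves the initialisation out.

```scala
// Hypothetical standalone sketch of one PID rate-estimator update, mirroring the excerpt.
// Times are in milliseconds, rates in elements/second. Not Spark's actual class.
object PidStepSketch {

  // Fixed settings (illustrative names, not Spark configuration keys).
  final case class Config(
      batchIntervalMillis: Long,
      kp: Double, // weight of the proportional term
      ki: Double, // weight of the "historical" (integral-like) term
      kd: Double, // weight of the derivative term
      minRate: Double)

  // What the estimator remembers from the previous batch.
  final case class State(latestTime: Long, latestRate: Double, latestError: Double)

  def step(cfg: Config, prev: State, time: Long, numElements: Long,
           processingDelay: Long, schedulingDelay: Long): (Double, State) = {
    // Spark's estimator skips such batches; this sketch simply rejects them.
    require(time > prev.latestTime && numElements > 0 && processingDelay > 0)

    val delaySinceUpdate = (time - prev.latestTime).toDouble / 1000     // s
    val processingRate = numElements.toDouble / processingDelay * 1000  // el/s
    val error = prev.latestRate - processingRate                       // el/s
    val historicalError =
      schedulingDelay.toDouble * processingRate / cfg.batchIntervalMillis // el/s
    val dError = (error - prev.latestError) / delaySinceUpdate          // el/s^2

    // u is the familiar P + I + D combination...
    val u = cfg.kp * error + cfg.ki * historicalError + cfg.kd * dError
    // ...but it is applied as a correction to the previous rate, not used as the rate itself.
    val newRate = (prev.latestRate - u).max(cfg.minRate)

    (newRate, State(time, newRate, error))
  }

  def main(args: Array[String]): Unit = {
    val cfg = Config(batchIntervalMillis = 1000L, kp = 1.0, ki = 0.2, kd = 0.0, minRate = 100.0)
    // Previous limit was 1500 el/s, but 1000 elements took 1000 ms: 1000 el/s measured.
    val (newRate, _) = step(cfg, State(0L, 1500.0, 0.0), time = 1000L,
      numElements = 1000L, processingDelay = 1000L, schedulingDelay = 0L)
    println(s"new rate = $newRate el/s")
  }
}
```

With the illustrative gains kp = 1.0, ki = 0.2, kd = 0.0, the P term alone gives 1500 − 1.0 · (1500 − 1000) = 1000, i.e. exactly the measured processing rate; a scheduling delay of 500 ms with a 1000 ms batch interval would subtract a further 0.2 · 500 = 100 el/s, giving 900.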

Does anyone know?
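A toy closed-loop simulation can make the effect of that update rule visible. It is a sketch under loudly stated assumptions: the hypothetical `PidLoopSketch` models a cluster that processes a fixed `capacityPerSec` elements per second, ingests exactly `rate × batch interval` elements per batch, and turns any processing overrun into scheduling delay for the next batch. The gains and `minRate` are illustrative values, not a claim about Spark's defaults. The per-batch arithmetic is the formula from the excerpt, unchanged.

```scala
// Toy closed-loop simulation (illustrative assumptions, not Spark's scheduler):
// fixed processing capacity, ingestion limited only by the controller's rate,
// overruns carried over as scheduling delay for the next batch.
object PidLoopSketch {

  /** Returns the rate used for each batch and the final scheduling delay (ms). */
  def simulate(
      initialRate: Double,
      capacityPerSec: Long,
      batchIntervalMillis: Long,
      batches: Int,
      kp: Double, ki: Double, kd: Double,
      minRate: Double): (Vector[Double], Long) = {
    var rate = initialRate
    var latestError = 0.0
    var schedulingDelay = 0L // ms the current batch waits because of backlog
    val history = Vector.newBuilder[Double]

    for (_ <- 1 to batches) {
      history += rate
      // Elements ingested in one batch interval at the current rate limit.
      val numElements = math.round(rate * batchIntervalMillis / 1000.0)
      // Time the toy cluster needs to process them, in ms.
      val processingDelay = numElements * 1000 / capacityPerSec
      if (numElements > 0 && processingDelay > 0) {
        // Same arithmetic as the Spark excerpt.
        val delaySinceUpdate = batchIntervalMillis.toDouble / 1000
        val processingRate = numElements.toDouble / processingDelay * 1000
        val error = rate - processingRate
        val historicalError = schedulingDelay.toDouble * processingRate / batchIntervalMillis
        val dError = (error - latestError) / delaySinceUpdate
        val newRate = (rate - kp * error - ki * historicalError - kd * dError).max(minRate)
        latestError = error
        // Anything beyond one interval delays the following batch.
        schedulingDelay = math.max(0L, schedulingDelay + processingDelay - batchIntervalMillis)
        rate = newRate
      }
    }
    (history.result(), schedulingDelay)
  }

  def main(args: Array[String]): Unit = {
    val (rates, finalDelay) = simulate(5000.0, 1000L, 1000L, 50, 1.0, 0.2, 0.0, 100.0)
    println(rates.take(6).mkString(", "))
    println(s"final rate = ${rates.last}, final scheduling delay = $finalDelay ms")
  }
}
```

Starting at 5000 el/s against a capacity of 1000 el/s, the first update drops the rate straight to the measured 1000 el/s (P term), the historical term then pushes it down to 200 el/s while the 4 s backlog drains, and the rate climbs back toward 1000 el/s as the scheduling delay shrinks. In this toy model, subtracting the weighted errors from `latestRate` steers the rate toward the measured processing rate instead of replacing it.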

...