Я читаю исходный код Spark Streaming в части Кафки.Существует алгоритм управления скоростью, который является PID.И я посетил некоторые сети, которые получили некоторые базовые идеи о PID.Но я все еще не понимаю расчет новой ставки.
Почему новая ставка - это последняя ставка за вычетом комбинаций P, I, D?Я думаю, что выходной результат алгоритма управления PID - это просто комбинация P, I, D?
. Комбинации PID:
![combinations_pid](https://wikimedia.org/api/rest_v1/media/math/render/svg/69072d4013ea8f14ab59a8283ef216fb958870b2)
Но новая скорость Spark Streaming в части Kafka составляет :
// in seconds, should be close to batchDuration
val delaySinceUpdate = (time - latestTime).toDouble / 1000
// in elements/second
val processingRate = numElements.toDouble / processingDelay * 1000
// In our system `error` is the difference between the desired rate and the measured rate
// based on the latest batch information. We consider the desired rate to be latest rate,
// which is what this estimator calculated for the previous batch.
// in elements/second
val error = latestRate - processingRate
// The error integral, based on schedulingDelay as an indicator for accumulated errors.
// A scheduling delay s corresponds to s * processingRate overflowing elements. Those
// are elements that couldn't be processed in previous batches, leading to this delay.
// In the following, we assume the processingRate didn't change too much.
// From the number of overflowing elements we can calculate the rate at which they would be
// processed by dividing it by the batch interval. This rate is our "historical" error,
// or integral part, since if we subtracted this rate from the previous "calculated rate",
// there wouldn't have been any overflowing elements, and the scheduling delay would have
// been zero.
// (in elements/second)
val historicalError = schedulingDelay.toDouble * processingRate / batchIntervalMillis
// in elements/(second ^ 2)
val dError = (error - latestError) / delaySinceUpdate
val newRate = (latestRate - proportional * error -
integral * historicalError -
derivative * dError).max(minRate)
Кто-нибудь знает?