Вы действительно должны использовать RichParallelSourceFunction .Если вы хотите, чтобы записи из разных экземпляров источника отличались друг от друга, вы можете получить индекс каждого экземпляра из RuntimeContext , который доступен через метод getRuntimeContext () в интерфейсе RichFunction.
Кроме того, у Flink есть встроенный репортер statsd metrics , который вы должны использовать вместо того, чтобы использовать свой собственный.Более того, numRecordsIn, numRecordsOut, numRecordsInPerSecond и numRecordsOutPerSecond уже вычисляются для вас , поэтому нет необходимости создавать эту инструментарий самостоятельно.Вы также можете получить доступ к этим метрикам через веб-интерфейс Flink или REST API.
Что касается причин, по которым вы, возможно, испытываете плохую масштабируемость с потребителем Kafka, это может быть вызвано многими причинами.Если вы используете обработку времени события, то незанятые разделы могут удерживать вещи (см. https://issues.apache.org/jira/browse/FLINK-5479). Если поток имеет ключ, то может возникнуть проблема с перекосом данных. Если вы подключаетесь к внешней базе данных или службе, тогдаэто может быть узким местом. Если контрольная точка настроена неправильно, это может вызвать это. Или недостаточную пропускную способность сети.
Я бы начал отлаживать это, посмотрев на некоторые ключевые метрики в веб-интерфейсе Flink. Хорошо ли сбалансирована нагрузка?через подзадачи, или это перекос? Вы могли бы включить отслеживание задержки и посмотреть, не ведет ли себя один из разделов kafka (проверяя задержку в приемнике (ах), которая будет сообщаться для каждого раздела)И вы могли бы искать обратное давление.