Несколько вариантов:
План A: Используйте объекты Metric и MetricReporter, чтобы предоставить данные внешней системе (системам).Это имеет недостаток, заключающийся в том, что метрики не являются контрольными точками, и, если batchIds много, вы, вероятно, в конечном итоге будете загрязнять систему метрик множеством метрик, которые не могут получить GC.
ПланированиеB: Переписать вашу RichSinkFunction как RichFlatMap (или ProcessFunction), которая испускает поток удержания кортежей (batchId, number.of.events.in.batchId).Вы можете задать этот поток с помощью batchId, а затем использовать состояние ключа в KeyedProcessFunction (например) для хранения и предоставления этого состояния через запрашиваемое состояние.Это имеет тот недостаток, что запрашиваемое состояние допускает только точечные запросы (по одному ключу за раз).
План C: в этом варианте внешние системы могут запрашивать состояние, созданное в Плане B, путем внедрения запросов в потокэто передается в KeyedBroadcastProcessFunction, которая содержит ключевые данные state.count.number.of.events.processed.for.event.batchId.Затем вы можете использовать ctx.applyToKeyedState в методе processBroadcastElement функции KeyedBroadcastProcessFunction для ответа на эти запросы.См., Например, одно из обучающих упражнений Flink .
План D: записать результаты из B (или C) в redis или эластичный поиск или в другое хранилище данных с запросами и иметьвнешние системы получают эту информацию оттуда.