Я пытаюсь создать поток в Spark, который получает данные от Кафки. Когда я проверяю количество записей в RDD, кажется, что это не то же самое, что веб-интерфейс.
Я выполняю функцию для всех RDD в DStream (коды генерируются в Python):
rdds = KafkaUtils.createStream(...)
rdds = rdds.repartition(1)
rdds.foreachRDD(doJob)
И у меня есть функция doJob al oop и счетчик
def doJob(time, p_rdd):
if not p_rdd.isEmpty:
batch_count = 0
...
...
rdd_collected = p_rdd.collect()
for record in rdd_collected:
...
...
batch_count = batch_count + 1
log("Count: " + str(batch_count))
Я ожидаю, что batch_count должно быть таким же, как http: // webui.adress / my_app_id / Страница потоковой передачи -> Раздел «Завершенная партия» -> Размер ввода. Но, похоже, это не так. Где я должен проверить количество записей RDD в веб-интерфейсе, я что-то упустил?
Спасибо.