представьте, у меня есть один поток из структурированного потока.
val sourceDF = sparkSession.readStream
.format("kafka")
.option("kafka.bootstrap.servers", revBrokers)
.option("subscribe", topic)
...
as[(String, String)]
и я хочу посчитать количество строк, когда первое поле не пустое Строка в то же время записать это в hdfs.
псевдокод:
sourceDF.filter(_.1 != "").count.writeStream.option("checkpointPath", <cp-1>).format("console").start()
sourceDF.writeStream.option("checkpointPath", <cp-2>).start(<descDir>)
но теперь эти 2 потока не синхронизированы, потому что они работают независимо.
однако, я действительно хочу подсчитать непустые строки и распечатать их одновременно, сохранив поток в формате hdf так,Я знаю, сколько строк я сохранил с непустым первым полем.
У кого-нибудь есть идеи, как этого добиться?Я знаю, что потоковая передача искры не предназначена для этого (обработка побочных эффектов), но иногда многократные действия были бы очень полезны (потоковая передача искры может сделать это, так как она использует микропартии).