Функции
Except
и ExceptAll
для apache набора данных spark дают пустой фрейм данных во время потоковой передачи.
Я использую два набора данных, оба являются пакетными, левый имеет несколько строк, которые не являются присутствует в правом-одном. После запуска он дает правильный вывод, то есть строки, которые находятся в левой, но не в правой.
Теперь я повторяю то же самое в потоковой передаче, левая потоковая, в то время как правая пакетная источник, в этом сценарии я получаю пустой фрейм данных, на котором dataframe.writeStream
выдает ошибку None.get исключения.
package exceptPOC
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._
object exceptPOC {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf()
.setAppName("pocExcept")
.setMaster("local")
.set("spark.driver.host", "localhost")
val sparkSession =
SparkSession.builder().config(sparkConf).getOrCreate()
val schema = new StructType()
.add("sepal_length", DoubleType, true)
.add("sepal_width", DoubleType, true)
.add("petal_length", DoubleType, true)
.add("petal_width", DoubleType, true)
.add("species", StringType, true)
var df1 = sparkSession.readStream
.option("header", "true")
.option("inferSchema", "true")
.option("treatEmptyValuesAsNulls", "false")
.option("delimiter", ",")
.schema(schema)
.csv(
"some/path/exceptPOC/streamIris" //contains iris1
)
var df2 = sparkSession.read
.option("header", "true")
.option("inferSchema", "true")
.option("treatEmptyValuesAsNulls", "false")
.option("delimiter", ",")
.schema(schema)
.csv(
"/some/path/exceptPOC/iris2.csv"
)
val exceptDF = df1.except(df2)
exceptDF.writeStream
.format("console")
.start()
.awaitTermination()
}
}