У меня есть много CSV spark.readStream в разных местах, я должен проверить все их с помощью scala, я указал запрос для каждого потока, но когда я запускаю задание, я получил это сообщение
java.lang.IllegalArgumentException: не удается запустить запрос с именем «query1», так как запрос с таким именем уже активен
Я решил свою проблему, создав множество потоковых запросов, таких как:
val spark = SparkSession
.builder
.appName("test")
.config("spark.local", "local[*]")
.getOrCreate()
spark.sparkContext.setCheckpointDir(path_checkpoint)
val event1 = spark
.readStream //
.schema(schema_a)
.option("header", "true")
.option("sep", ",")
.csv(path_a)
val query = event1.writeStream
.outputMode("append")
.format("console")
.start()
spark.streams.awaitAnyTermination()