У меня есть 2 потоковых запроса, как показано ниже
val streamingQuery = injectableDependencies.writeStreamDfToDeltaFile(validDF, dataPath, checkPointPath, configuration.pollingTimerSeconds, queryName, configuration.outputMode, configuration.partitionBy)
//streamingQuery for Invalid tables
val streamingQueryInvalid = injectableDependencies.writeStreamDfToDeltaFile(inValidDF, dataPathInvalid, checkPointPath, configuration.pollingTimerSeconds, queryNameInvalid, configuration.outputMode, configuration.partitionBy)
//creating valid tables
injectableDependencies.createTableOverDeltaFile(configuration.tableName, configuration.databaseName, dataPath, streamingQuery)
//creating invalid tables
injectableDependencies.createTableOverDeltaFile("Invalid"+configuration.tableName, configuration.databaseName, dataPath, streamingQueryInvalid)
ниже, я пытаюсь записать потоки
df
.writeStream
.format("DELTA")
.option("path", dataPath)
.option("checkpointLocation", checkPointPath)
.partitionBy(partitionBy.getOrElse(List[String]()): _*)
.outputMode(outputMode)
.trigger(Trigger.ProcessingTime(pollingTimerSeconds.seconds))
.queryName(queryName)
.start()
я получаю ошибку ниже
ERROR Uncaught throwable from user code: java.lang.IllegalStateException: Cannot start query with id 74e6b948-bc55-419c-af42-34ef7ea015ba as another query with same id is already active. Perhaps you are attempting to restart a query from the checkpoint that is already active.
Можеткто-нибудь, пожалуйста, предложите, что идет не так