Я пишу запрос структурированной потоковой передачи, который включает в себя фильтр, а затем устанавливаю водяной знак.Как я могу записать отфильтрованные строки?
Это кажется невозможным, поскольку структурированная потоковая передача позволяет оценивать только один запрос.Очевидно, что .collect
и регистрация строк не работают, потому что потоковый набор данных оценивается только с .writeStream
, и в запросе может быть только один такой вызов:
def process: DataFrame = {
// df is a streaming dataset
val toDrop = df.filter(condition)
val toKeep = df.filter(!condition)
// doesn't work on streaming datasets
log(s"Lines dropped: ${toDrop.collect.mkString(", ")}")
toKeep.withWatermark(...)
}
Решение можетчтобы переместить фильтр в foreachBatch, но в моем случае мне нужно отфильтровать перед установкой водяного знака, так что это не вариант.