Регистрация промежуточных результатов в структурированном потоковом запросе - PullRequest
0 голосов
/ 21 января 2019

Я пишу запрос структурированной потоковой передачи, который включает в себя фильтр, а затем устанавливаю водяной знак.Как я могу записать отфильтрованные строки?

Это кажется невозможным, поскольку структурированная потоковая передача позволяет оценивать только один запрос.Очевидно, что .collect и регистрация строк не работают, потому что потоковый набор данных оценивается только с .writeStream, и в запросе может быть только один такой вызов:

def process: DataFrame = {
    // df is a streaming dataset
    val toDrop = df.filter(condition)
    val toKeep = df.filter(!condition)
    // doesn't work on streaming datasets
    log(s"Lines dropped: ${toDrop.collect.mkString(", ")}")
    toKeep.withWatermark(...)
}

Решение можетчтобы переместить фильтр в foreachBatch, но в моем случае мне нужно отфильтровать перед установкой водяного знака, так что это не вариант.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...