Стандартизация потоковой передачи с использованием структурированной потоковой передачи Spark - PullRequest
0 голосов
/ 27 сентября 2018

Относительно просто использовать Spark Structured Streaming API для выполнения groupBys и агрегации потоковых данных.

Например, у меня есть кадр потоковых данных, df данных телеметрии IOT.Я группирую его по systemId и systemState и выполняю агрегирование, чтобы ответить на вопросы типа "Каково среднее значение и отклонение измерения x для системы y в состоянии z?"Этот ответ снова приходит в виде фрейма потоковых данных - назовите его usualDF.

. Я хотел бы рассмотреть следующее: «Я вижу, что система y находится в состоянии z и что измерение x имеет значение v. Это высокий или низкий ? "

Чтобы ответить на этот вопрос, я хотел бы использовать usualDF до стандартизировать df.Подобное желание было высказано и признано «невозможным» в этом посте .Уже самостоятельно реализовав потоковую нормализацию в Python с использованием Pandas, я знаю, что возможно - в Spark для него пока нет готовой функции.

Хорошим первым шагом будет объединение двух фреймов данных.Более конкретно, нам нужно взять левое внешнее объединение df и usualDF вдоль столбцов systemId и systemState.API структурированной потоковой передачи поддерживает левые внешние объединения потоковых фреймов данных, но требует использования водяных знаков.Я получаю следующее сообщение об ошибке:

org.apache.spark.sql.AnalysisException: Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark;;

Изменение выходных режимов:* нет, и я не вижу четкого способа снабдить его одним.

Есть мысли или предложения?

1 Ответ

0 голосов
/ 01 октября 2018

В структурированном руководстве по потоковой передаче говорится:

Начиная с Spark 2.3, вы не можете использовать другие операции, не относящиеся к карте, перед объединениями.Вот> несколько примеров того, что нельзя использовать.

Невозможно использовать потоковые агрегаты перед объединениями.

Невозможно использовать mapGroupsWithState и flatMapGroupsWithState в режиме обновления перед объединениями. "

Так что мой "хороший первый шаг" - это то, что не поддерживается. Я попытаюсь использовать mapGroupWithState для отслеживания средних значений и стандартного отклонения в качестве обновления здесь с кодом, если он работает.

...