Как мы кешируем / сохраняем набор данных в потоковой передаче с искрой 2.4.4 - PullRequest
0 голосов
/ 17 января 2020

Я хочу записать три отдельных вывода в один вычисляемый набор данных, для этого мне нужно кэшировать / сохранить мой первый набор данных, иначе он будет собирать первый набор данных три раза, что увеличит мое время расчета.

Например

FirstDataset // Get data from kafka;

SecondDataset = FirstDataSet.mapPartitions(Some Calculations);

ThirdDataset = SecondDataset.mapPartitions(Some Calculations);

Теперь я хочу отфильтровать свой ThirdDataset и вывести отфильтрованные наборы данных для трех различных условий с разными логами c.

ThirdDataset.filter(**Condition1**).writeStream().foreach(**SOMECALCULATIONS1**).outputMode(OutputMode.Append()).trigger(Trigger.ProcessingTime(600000)).start();

ThirdDataset.filter(**Condition2**).writeStream().foreach(**SOMECALCULATIONS2**).outputMode(OutputMode.Append()).trigger(Trigger.ProcessingTime(600000)).start();

ThirdDataset.filter(**Condition3**).writeStream().foreach(**SOMECALCULATIONS3**).outputMode(OutputMode.Append()).trigger(Trigger.ProcessingTime(600000)).start();

Теперь для каждого Writestream вычисляется ThirdDataset , Если я кеширую ThirdDataset, то он не будет рассчитываться трижды.

Но когда я делаю ThirdDataset.cache(), он выдает мне следующую ошибку,

Исключение в потоке "main" org. apache .spark. sql .AnalysisException: Запросы с потоковыми источниками должны выполняться с writeStream.start () ;;

Может кто-нибудь предложить мне.

1 Ответ

0 голосов
/ 17 января 2020

Кэш не имеет смысла с набором потоковых данных.

SPARK-20865

Возможно, вам придется изменить подход.

что-то вроде

ThirdDataset.writeStream().foreach(**SOMECALCULATIONS BASED ON CONDITION**).outputMode(OutputMode.Append()).trigger(Trigger.ProcessingTime(600000)).start();

сбой кэша в потоковых наборах данных

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...