Я хочу записать три отдельных вывода в один вычисляемый набор данных, для этого мне нужно кэшировать / сохранить мой первый набор данных, иначе он будет собирать первый набор данных три раза, что увеличит мое время расчета.
Например
FirstDataset // Get data from kafka;
SecondDataset = FirstDataSet.mapPartitions(Some Calculations);
ThirdDataset = SecondDataset.mapPartitions(Some Calculations);
Теперь я хочу отфильтровать свой ThirdDataset и вывести отфильтрованные наборы данных для трех различных условий с разными логами c.
ThirdDataset.filter(**Condition1**).writeStream().foreach(**SOMECALCULATIONS1**).outputMode(OutputMode.Append()).trigger(Trigger.ProcessingTime(600000)).start();
ThirdDataset.filter(**Condition2**).writeStream().foreach(**SOMECALCULATIONS2**).outputMode(OutputMode.Append()).trigger(Trigger.ProcessingTime(600000)).start();
ThirdDataset.filter(**Condition3**).writeStream().foreach(**SOMECALCULATIONS3**).outputMode(OutputMode.Append()).trigger(Trigger.ProcessingTime(600000)).start();
Теперь для каждого Writestream вычисляется ThirdDataset , Если я кеширую ThirdDataset, то он не будет рассчитываться трижды.
Но когда я делаю ThirdDataset.cache()
, он выдает мне следующую ошибку,
Исключение в потоке "main" org. apache .spark. sql .AnalysisException: Запросы с потоковыми источниками должны выполняться с writeStream.start () ;;
Может кто-нибудь предложить мне.