Я не могу приспособить модель FP-Growth к искре - PullRequest
0 голосов
/ 20 февраля 2019

Пожалуйста, вы можете мне помочь?У меня есть набор данных из 80 файлов CSV и кластер из одного мастера и 4 подчиненных.Я хочу прочитать файлы CSV в кадре данных и распараллелить его на четырех ведомых устройствах.После этого я хочу отфильтровать фрейм данных с помощью группы.В моих искровых запросах результат содержит столбцы «code_ccam» и «dossier», сгруппированные по («code_ccam», «dossier»).Я хочу использовать алгоритм FP-Growth для обнаружения последовательностей «code_ccam», которые повторяются «папкой».Но когда я использую команду FPGrowth.fit (), у меня появляется следующая ошибка:

"error: type mismatch;
found : org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]
required: org.apache.spark.sql.Dataset[_]"

Вот мои команды зажигания:

val df = spark.read.option("header", "true").csv("file:///home/ia/Projet-Spark-ace/Donnees/Fichiers CSV/*.csv")
import org.apache.spark.sql.functions.{concat, lit}
val df2 = df.withColumn("dossier", concat(col("num_immatriculation"), lit(""), col("date_acte"), lit(""), col("rang_naissance"), lit(""), col("date_naissance")))
val df3 = df2.drop("num_immatriculation").drop("date_acte").drop("rang_naissance").drop("date_naissance")
val df4 = df3.select("dossier","code_ccam").groupBy("dossier","code_ccam").count()
val transactions = df4.agg(collect_list("code_ccam").alias("codes_ccam")).rdd.map(x => x)
import org.apache.spark.ml.fpm.FPGrowth
val fpgrowth = new FPGrowth().setItemsCol("code_ccam").setMinSupport(0.5).setMinConfidence(0.6)
val model = fpgrowth.fit(transactions)

1 Ответ

0 голосов
/ 20 февраля 2019

Спасибо тебе большое.Это сработало.Я заменил collect_list на collect_set.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...