Spark-RDD манипулирующие данные - PullRequest
0 голосов
/ 05 сентября 2018

У меня есть пример данных, как показано ниже:

UserId,ProductId,Category,Action
1,111,Electronics,Browse
2,112,Fashion,Click
3,113,Kids,AddtoCart
4,114,Food,Purchase
5,115,Books,Logout
6,114,Food,Click
7,113,Kids,AddtoCart
8,115,Books,Purchase
9,111,Electronics,Click
10,112,Fashion,Purchase
3,112,Fashion,Click

Мне нужно создать список пользователей, которые заинтересованы в категории «Мода» или «Электроника», но не в обеих категориях. Пользователь интересуется, выполнил ли он / она какое-либо из этих действий (Click / AddToCart / Buy), используя код spark / scala, который я делал до этого:

val rrd1 = sc.textFile("/user/harshit.kacker/datametica_logs.csv")
val rrd2 = rrd1.map( x=> {
     | val c = x.split(",")
     | (c(0).toInt , x)})

val rrd3 = rrd1.filter(x=> x.split(",")(2) == "Fashion" || x.split(",")(2) == "Electronics")
val rrd4 = rrd3.filter(x=> x.split(",")(3)== "Click" || x.split(",")(3)=="Purchase" || x.split(",")(3)=="AddtoCart")
rrd4.collect.foreach(println)

2,112,Fashion,Click
9,111,Electronics,Click
10,112,Fashion,Purchase
3,112,Fashion,Click
4,111,Electronics,Click
19,112,Fashion,Click
9,112,Fashion,Purchase
2,112,Fashion,Click
2,111,Electronics,Click
1,112,Fashion,Purchase

теперь мне нужно поработать над ", чтобы сформировать список пользователей, которые заинтересованы в категории" Мода "или в категории" Электроника ", но не в обеих категориях " этой курсивной частью и получить желаемый результат как:

10,Fashion
3,Fashion
4,Electronics
19,Fashion
1,Fashion

означает, что пользователь должен иметь Мода и электроника в качестве категории, которую следует исключить, не могли бы вы помочь достичь того же?

1 Ответ

0 голосов
/ 06 сентября 2018

Начните с анализа входного текстового файла в кортежах:

val srcPath = "/user/harshit.kacker/datametica_logs.csv"

// parse test file in to tuples:
val rdd = spark.sparkContext.textFile(srcPath)
val rows = rdd.map(line => line.split(",")).map(row => (row(0), row(1), row(2), row(3)))
val header = rows.first
// drop the header:
val logs = rows.filter(row => row != header)

Фильтр СДР по критериям интереса:

val interests = logs.filter(log =>
  List("Click", "AddtoCart", "Purchase").contains(log._4)
)

Фильтр для моды и электроники отдельно:

val fashion = interests.filter(row => row._3 == "Fashion")
val electronics = interests.filter(row => row._3 == "Electronics")

Найдите общие идентификаторы пользователя между модой и электроникой:

val fashionIds = fashion.map(_._1).distinct
val electronicsIds = electronics.map(_._1).distinct
val commonIds = fashionIds.intersection(electronicsIds).collect()

Объедините строки моды и электроники и отфильтруйте идентификаторы, общие для обоих:

val finalRdd = (fashion ++ electronics)
  .filter(log => !commonIds.contains(log._1))
  .map(log => (log._1, log._3))
  .distinct()

Редактировать: Использование DataFrame

// using dataframes:

val df = spark.read.option("header", "true").csv(srcPath)
val interestDf = df.where($"Action".isin("Click", "Purchase", "AddToCart"))
val fashionDf = interestDf.where($"Category" === "Fashion")
val electronicsDf = interestDf.where($"Category" === "Electronics")

val joinDf = electronicsDf.alias("e").join(fashionDf.alias("f"), Seq("UserId"), "outer")
  .where($"e.Category".isNull || $"f.Category".isNull)

val finalDf = joinDf.select($"UserId", when($"e.Category".isNull, $"f.Category").otherwise($"e.Category").as("Category")).distinct
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...