Spark - уменьшение входного файла по идентификатору пользователя - PullRequest
0 голосов
/ 17 декабря 2018

Я работаю со структурированным входным файлом, который содержит userId, seqId, eventType и страну.Мне нужно уменьшить его с помощью userId, принимая последнее непустое значение каждого поля после упорядочивания по seqId.Для данного ввода:

userId    seqId eventType country
A1600001    2   Update  JP
A1600001    3   Update  
B2301001    2   Update  CH
A1600001    1   Create  CH
C1200011    2   Update  
C1200011    1   Create  IN

Сокращенный результат должен быть:

A1600001    3   Update  JP
C1200011    2   Update  IN
B2301001    2   Update  CH

Я начал со следующего:

scala> val file = sc.textFile("/tmp/sample-events.tsv")
scala> val lines = file.map( x => (x.split("\t")(0), x) )
scala> lines.foreach(x => println(x))
(A1600001,A1600001  2   Update  JP)
(A1600001,A1600001  3   Update  )
(B2301001,B2301001  2   Update  CH)
(A1600001,A1600001  1   Create  CH)
(C1200011,C1200011  2   Update  )
(C1200011,C1200011  1   Create  IN)

Теперь я хочу reduceByKey линии (я полагаю?), но я довольно плохо знаком с предметом и не знаю, как построить функцию сокращения.Может кто-нибудь помочь?

Ответы [ 3 ]

0 голосов
/ 17 декабря 2018

Самое простое решение, которое я могу придумать с помощью ReduceByKey, здесь.

//0: userId    1: seqId  2: eventType 3: country
val inputRdd = spark.sparkContext.textFile("data/input.txt")
  .map(_.split("\\s+", 4))

//Here reduce by userId and taking the record which is having max(seqId)
// order by seqId so that if the max value missing country, can be merged that value from the immediate seqId
inputRdd
  .map(ls => (ls(0), ls))
  .sortBy(_._2(1).toInt)
  .reduceByKey {
    (acc, y) =>
      if (acc(1).toInt < y(1).toInt)
        if (y.length == 3) y :+ acc(3) else y
      else
        acc
  }.map(_._2.mkString("\t"))
  .foreach(println)

data / input.txt

A1600001    2   Update  JP
A1600001    3   Update
B2301001    2   Update  CH
A1600001    1   Create  CH
C1200011    2   Update
C1200011    1   Create  IN

Вывод:

B2301001    2   Update  CH
C1200011    2   Update  IN
A1600001    3   Update  JP
0 голосов
/ 19 декабря 2018

Использование функций spark-sql и window.

scala> val df = Seq(("A1600001",2,"Update","JP"),("A1600001",3,"Update",""),("B2301001",2,"Update","CH"),("A1600001",1,"Create","CH"),("C1200011",2,"Update",""),("C1200011",1,"Create","IN")).toDF("userId","seqId","eventType","country")
df: org.apache.spark.sql.DataFrame = [userId: string, seqId: int ... 2 more fields]

scala> df.createOrReplaceTempView("samsu")

scala> spark.sql(""" with tb1(select userId, seqId, eventType, country, lag(country) over(partition by userid order by seqid) lg1, row_number() over(partition by userid order by seqid) rw1,co
unt(*) over(partition by userid) cw1 from samsu) select userId, seqId, eventType,case when country="" then lg1 else country end country from tb1 where rw1=cw1 """).show(false)
+--------+-----+---------+-------+                                              
|userId  |seqId|eventType|country|
+--------+-----+---------+-------+
|A1600001|3    |Update   |JP     |
|C1200011|2    |Update   |IN     |
|B2301001|2    |Update   |CH     |
+--------+-----+---------+-------+


scala>
0 голосов
/ 17 декабря 2018

Один из возможных способов (при условии, что seqId никогда не бывает пустым):

  1. подготовить pair_rdd1, отфильтровав сначала все пустые значения eventType с помощью mapper, а затем применить reduceByKey к ключу= userId чтобы найти последний непустой eventType за userId.Предполагая, что функция редуктора принимает две пары [seqId, eventType] и возвращает пару [seqId, eventType], функция сокращения должна выглядеть следующим образом: (v1 v2) => ( if(v1[seqId] > v2[seqId]) then v1 else v2 )
  2. подготовить pair_rdd2, отфильтровав сначала все пустые значения country с помощью mapper, а затемприменить reduceByKey к ключу = userId, чтобы найти последние непустые country за userId.Предполагая, что функция редуктора принимает две пары [seqId, country] и возвращает пару [seqId, country], функция сокращения должна выглядеть следующим образом: (v1 v2) => ( if(v1[seqId] > v2[seqId]) then v1 else v2 )
  3. , поскольку нам также нужны самые последние seqId на userId, мы готовим pair_rdd3, применив reduceByKey к ключу = userId и функции редуктора: (seqId1 seqId2) => max(seqId1, seqId2)
  4. , теперь мы выполняем pair_rdd3.leftOuterJoin(pair_rdd1), чтобы получить [userId, seqId, eventType], затем по результату левого соединения мы выполняем .leftOuterJoin(pair_rdd2)чтобы наконец получить [userId, seqId, eventType, country] (оба соединения на ключе = userId)

Обратите внимание, что здесь мы используем left join вместо inner join, так как могут быть идентификаторы пользователя со всеми типами eventTypes иливсе страны пустые

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...