Сравнение и подсчет значений в таблице - Spark с использованием Scala - PullRequest
0 голосов
/ 20 сентября 2019

Я сейчас практикуюсь на Spark, используя язык Scala.Я использую файл данных с несколькими заголовками, но мне интересно сравнить значения только двух столбцов.Есть несколько тысяч строк с повторяющимися клиентами.Это '[customerID, busID, date_travel, Firstname, Lastname]'

Меня интересует сравнение обоих customerID и busID всех клиентов в таблице и создание столбца, в котором указывается количество экземпляров дваклиенты были в одном автобусе.Каждая поездка на автобусе имеет уникальный идентификатор.

Я успешно загрузил файл данных в spark, создал фрейм данных и создал новое значение: Пример фрейма данных:

val exampleInputDF = Seq(("20185","344","01/06/2019","John","Smith"),("20186","344","01/06/2019","Jack","Sparrow"),("20187","344","01/06/2019","John","Wick")).toDF("customerID", "busID","date_travel","Firstname","Lastname")

+----------+-----+-----------+---------+--------+
|customerID|busID|date_travel|Firstname|Lastname|
+----------+-----+-----------+---------+--------+
|     20185|  344| 01/06/2019|     John|   Smith|
|     20186|  344| 01/06/2019|     Jack| Sparrow|
|     20187|  344| 01/06/2019|     John|    Wick|
+----------+-----+-----------+---------+--------+
val commonjourneys = spark.sql("SELECT customerID,busID, from data_table ORDER BY busID")

Затем я подумываю использовать функции Window для созданияновый столбец, но я не понимаю, как я смогу сравнить каждое из значений.Раньше у меня было бы .filter($"customerID" = "BUS ID") для постоянного значения, но теперь значения постоянно меняются.Я думаю, что мог бы использовать функцию IF, но не уверен, где начать реализовывать это.Кроме того, чтобы ограничить вывод, я только ищу случаи, когда есть> 2 поездки на автобусе вместе.

Мой вывод в идеале должен иметь 3 столбца - [customerID - Passenger1, customerID - Passenger 2, number_of_journeys_together]

Кто-нибудь знает, какподходите к решению подобных проблем и к любой функциональности, имеющейся в Scala / Spark, которая может помочь выполнить это самым простым способом.

1 Ответ

1 голос
/ 20 сентября 2019

Вот решение:

input:

import sparkSession.implicits._

    val ex = Seq(("20185","344","01/06/2019","John","Smith"),("20186","344","01/06/2019","Jack","Sparrow"),("20187","344","01/06/2019","John","Wick")).toDF("customerID", "busID","date_travel","Firstname","Lastname").select("customerID","busID","date_travel")

+----------+-----+-----------+
|customerID|busID|date_travel|
+----------+-----+-----------+
|     20185|  344| 01/06/2019|
|     20186|  344| 01/06/2019|
|     20187|  344| 01/06/2019|
+----------+-----+-----------+

Затем я создаю список клиентов для каждой поездки, Затем я фильтрую этот список, чтобы удалить идентификатор клиента, связанный с UDF.Затем я взрываю этот список, чтобы иметь два столбца с идентификатором клиента и customerID2

val w = Window.partitionBy("busID","date_travel")

val d = ex.withColumn("listOfCustomerIDForEachBusForEachDate", collect_set("customerID").over(w))
    .withColumn("listOfCustomerWithoutCustomerID", filterCustomerListUDF(col("listOfCustomerIDForEachBusForEachDate"), col("customerID")))
    .drop("listOfCustomerIDForEachBusForEachDate")
    .withColumn("customerID2", explode(col("listOfCustomerWithoutCustomerID")))

d.show

+----------+-----+-----------+-------------------------------+-----------+
|customerID|busID|date_travel|listOfCustomerWithoutCustomerID|customerID2|
+----------+-----+-----------+-------------------------------+-----------+
|     20185|  344| 01/06/2019|                 [20187, 20186]|      20187|
|     20185|  344| 01/06/2019|                 [20187, 20186]|      20186|
|     20186|  344| 01/06/2019|                 [20187, 20185]|      20187|
|     20186|  344| 01/06/2019|                 [20187, 20185]|      20185|
|     20187|  344| 01/06/2019|                 [20185, 20186]|      20185|
|     20187|  344| 01/06/2019|                 [20185, 20186]|      20186|
+----------+-----+-----------+-------------------------------+-----------+


  def filterCustomerList(customerList : scala.collection.mutable.WrappedArray[String], customerID : String) : scala.collection.mutable.WrappedArray[String] = {
    val outputList = customerList.filter(_ != customerID)
    outputList
  }

  val filterCustomerListUDF = udf(filterCustomerList _)

Затем вы можете сделать groupBy, чтобы получить number_of_journeys_together

d.groupBy("customerID","customerID2").count()
  .withColumnRenamed("count","number_of_journeys_together").show

+----------+-----------+---------------------------+
|customerID|customerID2|number_of_journeys_together|
+----------+-----------+---------------------------+
|     20187|      20185|                          1|
|     20185|      20187|                          1|
|     20185|      20186|                          1|
|     20186|      20185|                          1|
|     20186|      20187|                          1|
|     20187|      20186|                          1|
+----------+-----------+---------------------------+

В этом примере результат всегда1, но это нормально с этим входным фреймом данных

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...