Искровой фильтр на основе максимальной записи даты - PullRequest
3 голосов
/ 09 мая 2019

Я использую Spark / Scala для обработки таблицы Hive, которая содержит данные транзакций для каждого участника.Мне нужно получить максимальную запись для каждого участника.Я выполнил эту задачу, используя приведенный ниже код, и он успешно работает, но производительность не достигается.

Мне нужно спросить, есть ли другой способ повысить производительность этого кода?Я нашел несколько способов сделать это, используя spark-sql, но я предпочитаю Spark Dataframe или Dataset.

В приведенном ниже примере будут воспроизводиться мой код и мои данные.

  val mamberData = Seq(
    Row("1234", "CX", java.sql.Timestamp.valueOf("2018-09-09 00:00:00")),
    Row("1234", "CX", java.sql.Timestamp.valueOf("2018-03-02 00:00:00")),
    Row("5678", "NY", java.sql.Timestamp.valueOf("2019-01-01 00:00:00")),
    Row("5678", "NY", java.sql.Timestamp.valueOf("2018-01-01 00:00:00")),
    Row("7088", "SF", java.sql.Timestamp.valueOf("2018-09-01 00:00:00"))
  )



  val MemberDataSchema = List(
    StructField("member_id", StringType, nullable = true),
    StructField("member_state", StringType, nullable = true),
    StructField("activation_date", TimestampType, nullable = true)
  )

  import spark.implicits._

  val memberDF =spark.createDataFrame(
    spark.sparkContext.parallelize(mamberData),
    StructType(MemberDataSchema)
  )

  val memberDfMaxDate = memberDF.groupBy('member_id).agg(max('activation_date).as("activation_date"))

  val memberDFMaxOnly = memberDF.join(memberDfMaxDate,Seq("member_id","activation_date"))

Выводниже

+---------+------------+-------------------+
|member_id|member_state|activation_date    |
+---------+------------+-------------------+
|1234     |CX          |2018-09-09 00:00:00|
|1234     |CX          |2018-03-02 00:00:00|
|5678     |NY          |2019-01-01 00:00:00|
|5678     |NY          |2018-01-01 00:00:00|
|7088     |SF          |2018-09-01 00:00:00|
+---------+------------+-------------------+

+---------+-------------------+------------+
|member_id|    activation_date|member_state|
+---------+-------------------+------------+
|     7088|2018-09-01 00:00:00|          SF|
|     1234|2018-09-09 00:00:00|          CX|
|     5678|2019-01-01 00:00:00|          NY|
+---------+-------------------+------------+

Ответы [ 3 ]

1 голос
/ 09 мая 2019

DataFrame groupBy настолько эффективен, насколько это возможно (более эффективно, чем Window-функции из-за частичной агрегации).

Но вы можете избежать объединения, используя struct в предложении агрегации:

val memberDfMaxOnly = memberDF.groupBy('member_id).agg(max(struct('activation_date, 'member_state)).as("row_selection"))
  .select(
    $"member_id",
    $"row_selection.activation_date",
    $"row_selection.member_state"
  )
1 голос
/ 09 мая 2019

Вы можете использовать множество методов, например, Ranking или Dataset. Я предпочитаю использовать reduceGroups, так как это функциональный стиль и его легко интерпретировать.

  case class MemberDetails(member_id: String, member_state: String, activation_date: FileStreamSource.Timestamp)

  val dataDS: Dataset[MemberDetails] = spark.createDataFrame(
    spark.sparkContext.parallelize(mamberData),
    StructType(MemberDataSchema)
  ).as[MemberDetails]
    .groupByKey(_.member_id)
    .reduceGroups((r1, r2) ⇒ if (r1.activation_date > r2.activation_date) r1 else r2)
    .map { case (key, row) ⇒ row }


  dataDS.show(truncate = false)
0 голосов
/ 09 мая 2019

Используйте оконные функции , чтобы назначить ранг и отфильтровать первое в каждой группе.

import org.apache.spark.sql.expressions.Window

// Partition by member_id order by activation_date
val byMemberId = Window.partitionBy($"member_id").orderBy($"activation_date" desc)

// Get the new DF applying window function
val memberDFMaxOnly = memberDF.select('*, rank().over(byMemberId) as 'rank).where($"rank" === 1).drop("rank")

// View the results
memberDFMaxOnly.show()
+---------+------------+-------------------+
|member_id|member_state|    activation_date|
+---------+------------+-------------------+
|     1234|          CX|2018-09-09 00:00:00|
|     5678|          NY|2019-01-01 00:00:00|
|     7088|          SF|2018-09-01 00:00:00|
+---------+------------+-------------------+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...