Выберите конкретные строки в кадре данных Spark для каждой группы - PullRequest
0 голосов
/ 29 января 2019

У меня есть такой фрейм данных:

+-----------------+------------------+-----------+--------+---+
| conversation_id |   message_body   | timestamp | sender |   |
+-----------------+------------------+-----------+--------+---+
| A               | hi               | 9:00      | John   |   |
| A               | how are you?     | 10:00     | John   |   |
| A               | can we meet?     | 10:05     | John   | * |
| A               | not bad          | 10:30     | Steven | * |
| A               | great            | 10:40     | John   |   |
| A               | yeah, let's meet | 10:35     | Steven |   |
| B               | Hi               | 12:00     | Anna   | * |
| B               | Hello            | 12:05     | Ken    | * |
+-----------------+------------------+-----------+--------+---+

Для каждого разговора я хотел бы иметь последнее сообщение в первом блоке первого отправителя и первое сообщение второго отправителя.Я пометил их звездочкой.

Одна идея, которая у меня возникла, заключается в том, чтобы назначить 0 для первого пользователя и 1 для второго пользователя.

В идеале я хотел бы иметь что-то подобное:

+-----------------+---------+------------+--------------+---------+------------+----------+
| conversation_id | sender1 | timestamp1 |   message1   | sender2 | timestamp2 | message2 |
+-----------------+---------+------------+--------------+---------+------------+----------+
| A               | John    | 10:05      | can we meet? | Steven  | 10:30      | not bad  |
| B               | Anna    | 12:00      | Hi           | Ken     | 12:05      | Hello    |
+-----------------+---------+------------+--------------+---------+------------+----------+

Как я мог сделать это в Spark?

1 Ответ

0 голосов
/ 30 января 2019

Возникли интересные проблемы.

  • Изменено с 10:35 до 10: 45
  • Использовался формат 0, например 09:00 вместо 9: 00
  • Вывам нужно будет соответственно использовать ваши собственные типы данных, это просто демонстрирует необходимый подход
  • Готово в DataBricks Notebook

    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions._ 
    import spark.implicits._
    
    val df = 
         Seq(("A", "hi", "09:00", "John"), ("A", "how are you?", "10:00", "John"), 
             ("A", "can we meet?", "10:05", "John"), ("A", "not bad", "10:30", "Steven"), 
             ("A", "great", "10:40", "John"), ("A", "yeah, let's meet", "10:45", "Steven"),
             ("B", "Hi", "12:00", "Anna"), ("B", "Hello", "12:05", "Ken")
            ).toDF("conversation_id", "message_body", "timestampX", "sender")
    
    // Get rank, 1 is who were initiates conversation, the other values can be used to infer relationships
    // Note no @Transient required here with Window 
    val df2 = df.withColumn("rankC", row_number().over(Window.partitionBy($"conversation_id").orderBy($"timestampX".asc)))
    
    // A value <> 1 is the first message of second Sender.
    // The 1 value of this is the last message of first "block"
    val df3 = df2.select('conversation_id as "df3_conversation_id", 'sender as "df3_sender", 'rankC as "df3_rank")
    // To avoid pipelined renaming issues that occur
    val df3a = df3.groupBy("df3_conversation_id", "df3_sender").agg(min("df3_rank") as "rankC2").filter("rankC2 != 1")
    
    //JOIN the values with some smarts. Some odd errors in Spark thru pipe-lining gotten. Need to drop pipelined row(), ranking etc.
    val df4 = df3a.join(df2, (df3a("df3_conversation_id") === df2("conversation_id")) && (df3a("rankC2") === df2("rankC") + 1)).drop("rankC").drop("rankC2")   
    val df4a = df3a.join(df2, (df3a("df3_conversation_id") === df2("conversation_id")) && (df3a("rankC2") === df2("rankC"))).drop("rankC").drop("rankC2")  
    
    // The get other missing data, could have all been combined but done in steps for simplicity. Just a simple final JOIN and you ahve the answer.
    val df5 = df4.join(df4a, (df4("df3_conversation_id") === df4a("df3_conversation_id")))  
    df5.show(false)
    

возвращает:

Вывод не будет полностью форматировать здесь, запустите его в REPL, чтобы увидеть заголовки.

 |B                  |Ken       |B              |Hi          |12:00     |Anna  |B                  |Ken       |B              |Hello       |12:05     |Ken   |
 |A                  |Steven    |A              |can we meet?|10:05     |John  |A                  |Steven    |A              |not bad     |10:30     |Steven|

Вы можете дополнительно манипулировать данными, тяжелый подъем уже сделан.У Catalyst Optimizer есть некоторые проблемы с компиляцией и т. Д., Поэтому я так обошелся.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...