Взрыв Кассандры UDT с плоской картой в Spark 2.x (Scala) - PullRequest
0 голосов
/ 25 апреля 2018

У меня есть данные в Кассандре (3.11.2), которые также являются моими df:

Данные в Кассандре:

id | some_data  
-- | ---------  
1  | [{s1:"str11", s2:"str12"},{s1:"str13", s2:"str14"}]
2  | [{s1:"str21", s2:"str22"},{s1:"str23", s2:"str24"}]
3  | [{s1:"str31", s2:"str32"},{s1:"str33", s2:"str44"}]

подробности df:

 df.printSchema() 
    //|  |-- id: integer (nullable = true)
    //|  |-- some_data: array (nullable = true)
    //|  |    |-- element: struct (containsNull = true)
    //|  |    |    |-- s1: string (nullable = true)
    //|  |    |    |-- s2: string (nullable = true)

Здесь схема Кассандры определяется как:

id: String
some_data: список замороженных test_udt, созданный как -> CREATE TYPE test.test_udt ( s1 текст, текст s2 );

Я использую Spark-Cassandra-Connector 2.0 для извлечения данных из Cassandra для обработки на Spark 2.2.1.

Требуемый вывод

Вывод в разобранном виде df

id | some_data                                          | s1    | s2  
-- | ---------------------------------------------------| ----- | ---- 
1  | [{s1:"str11", s2:"str12"},{s1:"str13", s2:"str14"}]| str11 | str12
1  | [{s1:"str11", s2:"str12"},{s1:"str13", s2:"str14"}]| str13 | str14 
2  | [{s1:"str21", s2:"str22"},{s1:"str23", s2:"str24"}]| str21 | str22
2  | [{s1:"str21", s2:"str22"},{s1:"str23", s2:"str24"}]| str23 | str24
3  | [{s1:"str31", s2:"str32"},{s1:"str33", s2:"str44"}]| str31 | str32
3  | [{s1:"str31", s2:"str32"},{s1:"str33", s2:"str44"}]| str33 | str34

Мой подход в прошлом

Я использовал спарк-кассандра-разъем 1.6 и Spark 1.6, и у меня было аккуратное решение вышеуказанной проблемы:

import org.apache.spark.sql.functions._    
case class my_data(s1 : String, s2 : String)

val flatData = df.explode(df("some_data")){
            case Row(x : Seq[Row]) =>
                x.map(x =>
                    my_data(
                        x.apply(0).asInstanceOf[String], 
                        x.apply(1).asInstanceOf[String]
                    ))
                  }
flatData.show()

После обновления до 2.x я получаю сообщение об ошибке при использовании функции explode. В искровом документе говорится, что explode устарело. flatMap предлагается в качестве альтернативы explode.

Вопросы:

  1. Как мне разобрать Dataframe в Scala, чтобы получить те же результаты, что и раньше?
  2. Как мне перевести мой старый код, используя flatmap?

1 Ответ

0 голосов
/ 25 апреля 2018

Вы можете использовать функцию explode , которая также предлагается в качестве альтернативы методу explode.getItem используется для получения поля из struct по его имени.

df.withColumn("exploded" , explode($"some_data"))
  .withColumn("s1" , $"exploded".getItem("s1"))
  .withColumn("s2" , $"exploded".getItem("s2"))
  .drop("exploded")
  .show(false)

//+---+------------------------------+-----+-----+
//|id |some_data                     |s1   |s2   |
//+---+------------------------------+-----+-----+
//|1  |[[str11,str12], [str13,str14]]|str11|str12|
//|1  |[[str11,str12], [str13,str14]]|str13|str14|
//|2  |[[str21,str22], [str23,str24]]|str21|str22|
//|2  |[[str21,str22], [str23,str24]]|str23|str24|
//|3  |[[str31,str32], [str33,str44]]|str31|str32|
//|3  |[[str31,str32], [str33,str44]]|str33|str44|
//+---+------------------------------+-----+-----+
...