Spark SQL запрос topN с потоком управления - PullRequest
0 голосов
/ 01 февраля 2019

У меня есть фрейм данных, я хотел сделать запрос, чтобы получить данные, если они соответствуют требованиям правила, иначе изменить порядок данных и получить первый.Но я не знаю, как это сделать.
DataFrame похож на это, newtable

+--------------------------+--------------+-------+-------+-------------------------+
|_id                       |relatedID     |related|u      |pro                      |
+--------------------------+--------------+-------+-------+-------------------------+
|[5c3f2dd302353b0d870a7d2f]|[196, 2447005]|196    |196    |[name,100,yyj196,0.8]    |
|[5c3f2dd302353b0d870a7d2f]|[196, 2447005]|196    |196    |[age,102,21,0.9]         |
|[5c3f2dd302353b0d870a7d2f]|[196, 2447005]|196    |196    |[favorite,102,IT,0.7]    |
|[5c3f2de802353b0d870b05e0]|[196, 2542146]|196    |196    |[name,100,yyj196,0.8]    |
|[5c3f2de802353b0d870b05e0]|[196, 2542146]|196    |196    |[age,102,21,0.9]         |
|[5c3f2de802353b0d870b05e0]|[196, 2542146]|196    |196    |[favorite,102,IT,0.7]    |
|[5c3f2dd302353b0d870a7d2f]|[196, 2447005]|2447005|2447005|[name,100,yyj2447005,0.5]|
|[5c3f2dd302353b0d870a7d2f]|[196, 2447005]|2447005|2447005|[age,101,21,0.5]         |
|[5c3f2dd302353b0d870a7d2f]|[196, 2447005]|2447005|2447005|[favorite,102,iphone,0.5]|
+--------------------------+--------------+-------+-------+-------------------------+

это было соединение из двух других данных

вот схема

root
 |-- _id: struct (nullable = true)
 |    |-- oid: string (nullable = true)
 |-- relatedID: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- related: integer (nullable = true)
 |-- u: integer (nullable = true)
 |-- pro: struct (nullable = true)
 |    |-- fieldID: string (nullable = true)
 |    |-- sourceID: string (nullable = true)
 |    |-- value: string (nullable = true)
 |    |-- weight: double (nullable = true)

вот код в Scala

//join  two  dataframe  & create tempview newtable
dfsU.join(dfsu,dfsU("related") === (dfsu("u")),"inner")
     .createTempView("newtable")

    //test  ,The data displayed above 
    val checkdata =  spark.sql("select * from newtable  where  related = 196 or  related = 2447005 or  u = 196 or  u = 2447005 ")
    checkdata.show(false)
    checkdata.printSchema()
    // group  && set  ranks 
    spark.sql("select * ,Row_Number() OVER (partition by  _id , pro.fieldID  ORDER BY pro.weight desc) ranks FROM newtable")
      .createTempView("tmpview")
    //test  , get the  data  from temview 
    spark.sql("select * from tmpview  where  related = 196 or  related = 2447005 or  u = 196 or  u = 2447005 ").show(false)

вот результат. Это выглядит очень странно.по pro.weight

+--------------------------+--------------+-------+-------+-------------------------+-----+
|_id                       |relatedID     |related|u      |pro                      |ranks|
+--------------------------+--------------+-------+-------+-------------------------+-----+
|[5c3f2dd302353b0d870a7d2f]|[196, 2447005]|2447005|2447005|[age,101,21,0.5]         |1    |
|[5c3f2dd302353b0d870a7d2f]|[196, 2447005]|2447005|2447005|[favorite,102,iphone,0.5]|1    |
|[5c3f2dd302353b0d870a7d2f]|[196, 2447005]|2447005|2447005|[name,100,yyj2447005,0.5]|1    |
+--------------------------+--------------+-------+-------+-------------------------+-----+

Q1:
не порядок получения данных, если их значение pro.weight является максимальным и группируется по _id и pro.fieldВ чем проблема с моим запросом.
Q2:
Мне также нужно получить данные в специальном sourceId с назначенными fieldID
, такими как get [age,101,21,0.5] not [age,102,21,0.9] даже его вес ниже 0,9 в этой группе.Причина sourceID == 101 является приоритетной.

if(pro.fieldID == age  && pro.sourceID == 101 ){
   //get this  data   when  the  field  is  `age`  and  the `sourceId`  fitted   get  this data 
   //[age,101,21,0.5]
   // other  field  also get the max  weight
   // group  by  pro.fieldID , sorted  by  pro.weight  and  the  top one
   //[name,100,yyj196,0.8]
   //[favorite,102,IT,0.7]

}else {
  //group  by  pro.fieldID , sorted  by  pro.weight  and  the  top one
  //both  field  also get the max  weight
  //[age,101,21,0.9]
  //[name,100,yyj196,0.8]         
  //[favorite,102,IT,0.7]
}

как это сделать.
Заранее спасибо.

РЕДАКТИРОВАТЬ
Подробнее INFO

 val w = Window.partitionBy(tmp.col("_id"),tmp.col("pro.fieldID")).orderBy(functions.desc("pro.weight"))
    tmp.where("related = 196 or  related = 2447005 or  u = 196 or  u = 2447005 ").withColumn("rn", functions.row_number().over(w)).show(false)
    println("----------------------")
    tmp.withColumn("rn", functions.row_number().over(w)).where("related = 196 or  related = 2447005 or  u = 196 or  u = 2447005 ").show(false)

Почему результат отличается? Они используют одни и те же данные, один и тот же «механизм окна»

формат данных

root
 |-- _id: struct (nullable = true)
 |    |-- oid: string (nullable = true)
 |-- relatedID: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- related: integer (nullable = true)

+--------------------------+--------------+-------+-------+-------------------------+---+
|_id                       |relatedID     |related|u      |pro                      |rn |
+--------------------------+--------------+-------+-------+-------------------------+---+
|[5c3f2de802353b0d870b05e0]|[196, 2542146]|196    |196    |[age,101,21,0.9]         |1  |
|[5c3f2de802353b0d870b05e0]|[196, 2542146]|196    |196    |[name,100,yyj196,0.8]    |1  |
|[5c3f2dd302353b0d870a7d2f]|[196, 2447005]|196    |196    |[age,101,21,0.9]         |1  |
|[5c3f2dd302353b0d870a7d2f]|[196, 2447005]|2447005|2447005|[age,101,21,0.5]         |2  |
|[5c3f2de802353b0d870b05e0]|[196, 2542146]|196    |196    |[favorite,102,IT,0.7]    |1  |
|[5c3f2dd302353b0d870a7d2f]|[196, 2447005]|196    |196    |[favorite,102,IT,0.7]    |1  |
|[5c3f2dd302353b0d870a7d2f]|[196, 2447005]|2447005|2447005|[favorite,102,iphone,0.5]|2  |
|[5c3f2dd302353b0d870a7d2f]|[196, 2447005]|196    |196    |[name,100,yyj196,0.8]    |1  |
|[5c3f2dd302353b0d870a7d2f]|[196, 2447005]|2447005|2447005|[name,100,yyj2447005,0.5]|2  |
+--------------------------+--------------+-------+-------+-------------------------+---+

----------------------
19/02/01 18:31:11 WARN BaseSessionStateBuilder$$anon$2: Max iterations (100) reached for batch Operator Optimizations
+--------------------------+--------------+-------+-------+-------------------------+---+
|_id                       |relatedID     |related|u      |pro                      |rn |
+--------------------------+--------------+-------+-------+-------------------------+---+
|[5c3f2dd302353b0d870a7d2f]|[196, 2447005]|2447005|2447005|[age,101,21,0.5]         |1  |
|[5c3f2dd302353b0d870a7d2f]|[196, 2447005]|2447005|2447005|[favorite,102,iphone,0.5]|1  |
|[5c3f2dd302353b0d870a7d2f]|[196, 2447005]|2447005|2447005|[name,100,yyj2447005,0.5]|1  |
+--------------------------+--------------+-------+-------+-------------------------+---+

1 Ответ

0 голосов
/ 01 февраля 2019

Q1:

Нет гарантии, что выбор строк из упорядоченного представления без order by приведет к упорядоченной таблице.У базы данных SQL есть свобода выбора наиболее подходящего метода (с точки зрения производительности).

В целом, я бы не рекомендовал упорядочивать представление по двум причинам: первая - причина вашей ошибки - вам нужноупорядочивать вещи дважды, поэтому нет смысла, во-вторых, упорядоченная отфильтрованная таблица выполняется быстрее, так как сортируется меньше строк.

Q2:

Если я понимаюправильно, вы хотите обменять некоторые из ваших строк / столбцов.Вы можете заглянуть в withColumn() или просто map() с оператором if внутри, чтобы преобразовать те, которые удовлетворяют некоторому условию.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...