Как передать dataframe в оператор ISIN в spark dataframe - PullRequest
0 голосов
/ 02 октября 2018

Я хочу передать фрейм данных, у которого есть набор значений, в новый запрос, но он не выполнен.

1) Здесь я выбираю конкретный столбец, чтобы в следующем запросе я мог передать его под ISIN

scala> val managerIdDf=finalEmployeesDf.filter($"manager_id"!==0).select($"manager_id").distinct
managerIdDf: org.apache.spark.sql.DataFrame = [manager_id: bigint]

2) Мои образцы данных:

 scala> managerIdDf.show
    +----------+                                                                    
    |manager_id|
    +----------+
    |     67832|
    |     65646|
    |      5646|
    |     67858|
    |     69062|
    |     68319|
    |     66928|
    +----------+

3)Когда я выполняю окончательный запрос, он терпит неудачу:

scala> finalEmployeesDf.filter($"emp_id".isin(managerIdDf)).select("*").show
java.lang.RuntimeException: Unsupported literal type class org.apache.spark.sql.DataFrame [manager_id: bigint]  

Я также пытался преобразовать в List и Seq, но он выдает только ошибку.Как показано ниже, когда я пытаюсь преобразовать в Seq и повторно выполнить запрос, он выдает ошибку:

scala> val seqDf=managerIdDf.collect.toSeq
seqDf: Seq[org.apache.spark.sql.Row] = WrappedArray([67832], [65646], [5646], [67858], [69062], [68319], [66928])

scala> finalEmployeesDf.filter($"emp_id".isin(seqDf)).select("*").show
java.lang.RuntimeException: Unsupported literal type class scala.collection.mutable.WrappedArray$ofRef WrappedArray([67832], [65646], [5646], [67858], [69062], [68319], [66928])

Я также сослался на этот пост, но тщетно.Этот тип запроса, я пытаюсь это для решения подзапросов в спрей фрейма данных.Кто-нибудь здесь, пожалуйста?

Ответы [ 2 ]

0 голосов
/ 02 октября 2018

Альтернативный подход, использующий кадры данных и временные представления и свободный формат SQL SPARK SQL - не беспокойтесь о логике, это просто соглашение и альтернатива вашему первоначальному подходу - этого должно быть достаточно:

val df2 = Seq(
  ("Peter", "Doe", Seq(("New York", "A000000"), ("Warsaw", null))),
  ("Bob", "Smith", Seq(("Berlin", null))),
  ("John", "Jones", Seq(("Paris", null)))
).toDF("firstname", "lastname", "cities")

df2.createOrReplaceTempView("persons")

val res = spark.sql("""select * 
                         from persons 
                        where firstname
                       not in (select firstname
                                 from persons
                                where lastname <> 'Doe')""")

res.show

или

val list = List("Bob", "Daisy", "Peter")

val res2 = spark.sql("select firstname, lastname from persons")
                .filter($"firstname".isin(list:_*))

res2.show

или

val query = s"select * from persons where firstname in (${list.map ( x => "'" + x + "'").mkString(",") })"
val res3 = spark.sql(query)
res3.show

или

df2.filter($"firstname".isin(list: _*)).show

или

val list2 = df2.select($"firstname").rdd.map(r => r(0).asInstanceOf[String]).collect.toList
df2.filter($"firstname".isin(list2: _*)).show 

В вашем случае конкретно:

val seqDf=managerIdDf.rdd.map(r => r(0).asInstanceOf[Long]).collect.toList 2) 
finalEmployeesDf.filter($"emp_id".isin(seqDf: _)).select("").show
0 голосов
/ 02 октября 2018

Да, вы не можете передать DataFrame в isin.isin требует некоторых значений, по которым он будет фильтроваться.

Если вам нужен пример, вы можете проверить мой ответ здесь

В соответствии с обновлением вопроса вы можете сделатьследующее изменение,

.isin(seqDf) 

на

.isin(seqDf: _*)
...