Разбейте строку, используя разделитель, и используйте ее в операторе in - PullRequest
0 голосов
/ 28 января 2019

У нас есть две таблицы, первая из которых содержит код для каждой задачи в каждом запуске.2-я таблица содержит коды всех задач в каждом прогоне.Как запустить спарк SQL, разделив столбец во 2-й таблице на основе разделителя, и использовать его в операторе IN и IN в первой таблице

Таблицы выглядят следующим образом

Таблица f1

+-------+-----+--+
| runid | tid |  |
+-------+-----+--+
| 1a    | cb4 |  |
| 1a    | hb5 |  |
| 1a    | hb6 |  |
| 1b    | gh6 |  |
| 1b    | gh7 |  |
| 1c    | kl9 |  |
+-------+-----+--+

Таблица f2

+-------+-------------+
| runid |     tid     |
+-------+-------------+
| 1a    | cb4,hb5,hb6 |
| 1b    | gh6,gh7,gh8 |
+-------+-------------+

Я пробовал split, но, похоже, он не работает, и regexp_extract не помогает

select e.* from f1 e inner join
f2 a 
on e.runid=a.runid where e.runid in ('1a',
'1b') and e.tid in (select split(a.tid, '[,]') from f2)

Желаемый результат будет

+-------+-----+
| runid | tid |
+-------+-----+
| 1a    | cb4 |
| 1a    | hb5 |
| 1a    | hb6 |
| 1b    | gh6 |
| 1b    | gh7 |
+-------+-----+

Я новичок в том, что касается искры sql.Любая помощь будет очень признателен

Ответы [ 3 ]

0 голосов
/ 28 января 2019

Вот еще одна версия:

val df3 = df2.flatMap {x => x.getString(1).split(",")
             .map(y => (x.getString(0),y))}.toDF("runid","tid2")
df3.show()

+-----+----+
|runid|tid2|
+-----+----+
|   1a| cb4|
|   1a| hb5|
|   1a| hb6|
|   1b| gh6|
|   1b| gh7|
|   1b| gh8|
+-----+----+

Затем соедините df1 и df3

df1.join(df3, "runid").select($"runid", $"tid").distinct.show(false)

+-----+---+
|runid|tid|
+-----+---+
|1a   |hb5|
|1b   |gh7|
|1b   |gh6|
|1a   |hb6|
|1a   |cb4|
+-----+---+ 
0 голосов
/ 29 января 2019

Загрузка данных в виде плоского файла с разделителями каналов

from pyspark.sql.functions import *
from pyspark.sql.types import *
schema=StructType([StructField("runid",StringType()),StructField("tid",StringType())])
f1=spark.read.format("csv").schema(schema).option("header","true").option("delimiter","|").load("c:/tmp/f1.csv")
f2=spark.read.format("csv").schema(schema).option("header","true").option("delimiter","|").load("c:/tmp/f2.csv")

Разнесение с использованием запятой в качестве разделителя и переименование разнесенного столбца в tid

f2_alter=(f2.withColumn("tid_explode",explode(split(f2.tid,"[,]")))).select("runid",col("tid_explode").alias("tid"))

выполнить объединение по runid и tid

df2=f1.join(f2_alter,["runid","tid"]).show()


+-----+---+
|runid|tid|
+-----+---+
|   1a|cb4|
|   1a|hb5|
|   1a|hb6|
|   1b|gh6|
|   1b|gh7|
+-----+---+
0 голосов
/ 28 января 2019

Используйте lateral view с explode, чтобы получить один тид на строку, а затем используйте его для join.

with exploded_f2 as
(select runid,tid,expl_tid
 from f2
 lateral view explode(split(tid,',')) tbl as expl_tid
) 
select f1.*
from f1
join exploded_f2 f2 on f1.tid = f2.expl_tid
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...