как применять объединения в spark scala, когда у нас есть несколько значений в столбце объединения - PullRequest
0 голосов
/ 03 сентября 2018

У меня есть данные в двух текстовых файлах как

file 1:(patient id,diagnosis code)
+----------+-------+
|patient_id|diag_cd|
+----------+-------+
|         1|  y,t,k|
|         2|  u,t,p|
|         3|  u,t,k|
|         4|  f,o,k|
|         5|  e,o,u|
+----------+-------+

file2(diagnosis code,diagnosis description) Time T1
+-------+---------+
|diag_cd|diag_desc|
+-------+---------+
|      y|      yen|
|      t|      ten|
|      k|      ken|
|      u|      uen|
|      p|      pen|
|      f|      fen|
|      o|      oen|
|      e|      een|
+-------+---------+

данные в файле 2 не являются фиксированными и продолжают изменяться, значит, в любой заданный момент времени диагностический код y может иметь описание диагноза как йена, а в другой момент времени он может иметь описание диагноза как десять. Например ниже

file2 at Time T2
+-------+---------+
|diag_cd|diag_desc|
+-------+---------+
|      y|      ten|
|      t|      yen|
|      k|      uen|
|      u|      oen|
|      p|      ken|
|      f|      pen|
|      o|      een|
|      e|      fen|
+-------+---------+

Я должен прочитать данные этих двух файлов в спарке и требовать только тех пациентов, которым поставлен диагноз uen. это можно сделать с помощью spark sql или scala.

Я пытался прочитать файл1 в spark-shell. Два столбца в file1 разделены каналом.

scala> val tes1 = sc.textFile("file1.txt").map(x => x.split('|')).filter(y => y(1).contains("u")).collect
tes1: Array[Array[String]] = Array(Array(2, u,t,p), Array(3, u,t,k), Array(5, e,o,u))

Но так как код диагностики, относящийся к описанию диагностики, не является постоянным в файле2, поэтому придется использовать условие соединения. Но я не знаю, как применять объединения, когда столбец diag_cd в file1 имеет несколько значений.

любая помощь будет оценена.

Ответы [ 2 ]

0 голосов
/ 03 сентября 2018

Пожалуйста, найдите ответ ниже

// Считать файл1 в кадр данных

val file1DF = spark.read.format("csv").option("delimiter","|")
.option("header",true)
.load("file1PATH")

// Считать файл2 в кадр данных

val file2DF = spark.read.format("csv").option("delimiter","|")
.option("header",true)
.load("file2path")

// получаем кадр данных идентификатора пациента для diag_desc как uen

file1DF.join(file2DF,file1DF.col("diag_cd").contains(file2DF.col("diag_cd")),"inner")
.filter(file2DF.col("diag_desc") === "uen")
.select("patient_id").show

enter image description here

0 голосов
/ 03 сентября 2018
  1. Преобразовать таблицу t1 из формата1 в формат2, используя метод разнесения.

    формат 1:

    file 1:(patient id,diagnosis code)
    +----------+-------+
    |patient_id|diag_cd|
    +----------+-------+
    |         1|  y,t,k|
    |         2|  u,t,p|
    +----------+-------+
    

    до

    file 1:(patient id,diagnosis code)
    +----------+-------+
    |patient_id|diag_cd|
    +----------+-------+
    |         1|  y    |
    |         1|  t    |
    |         1|  k    |
    |         2|  u    |
    |         2|  t    |
    |         2|  p    |
    +----------+-------+
    

    Код:

    scala> val data = Seq("1|y,t,k", "2|u,t,p")
    data: Seq[String] = List(1|y,t,k, 2|u,t,p)
    
    scala> val df1 = sc.parallelize(data).toDF("c1").withColumn("patient_id", split(col("c1"), "\\|").getItem(0)).withColumn("col2", split(col("c1"), "\\|").getItem(1)).select("patient_id", "col2").withColumn("diag_cd", explode(split($"col2", "\\,"))).select("patient_id", "diag_cd")
    df1: org.apache.spark.sql.DataFrame = [patient_id: string, diag_cd: string]
    
    scala> df1.collect()
    res4: Array[org.apache.spark.sql.Row] = Array([1,y], [1,t], [1,k], [2,u], [2,t], [2,p])
    

    Я создал фиктивные данные здесь для иллюстрации. Обратите внимание, как мы взрываем конкретный столбец выше, используя

    scala> val df1 = sc.parallelize(data).toDF("c1").
     | withColumn("patient_id", split(col("c1"), "\\|").getItem(0)).
     | withColumn("col2", split(col("c1"), "\\|").getItem(1)).
     | select("patient_id", "col2").
     | withColumn("diag_cd", explode(split($"col2", "\\,"))).
     | select("patient_id", "diag_cd")
    

    df1: org.apache.spark.sql.DataFrame = [Patient_id: строка, diag_cd: строка]

  2. Теперь вы можете создать df2 для файла 2, используя -

    scala> val df2 = sc.textFile("file2.txt").map(x => (x.split(",")(0),x.split(",")(1))).toDF("diag_cd", "diag_desc")
    df2: org.apache.spark.sql.DataFrame = [diag_cd: string, diag_desc: string]
    
  3. Соедините df1 с df2 и отфильтруйте согласно требованию.

    df1.join(df2, df1.col("diag_cd") === df2.col("diag_cd")).filter(df2.col("diag_desc") === "ten").select(df1.col("patient_id")).collect()
    
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...