scala spark - сопоставление данных на основе переменных дат - PullRequest
0 голосов
/ 28 декабря 2018

Я пытаюсь сопоставить два кадра данных на основе окна переменной даты.Я не просто пытаюсь получить точное совпадение, которого добивается мой код, но чтобы найти всех вероятных кандидатов в окне с переменным днем.

Мне удалось получить точные совпадения по датам с моим кодом.

Но я хочу выяснить, можно ли сопоставить записи, поскольку они могут быть на несколько дней свободны с обеих сторон, но все же достаточно разумны, чтобы присоединиться к ним.

Я пыталсяищу что-то похожее на Python pd.to_timedelta('1 day') в искре, чтобы добавить к фильтру, но увы, не повезло.

Вот мой текущий код, который соответствует кадру данных в столбце ID , а затемзапускает фильтр, чтобы убедиться, что from_date во втором кадре данных находится между start_date и end_date первого кадра данных.

Мне нужно не точное совпадение дат, а возможность сопоставления записей, если они попадают между днем ​​или двумя (с обеих сторон) фактических дат.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()

val df1 = spark.read.option("header","true")
               .option("inferSchema","true").csv("../data/df1.csv")

val df2 = spark.read.option("header","true")
               .option("inferSchema","true")
               .csv("../data/df2.csv")

val df = df2.join(df1,
                      (df1("ID") === df2("ID")) &&
                      (df2("from_date") >= df1("start_date")) &&
                      (df2("from_date") <= df1("end_date")),"left")
            .select(df1("ID"), df1("start_date"), df1("end_date"), 
                                                  $"from_date", $"to_date")

df.coalesce(1).write.format("com.databricks.spark.csv")
  .option("header", "true").save("../mydata.csv")

По сути, я хочу иметь возможность редактировать это окно даты, чтобы увеличить или уменьшить данные, действительно совпадающие.

Буду очень признателен за ваш вклад.Я новичок в Spark / Scala, но должен сказать, что я люблю это до сих пор ... так намного быстрее (и чище), чем Python!

ура

Ответы [ 2 ]

0 голосов
/ 07 января 2019

Вы можете получить те же результаты, используя функцию datediff ().Проверьте это:

scala> val df1 = Seq((1,  "2018-12-01", "2018-12-05"),(2,  "2018-12-01", "2018-12-06"),(3,  "2018-12-01", "2018-12-07")).toDF("ID", "start_date", "end_date").withColumn("start_date",'start_date.cast("date")).withColumn("end_date",'end_date.cast("date"))
df1: org.apache.spark.sql.DataFrame = [ID: int, start_date: date ... 1 more field]

scala> val df2 = Seq((1,  "2018-11-30"), (2,  "2018-12-08"),(3,  "2018-12-08")).toDF("ID", "from_date").withColumn("from_date",'from_date.cast("date"))
df2: org.apache.spark.sql.DataFrame = [ID: int, from_date: date]

scala> val delta = 1;
delta: Int = 1

scala> df2.join(df1,df1("ID") === df2("ID") && datediff('from_date,'start_date) >= -delta && datediff('from_date,'end_date)<=delta, "leftOuter").show(false)
+---+----------+----+----------+----------+
|ID |from_date |ID  |start_date|end_date  |
+---+----------+----+----------+----------+
|1  |2018-11-30|1   |2018-12-01|2018-12-05|
|2  |2018-12-08|null|null      |null      |
|3  |2018-12-08|3   |2018-12-01|2018-12-07|
+---+----------+----+----------+----------+


scala>
0 голосов
/ 28 декабря 2018

Вы можете применить date_add и date_sub к start_date/end_date в вашем join состоянии, как показано ниже:

import org.apache.spark.sql.functions._
import java.sql.Date

val df1 = Seq(
  (1, Date.valueOf("2018-12-01"), Date.valueOf("2018-12-05")),
  (2, Date.valueOf("2018-12-01"), Date.valueOf("2018-12-06")),
  (3, Date.valueOf("2018-12-01"), Date.valueOf("2018-12-07"))
).toDF("ID", "start_date", "end_date")

val df2 = Seq(
  (1, Date.valueOf("2018-11-30")),
  (2, Date.valueOf("2018-12-08")),
  (3, Date.valueOf("2018-12-08"))
).toDF("ID", "from_date")

val deltaDays = 1

df2.join( df1,
  df1("ID") === df2("ID") &&
  df2("from_date") >= date_sub(df1("start_date"), deltaDays) &&
  df2("from_date") <= date_add(df1("end_date"), deltaDays),
  "left_outer"
).show
// +---+----------+----+----------+----------+
// | ID| from_date|  ID|start_date|  end_date|
// +---+----------+----+----------+----------+
// |  1|2018-11-30|   1|2018-12-01|2018-12-05|
// |  2|2018-12-08|null|      null|      null|
// |  3|2018-12-08|   3|2018-12-01|2018-12-07|
// +---+----------+----+----------+----------+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...