прочитать большой CSV-файл и разделить его в соответствии с условиями, используя scala / spark - PullRequest
0 голосов
/ 24 марта 2020

Я новичок в scala / spark и не знаю, как задать такой вопрос (техническое слово ...). У меня есть большой CSV-файл, я хочу прочитать его в кадре данных и распределить его по нескольким блокам в соответствии с условием для столбцов и применить необходимую обработку к каждому из блоков.

Пример мой CSV-файл

VehicleID         Longitude    Latitude     Date
 12311            55.55431     25.45631     01/02/2020
 12311            55.55432     25.45634     01/02/2020
 12311            55.55433     25.45637     02/02/2020
 12311            55.55431     25.45621     02/02/2020
 12309            55.55427     25.45627     01/02/2020
 12309            55.55436     25.45655     02/02/2020
 12412            55.55441     25.45657     01/02/2020
 12412            55.55442     25.45656     02/02/2020

Один из заголовков столбца - VehicleID и Date. Я хотел бы разбить большой CSV на несколько блоков, чтобы в каждом блоке были данные, принадлежащие одному уникальному значению VehicleID и Date.

Пример:

 Bock 1
 VehicleID         Longitude    Latitude     Date
  12311            55.55431     25.45631     01/02/2020
  12311            55.55432     25.45634     01/02/2020

 Block2
 VehicleID        Longitude    Latitude     Date
  12311            55.55433     25.45637     02/02/2020
  12311            55.55431     25.45621     02/02/2020

 Block3
 VehicleID        Longitude    Latitude     Date
 12309            55.55427     25.45627     01/02/2020

 Block4
 VehicleID        Longitude    Latitude     Date
 12309            55.55427     25.45627     02/02/2020

Я хочу также применить эту функцию к каждому блоку

def haversine_distance(longitude1 : Double,longitude2 : Double,latitude1 : Double,latitude2 : Double) : Double= {

  val R = 6372.8
  val dlat = math.toRadians(latitude2 - latitude1)
  val dlog = math.toRadians(longitude2 - longitude1)
  val a = math.sin(dlat / 2) * math.sin(dlat / 2) + math.cos(math.toRadians(latitude1)) * math.cos(math.toRadians(latitude2)) * math.sin(dlog / 2) * math.sin(dlog / 2)
  val c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
  val distance = R * c
  distance
}

Как это сделать с scala?

Спасибо.

1 Ответ

0 голосов
/ 24 марта 2020

Ваша функция необходима для использования 4 входных переменных, поэтому ваш фрейм данных также должен иметь эти переменные для вычисления. Я думаю, что это может быть достигнуто с помощью функций Window и lag.

import org.apache.spark.sql.expressions.Window

val w = Window.partitionBy("VehicleID", "Date").orderBy("id")

val df2 = df.withColumn("id", monotonically_increasing_id)
  .withColumn("Longitude2", lag("Longitude", 1).over(w))
  .withColumn("Latitude2", lag("Latitude", 1).over(w))
  .orderBy("id")
df2.show(false)

Результат:

+---------+---------+--------+----------+---+----------+---------+
|VehicleID|Longitude|Latitude|Date      |id |Longitude2|Latitude2|
+---------+---------+--------+----------+---+----------+---------+
|12311    |55.55431 |25.45631|01/02/2020|0  |null      |null     |
|12311    |55.55432 |25.45634|01/02/2020|1  |55.55431  |25.45631 |
|12311    |55.55433 |25.45637|02/02/2020|2  |null      |null     |
|12311    |55.55431 |25.45621|02/02/2020|3  |55.55433  |25.45637 |
|12309    |55.55427 |25.45627|01/02/2020|4  |null      |null     |
|12309    |55.55436 |25.45655|02/02/2020|5  |null      |null     |
|12412    |55.55441 |25.45657|01/02/2020|6  |null      |null     |
|12412    |55.55442 |25.45656|02/02/2020|7  |null      |null     |
+---------+---------+--------+----------+---+----------+---------+

Затем зарегистрируйте свою функцию как пользовательскую функцию, например as

def haversine_distance(longitude1 : Double,longitude2 : Double,latitude1 : Double,latitude2 : Double) : Double= {

  val R = 6372.8
  val dlat = math.toRadians(latitude2 - latitude1)
  val dlog = math.toRadians(longitude2 - longitude1)
  val a = math.sin(dlat / 2) * math.sin(dlat / 2) + math.cos(math.toRadians(latitude1)) * math.cos(math.toRadians(latitude2)) * math.sin(dlog / 2) * math.sin(dlog / 2)
  val c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
  val distance = R * c
  distance
}

spark.udf.register("haversine_distance", haversine_distance(_: Double, _: Double, _: Double, _: Double): Double)

Наконец, вы можете использовать эту функцию в искре SQL:

df2.withColumn("haversine_distance", expr("haversine_distance(Longitude, Longitude2, Latitude, Latitude2)"))
   .show(false)

, которая дает конечный результат:

+---------+---------+--------+----------+---+----------+---------+---------------------+
|VehicleID|Longitude|Latitude|Date      |id |Longitude2|Latitude2|haversine_distance   |
+---------+---------+--------+----------+---+----------+---------+---------------------+
|12311    |55.55431 |25.45631|01/02/2020|0  |null      |null     |null                 |
|12311    |55.55432 |25.45634|01/02/2020|1  |55.55431  |25.45631 |0.0034846437813896825|
|12311    |55.55433 |25.45637|02/02/2020|2  |null      |null     |null                 |
|12311    |55.55431 |25.45621|02/02/2020|3  |55.55433  |25.45637 |0.017909203100004076 |
|12309    |55.55427 |25.45627|01/02/2020|4  |null      |null     |null                 |
|12309    |55.55436 |25.45655|02/02/2020|5  |null      |null     |null                 |
|12412    |55.55441 |25.45657|01/02/2020|6  |null      |null     |null                 |
|12412    |55.55442 |25.45656|02/02/2020|7  |null      |null     |null                 |
+---------+---------+--------+----------+---+----------+---------+---------------------+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...