Я запускаю все в IntelliJ. Приложение находится в Scala.
scalaVersion: = "2.12.10" sparkVersion = "2.4.4"
Так что я работаю с NY C данные такси. почти 1,1 миллиарда записей, 120 ГБ данных. Я читаю, удаляю ненужные данные, очищаю и записываю в мета-хранилище кустов в разделах (день, месяц, год) в формате паркета со сжатием по умолчанию. Пока все хорошо, работает довольно быстро в течение пары минут (2-3). Теперь я снова читаю данные из метастаза и выполняю некоторые операции. По сути, я хочу вычислить все поездки из manhattanToJFK, чтобы в итоге было несколько UDF и количество. Это занимает вечность.
Тогда у меня есть небольшой набор данных <1MB данных о погоде. почти 50 столбцов, я делаю из этого новый DF только с 7 столбцами. Нужно найти некоторую корреляцию между поездками и погодой et c. Таким образом, объединение даты и затем корреляционная функция spark.ml для получения корреляционной матрицы. Это также занимает целую вечность. </p>
В общей сложности все это занимает 6 часов, чтобы бежать (я был в 4-5 часов, и я что-то испортил и пришлось перезапустить). Я не могу ждать так долго, чтобы проверить, точен ли он или нет.
Из того, что я читаю и понимаю, паркет должен работать значительно быстрее, чем чтение и работа с CSV, но я нахожу это наоборот. Поэтому я думаю, что делаю что-то не так. Может быть, некоторые настройки или настройки et c? Я начинающий с искрой и изучаю это сам. Так что, если я делаю ошибку, будь добр, потерпи меня. Любая помощь будет очень полезна.
Если я должен опубликовать какое-либо обновление или информацию, дайте мне знать. Я могу отредактировать его и опубликовать.
Спасибо
def analysis() = {
var parquetDF = SparkObject.spark.read.parquet("spark-warehouse/location")
// .cache()
val manhattanTojfkDF = countManhattanToJKF(parquetDF)
findCorrelation(manhattanTojfkDF)
}
def countManhattanToJKF(df:DataFrame):DataFrame = {
var parquetDF = df
// val geojson = scala.io.Source.fromURL(this.getClass.getResource("/nyc-borough-boundaries-polygon.geojson")).mkString
val geojson = scala.io.Source.fromURL(this.getClass.getResource("/NYC Taxi Zones.geojson")).mkString
val features = geojson.parseJson.convertTo[FeatureCollection]
val broadcastFeatures = SparkObject.spark.sparkContext.broadcast(features)
val lonlatToZoneID = (longitude: Double, latitude: Double) => {
val feature: Option[Feature] = broadcastFeatures.value.find(f => {
f.geometry.contains(new Point(longitude, latitude))
})
feature.map(f => {
f("location_id").convertTo[String]
}).getOrElse("NA")
}
val latlonToZoneIDUDF = udf(lonlatToZoneID)
parquetDF = parquetDF.withColumn("pickupZoneID", when(parquetDF("pickupZoneID") === "NA",
latlonToZoneIDUDF(parquetDF("pickup_longitude"), parquetDF("pickup_latitude")))
.otherwise(parquetDF("pickup_longitude")))
parquetDF = parquetDF.withColumn("dropoffZoneID", when(parquetDF("dropoffZoneID") === "NA",
latlonToZoneIDUDF(parquetDF("dropoff_longitude"), parquetDF("dropoff_latitude")))
.otherwise(parquetDF("dropoff_longitude")))
val boroughLookupID = (pickupID:String) => {
val feature: Option[Feature] = broadcastFeatures.value.find(f => {
f.properties("location_id").convertTo[String] == pickupID
})
feature.map(f => {
f("borough").convertTo[String]
}).getOrElse("NA")
}
val boroughUDF = udf(boroughLookupID)
parquetDF = parquetDF.withColumn("pickupBorough", boroughUDF(parquetDF("pickupZoneID")))
parquetDF = parquetDF.withColumn("dropoffBorough", boroughUDF(parquetDF("dropoffZoneID")))
val manhattanToJFK = (borough:String, dropOffID:String) => {
(borough == "Manhattan" && dropOffID == "132")
}
val manhattanToJFKUDF = udf(manhattanToJFK)
parquetDF = parquetDF.withColumn("manhattanToJFK",
manhattanToJFKUDF(parquetDF("pickupBorough"), parquetDF("dropoffZoneID")))
val filteredDF = parquetDF.filter(parquetDF("ManhattanToJFK") === true)
val totalRidesFromManhattanTOJFK = filteredDF.count()
println(totalRidesFromManhattanTOJFK)
print(parquetDF.show())
filteredDF
}
def findCorrelation(filteredDF:DataFrame) = {
var weatherDF = SparkObject.spark.read.format("csv")
.option("header", true)
.load(URLs.weatherData:_*)
weatherDF = weatherDF.select(weatherDF("DATE").cast("date"), weatherDF("AWND").cast("float"),
weatherDF("SNOW").cast("float"), weatherDF("SNWD").cast("float"), weatherDF("TMIN").cast("float"),
weatherDF("TMAX").cast("float"), weatherDF("PRCP").cast("float"))
val joinedDF = weatherDF.join(filteredDF, weatherDF("DATE") === filteredDF("pickupDate"))
.select(weatherDF("DATE"), weatherDF("AWND"), weatherDF("SNOW"), weatherDF("SNWD"), weatherDF("TMIN"),
weatherDF("TMAX"), weatherDF("PRCP"))
// .cache()
val ridesPerDay = joinedDF.groupBy("DATE").agg(count("DATE").alias("rides_per_day"))
val cleanedDF = ridesPerDay.join(joinedDF, "DATE").dropDuplicates().drop("DATE")
cleanedDF.printSchema()
val assembler = new VectorAssembler()
.setInputCols(cleanedDF.columns)
.setOutputCol("features")
val corrFeatures = assembler.transform(cleanedDF)
val Row(coeff1: Matrix) = Correlation.corr(corrFeatures, "features").head
println(s"Pearson correlation matrix:\n $coeff1")
val Row(coeff2: Matrix) = Correlation.corr(corrFeatures, "features", "spearman").head
println(s"Spearman correlation matrix:\n $coeff2")
}
SparkSession выглядит как
lazy val spark = {
SparkSession
.builder()
.master("local[*]")
.appName("NYCTaxiDataKlarna")
.getOrCreate()
}
, и я передаю -Xms4g -Xmx4g как Параметры виртуальной машины, поэтому она составляет 4,1 4,1 ГБ каждый
РЕДАКТИРОВАТЬ: Итак, я сейчас просто запускаю функцию manhatantojfk с небольшим изменением в конце, в основном сохраняя данные для куста. в следующий раз я могу начать оттуда. и он работает уже почти 5 часов и не заканчивается.
val dw = new DataWriter()
dw.writeToHive(parquetDF, "parquet", "location_with_borough", "pickupDate")
print(parquetDF.count())
val filteredDF = parquetDF.filter(parquetDF("ManhattanToJFK") === true)
dw.writeToHive(parquetDF, "parquet", "manhattan_to_jfk", "pickupDate")
// val totalRidesFromManhattanTOJFK = filteredDF.count()
// println(totalRidesFromManhattanTOJFK)
// print(parquetDF.show())
// filteredDF