Я пытаюсь создать конвейер данных с использованием spark и scala в Maven Project.
Данные поступают через API в формате .csv
ЗАВИСИМОСТЬ ФАЙЛА POM:
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>${spark.version}</version>
<scope>compile</scope>
<!-- provided -->
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>${spark.version}</version>
<scope>compile</scope>
<!-- provided -->
</dependency>
Данные:
{"event":"movie","on":"Hollywood","at":"2019-06-01T18:20:00.560Z","data":{"id":"4965-81dc","location":{"pntin":52.48,"pntot":13.42,"at":"2019-06-01T18:20:00.560Z"}},"Hall_id":"chk-id"}
{"event":"movie","on":"Bollywood","at":"2019-06-01T18:20:00.570Z","data":{"id":"9152c5d8","location":{"pntin":52.51,"pntot":13.44,"at":"2019-06-01T18:20:00.570Z"}},"Hall_id":"chk-id"}
{"event":"movie","on":"Lollywood","at":"2019-06-01T18:20:00.569Z","data":{"id":"3a3eb23a","location":{"pntin":52.49,"pntot":13.45,"at":"2019-06-01T18:20:00.569Z"}},"Hall_id":"chk-id"}
мой подход был
1st i Создали класс case и преобразовали его в RDD и Dataframe.
case class hollywood(
event: String,
on: String,
at: String,
data: String,
organization_id: String
)
case class bollywood(
id: String,
location: String
)
case class lollywood(
pntin: String,
pntot: String,
at: String
)
Теперь реальная проблема состояла в том, как присоединить к фрейму данных, так как нет общего ключа
существует ли какой-либо другой подход или лучший метод для передачи этих данных в информационный фрейм для дальнейшего анализа.
можно ли обойтись без класса дела
искровые библиотеки.
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{SQLContext, SaveMode}
import domain.{DisttravelData, _}
object job {
def main(args: Array[String]): Unit = {
// get spark configuration
val conf = new SparkConf()
.setAppName("Project")
conf.setMaster("local[*]")
val sc = new SparkContext(conf)
implicit val sqlContext = new SQLContext(sc)
import org.apache.spark.sql.functions._
import sqlContext.implicits._
Может кто-нибудь, пожалуйста, помогите мне, есть лучший способ создать конвейер.