Функциональное программирование в Spark / Scala - PullRequest
0 голосов
/ 22 сентября 2019

Я узнаю больше о Scala и Spark, но застрял на том, как структурировать функцию, когда я использую две таблицы в качестве входных данных.Моя цель - сжать мой код и использовать больше функций.Я застрял на том, как я структурирую функции при использовании двух таблиц, к которым я собираюсь присоединиться.Мой код без функции выглядит следующим образом:

 val spark = SparkSession
.builder()
.master("local[*]")
.appName("XX1")
.getOrCreate()

val df1 = spark.sqlContext.read
.format("com.databricks.spark.csv")
.option("header", "true")
.option("delimiter", ",")
.option("inferSchema", "true")
.load("C:/Users/YYY/Documents/YYY.csv")
// df1: org.apache.spark.sql.DataFrame = [customerID: int, StoreID: int, FirstName: string, Surname: string, dateofbirth: int]


val df2 = spark.sqlContext.read
.format("com.databricks.spark.csv")
.option("header", "true")
.option("delimiter", ",")
.option("inferSchema", "true")
.load("C:/Users/XXX/Documents/XXX.csv")


df1.printSchema()
df1.createOrReplaceTempView("customerinfo")
df2.createOrReplaceTempView("customerorders")


def innerjoinA(df1: DataFrame, df2:Dataframe): Array[String]={
val innerjoindf= df1.join(df2,"customerId")
}

innerjoin().show()
}

Мой вопрос: как правильно определить функцию для innerjoinA (и почему?) И как именно я могу вызвать ее позже в программе?И что еще важнее, что еще я могу отформатировать как функцию в этом примере?

Ответы [ 2 ]

0 голосов
/ 23 сентября 2019

вы можете сделать что-то вроде этого.

Создать функцию для создания Spark Session и ReadCSV .Эта функция, если вам нужно поместить ее в другой файл, если она также вызывается другими программами.

Только для объединения, нет необходимости создавать функции.Тем не менее, вы можете создать, чтобы понять бизнес-поток и дать ему правильное имя.

import org.apache.spark.sql.{DataFrame, SparkSession}

def getSparkSession(unit: Unit) : SparkSession = {

  val spark = SparkSession
    .builder()
    .master("local[*]")
    .appName("XX1")
    .getOrCreate()

  spark
}


def readCSV(filePath: String): DataFrame = {
  val df = getSparkSession().sqlContext.read
    .format("com.databricks.spark.csv")
    .option("header", "true")
    .option("delimiter", ",")
    .option("inferSchema", "true")
    .load(filePath)
  df
}

def getCustomerDetails(customer: DataFrame, details: DataFrame) : DataFrame = {
  customer.join(details,"customerId")
}

val xxxDF = readCSV("C:/Users/XXX/Documents/XXX.csv")
val yyyDF = readCSV("C:/Users/XXX/Documents/YYY.csv")
getCustomerDetails(xxxDF, yyyDF).show()
0 голосов
/ 22 сентября 2019

Основополагающим условием группировки сложных преобразований и объединений в методах является обоснованность.Только вы знаете, имеет ли смысл в вашем случае использование специального метода innerjoin.

Я обычно определяю их как методы расширения, чтобы я мог связывать их один за другим.

trait/object DataFrameExtensions{
    implicit class JoinDataFrameExtensions(df:DataFrame){
        def innerJoin(df2:DataFrame):DataFrame = df.join(df2, Seq("ColumnName"))
    }
}

А потом вкод импортирует / смешивает нужные мне методы и вызывает их в DataFrame.

originalDataFrame.innerJoin(toBeJoinedDataFrame).show()

Я предпочитаю методы расширения, но вы также можете просто объявить метод DataFrame => DataFrame и использовать его уже в методе .transform.определено в API набора данных.

def innerJoin(df2:DataFrame)(df1:DataFrame):DataFrame = df1.join(df2, Seq("ColumnName"))

val join = innerJoin(tobeJoinedDataFrame) _
originalDataFrame.transform(join).show()
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...