Как объединить строку со строкой, чтобы создать новый фрейм данных искры, если схема строки неизвестна при кодировании? - PullRequest
0 голосов
/ 15 октября 2019

Я создал функцию, которая принимает строку в качестве входных данных и дает строку в качестве выходных данных. Я планирую применить эту функцию к различным фреймам данных, чьи схемы будут отличаться друг от друга. Эти фреймы данных огромны с миллионами строк в каждом, но каждый фрейм данных имеет определенную схему

Я хочу создать другую функцию, которая будет вызывать первую функцию, объединить строку вывода из функции со строкой, которую она отправила функциии создайте новый фрейм данных, который будет выходом второй функции.

Обе функции будут написаны в среде spark-scala. Я очень новичок в spark-scala и не совсем уверен, как я могу объединить строки в новый фрейм данных

def returnTranformFunctionOutput(inputDataRow: Row, TransformFrame: Array[Row]): String = {
 val resultString = "testdata"
    resultString
  }

  def returnOutputDataframe(inputDataframe: DataFrame, TranformFrame: Array[Row]): DataFrame = {

    val inputSchema = inputDataframe.schema
    val outputSchema =  StructType(StructField("outputVal", StringType, true) :: Nil)
    val final_schema = StructType((inputSchema ++ outputSchema))
    val newDf = inputDataframe.map(row => {
      return Row.merge(row,TransformFunctions.returnTranformFunctionOutput(row,TranformFrame))
    }),final_schema)
    newDf
  }

returnOutputDataframe не компилируется и выдает мне несколько ошибок, включая no implicits found for parameter evidence$6: Encoder[U_] и type mismatch: Required:Row Found:stringпри выполнении Row.merge.

Можно ли объединить строку и строку, чтобы создать новую строку, которую затем можно объединить в новый кадр данных?

Ответы [ 2 ]

1 голос
/ 16 октября 2019

вы пытаетесь вернуть Dataframe в returnOutputDataframe, но при выполнении шага .map будет получено Dataset, и вы также передаете схему вместо кодировщика. Вы можете преобразовать inputDataframe в RDD[Row], отобразить значения, а затем создать DF, используя spark.createDataFrame с новой схемой. см. пример ниже.

  val row1 = RowFactory.create("1","2")
  val schema1 = new StructType()
    .add("c0","string")
    .add("c1","string")

  val row2 = RowFactory.create("A","B")
  val schema2 = new StructType()
    .add("c2","string")
    .add("c3","string")


  val df1 = spark.createDataFrame(sc.parallelize(Seq(row1)),schema1)
  df1.show()

  val rdd = df1.rdd.map(s => Row.merge(s, row2))
  val schema = StructType(schema1 ++ schema2)

  val df = spark.createDataFrame(rdd,schema)
  df.printSchema()
  df.show()

    +---+---+
    | c0| c1|
    +---+---+
    |  1|  2|
    +---+---+

    root
     |-- c0: string (nullable = true)
     |-- c1: string (nullable = true)
     |-- c2: string (nullable = true)
     |-- c3: string (nullable = true)

    +---+---+---+---+
    | c0| c1| c2| c3|
    +---+---+---+---+
    |  1|  2|  A|  B|
    +---+---+---+---+
0 голосов
/ 22 октября 2019

Основываясь на ответе из члебека выше, моя последняя функция была:

def returnOutputDataframe( inputDataframe: DataFrame, TranformFrame: Broadcast[Array[Row]]): DataFrame = {
val inputSchema = inputDataframe.schema
val outputSchema =  StructType(StructField("outputval", StringType, true) :: Nil)
val final_schema = StructType((inputSchema ++ outputSchema))
val schemaEncoder = RowEncoder(final_schema)
val outputDf = inputDataframe.map(row =>
  Row.merge(row,RowFactory.create(returnTranformFunctionOutputString(row, TranformFrame))))(schemaEncoder)
outputDf
   }
}

В моих тестах использование inputDataframe.map показалось быстрее, чем inputDataframe.rdd.map, и это избегало необходимости использовать шаг createDataFrame.

...