Как реализовать следующий фрагмент кода scala в Java - PullRequest
0 голосов
/ 03 апреля 2019

Я реализую код для динамического добавления нескольких столбцов в Dataframe с нулевыми значениями в строке

Я нашел следующий фрагмент кода в scala, где используется функция карты объекта Dataframe.

import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types.{DataTypes, NullType, StructType}
import org.apache.spark.sql.{DataFrame, Encoders, Row, SparkSession}
import org.apache.spark.sql.functions.lit;

def addColumnsViaMap(df: DataFrame, words: List[String]): DataFrame = {
   val encoder = RowEncoder.apply(getSchema(df, words))
   df.map(mappingRows(df.schema)(words))(encoder)
}

private val mappingRows: StructType => List[String] => Row => Row =
  (schema) => (words) => (row) => {
     val addedCols: List[Any] = words.map(_=> null)
    Row.merge(row, Row.fromSeq(addedCols))
  }

private def getSchema(df: DataFrame, words: List[String]): StructType = {
  var schema: StructType = df.schema
  words.foreach(word => schema = schema.add(word, "string", false))
  schema
}

Я реализовал следующие две функции в java

 private StructType getSchema(Dataset<Row> df, List<String> cols){
        StructType schema = df.schema();
        cols.forEach(col -> schema.add(col, "int", true));
        return schema;
    }

private addColumnsViaMap(Dataset<Row> df, List<String> cols){
    Encoder<Row> encoder1 = 
  RowEncoder.apply(dataConsolidationEngine.getSchema(df,cols));
   df.map(new MapFunction<Set<String>, Row>() {
                private static final long serialVersionUID = 1L;

                @Override
                public Row call(Set<String> cols) throws Exception {
                    // TODO Auto-generated method stub
                }
            }, encoder1);
}

В методе addColumnsViaMap есть ошибка компиляции, не удается разрешить метод функции анонимной карты из-за несоответствия параметров.

и я не понимаюscala-код mappingRows, особенно следующий StructType => List[String] => Row => Row = (schema) => (words) => (row), что это значит ??

и как реализовать вышеупомянутый scala-код в Java?

Ответы [ 2 ]

1 голос
/ 03 апреля 2019

Ну, это объявление немного сложное (и IMO тоже немного нечитаемое), поэтому давайте вернемся назад.

В scala String, List ... - это типы, о которых все знают.Вы можете создать переменную типа String.

. То, что вы также можете сделать, это присвоить функцию переменной (это функциональная ориентация scala), поэтому функции также имеют типы.Например, если у вас есть функция, которая принимает List и выводит String, она имеет тип List => String.

И это выглядит в коде?

// A list of strings
val names = List("alice", "bob")
// A function that takes a list and returns a string
def listToString(list: List[String]): String = list.mkString(",")
// We can assign the function to a variable
val myListToString: List[String] => String = listToString

Но у нас есть более короткая запись для объявления функций, мы можем объявить их «встроенными», без использования оператора def.Таким образом, приведенный выше код может быть эквивалентно записан:

val names = List("alice", "bob")
val myListToString: List[String] => String = (list) => list.mkString(",")

Итак, в общем случае:

  • A => B - это тип функции, котораяпринимает A и возвращает B
  • (arg: A) => { new B() } - фактическую функцию, которая принимает в качестве входных данных экземпляр A (экземпляр связан с именем переменной arg и тело которого возвращаетэкземпляр B

Теперь давайте сделаем что-нибудь сумасшедшее, давайте ... начнем все сначала. Скажем, что F - это функция, которая принимает List и возвращает String.функция, которая принимает Int и возвращает F как выглядит?

Ну, это было бы:

  • Int => F.
  • То есть: Int => (List => String)
  • Что можно написать Int => List => String

А как вы это объявляете?

// Borrowing from above
val names = List("alice", "bob")
val myListToString: List[String] => String = (list) => list.mkString(",")
// now we're doing it
val intToListToString = (integerValue) => myListToString
// now we're doing it in one go
val intToListToString2 = (integerValue) => (list) => list.mkString(",")

Здесь intToListToString - это функциякоторый принимает int и возвращает «функцию, которая принимает List и возвращает String».

И вы можете вкладывать снова и снова.

Пока не получите:StructType => List[String] => Row => Row which - это тип, который означает «функцию, которая принимает StructType в качестве ввода и возвращает (функция, которая принимает List[String] в качестве ввода и возвращает (функция, которая принимает Row в качестве ввода и возвращает строку)).

И вы могли бы реализовать это как:

(schema) => // a function that takes schema, and returns
    (words) => // a function that takes a list of words and returns
        (row) => // a function that takes a row and returns
            Row.fromSeq(...) // another row

Теперь, как это будет выглядеть в Java?

Если вы хотите преобразовать это строго как есть, вы можетеПодумайте об этом так: естественным эквивалентом A => B Скалы является java.util.Function<A, B>.Кроме того, если вы хотите использовать функцию для выполнения операции Spark map на Dataframe, вы должны использовать MapFunction<>.

Так что мы ищем реализацию Function<Schema, Function<List<String>, MapFunction<Row, Row>>> или что-то в этом роде.

Используя лямбда-нотацию Java, вы можете сделать это следующим образом:

schema ->  words -> row -> Row.merge(row, Row.fromSeq(Array.newInstance(String.class, words.size)))

Это функция, которая принимает схему,

  • , который возвращает функцию, которая принимает список слов

    • , который возвращает функцию, которая принимает строку

      • , которая возвращает строку, дополненнуюстолбцы, содержащие ноль

Возможно, мой синтаксис Java правильный, а может и нет, я не знаю.

Что я знаю, так это то, чтоэто слишком сложный способ выполнения ваших требований.

Что это за требование: у вас есть фрейм данных, у вас есть список слов, вы хотите создать новые столбцы с этим именем и содержащие ноль.

Итак, что бы я сделал в scala:

import org.apache.spark.sql.DataFrame
def addColumnsViaMap(dataframe: DataFrame, words: List[String]) = words.foldLeft(dataframe)((df, word) => df.withColumn(word, lit(null: String)))

val dataframe = Seq(("a", "b"), ("c", "d")).toDF("columnA", "columnB")
val words = List("columnC", "columnD")
addColumnsViaMap(dataframe, words).show

+-------+-------+-------+-------+
|columnA|columnB|columnC|columnD|
+-------+-------+-------+-------+
|      a|      b|   null|   null|
|      c|      d|   null|   null|
+-------+-------+-------+-------+

Что вы, вероятно, можете написать вjava как таковой

DataFrame addColumnsViaMap(DataFrame dataframe, List<String> words) {
    for (String word: words) {
        dataframe = dataframe.withColumn(word, lit((String) null))
    }
    return dataframe;
}

Еще раз, у меня нет искровой среды на основе Java, но моя точка зрения такова: если вы поймете принцип, переписать просто.

0 голосов
/ 03 апреля 2019
private val mappingRows: StructType => List[String] => Row => Row =
  (schema) => (words) => (row) => {
     val addedCols: List[Any] = words.map(_=> null)
    Row.merge(row, Row.fromSeq(addedCols))
  }

Проще говоря, это можно прочитать как:

mappingRows - это «функция», которая принимает 3 параметра (типов StructType, List и Row, скажем, схема, слова и строки) и возвращает Row. Но вместо того, чтобы называть это так:

mappingRows(schema, words, row)`

ты пойдешь

mappingRows(schema)(words)(row)

Это значит, что звонить просто

mappingRows(schema)(words)

вернет функцию, которая принимает Row и возвращает Row: функцию отображения, которую вы можете передать типичной функции .map().

По сути, учитывая схему и список имен столбцов, замыкание принимает строку в качестве ввода. Он просто добавляет пустой столбец в эту строку для каждого заданного имени столбца.

Помогает ли вам ответить на ваш вопрос?

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...