Как получить доступ к записям в каждой строке и применить пользовательские функции? - PullRequest
0 голосов
/ 25 июня 2019

Мой ввод был kafka-потоком только с одним значением, разделенным запятыми. Похоже на это.

"идентификатор, страна, метки времени"

Я уже разделил набор данных так, чтобы у меня было что-то вроде следующего структурированного потока

Dataset<Row> words = df
            .selectExpr("CAST (value AS STRING)")
            .as(Encoders.STRING())
            .withColumn("id", split(col("value"), ",").getItem(0))
            .withColumn("country", split(col("value"), ",").getItem(1))
            .withColumn("timestamp", split(col("value"), ",").getItem(2));


+----+---------+----------+
|id  |country  |timestamp |
+----+---------+----------+
|2922|de       |1231231232|
|4195|de       |1231232424|
|6796|fr       |1232412323|
+----+---------+----------+

Теперь у меня есть набор данных с 3 столбцами. Теперь я хочу использовать записи в каждой строке в пользовательской функции, например

Dataset<String> words.map(row -> {
    //do something with every entry of each row e.g.
            Person person = new Person(id, country, timestamp);
            String name = person.getName();
            return name;
    };

В конце я хочу снова выделить строку через запятую.

Ответы [ 2 ]

0 голосов
/ 25 июня 2019

Если у вас есть пользовательская функция, которая недоступна при составлении функций в существующем API-интерфейсе spark [1], то вы можете либо перейти на уровень RDD (как предложено @Ilya), либо использовать UDF [2].

Как правило, я стараюсь использовать функции API spark в кадре данных, когда это возможно, поскольку они, как правило, будут оптимизированы наилучшим образом.

Если это невозможно, я создам UDF:

import org.apache.spark.sql.functions.{col, udf}
val squared = udf((s: Long) => s * s)
display(spark.range(1, 20).select(squared(col("id")) as "id_squared"))

В вашем случае вам нужно передать несколько столбцов в вашу UDF, вы можете передать их через запятую squared(col("col_a"), col("col_b")).

Поскольку вы пишете свой UDF в Scala, он должен быть довольно эффективным, но имейте в виду, что если вы используете Python, в общем случае будет дополнительная задержка из-за перемещения данных между JVM и Python.

[1] https://spark.apache.org/docs/latest/api/scala/index.html#package [2] https://docs.databricks.com/spark/latest/spark-sql/udf-scala.html

0 голосов
/ 25 июня 2019

Фрейм данных имеет схему, поэтому вы не можете просто вызвать функцию карты без определения новой схемы. Вы можете привести к RDD и использовать карту, или использовать карту DF с кодировщиком. Другой вариант - я думаю, что вы можете использовать spark SQL с пользовательскими функциями, вы можете прочитать об этом. Если ваш вариант использования действительно прост, как вы показываете, выполните что-то вроде:

var nameRdd = words.rdd.map(x => {f(x)})

что, кажется, это все, что вам нужно если вам все еще нужен информационный фрейм, вы можете использовать что-то вроде:

val schema = StructType(Seq[StructField](StructField(dataType = StringType, name = s"name")))
val rddToDf = nameRdd.map(name => Row.apply(name))
val df = sparkSession.createDataFrame(rddToDf, schema)

P.S dataframe === набор данных

...