Я хочу преобразовать данные Spark 2.4, импортированные из файлов AVRO (которые содержат данные отслеживания из Google Analytics).
Интересная часть схемы выглядит следующим образом:
root
|-- visitorId: long (nullable = true)
|-- visitNumber: long (nullable = true)
|-- visitId: long (nullable = true)
|-- visitStartTime: long (nullable = true)
|-- date: string (nullable = true)
|-- hits: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- hitNumber: long (nullable = true)
| | |-- time: long (nullable = true)
| | |-- hour: long (nullable = true)
| | |-- minute: long (nullable = true)
| | |-- customDimensions: array (nullable = true)
| | | |-- element: struct (containsNull = true)
| | | | |-- index: long (nullable = true)
| | | | |-- value: string (nullable = true)
Полученный набор данных должен быть почти плоским без глубоко вложенных структур. Массивы типа hits
должны иметь свой собственный ряд, что легко достигается функцией explode
. Массивы типа hits.customDimensions
сложнее. Каждый элемент массива имеет поле index
(которое не соответствует позиции массива), и для каждого возможного значения должен быть создан новый столбец. Окончательная схема должна выглядеть следующим образом:
root
|-- visitorId: long (nullable = true)
|-- visitNumber: long (nullable = true)
|-- visitId: long (nullable = true)
|-- visitStartTime: long (nullable = true)
|-- hit_date: string (nullable = true)
|-- hit_hitNumber: long (nullable = true)
|-- hit_time: long (nullable = true)
|-- hit_hour: long (nullable = true)
|-- hit_minute: long (nullable = true)
|-- hit_customDimension_1: string (nullable = true)
|-- hit_customDimension_9: string (nullable = true)
В зависимости от фактических индексов, обнаруженных в данных, hit_customDimension_X
может встречаться чаще.
Набор данных до сих пор преобразуется следующим образом:
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.explode;
import static org.apache.spark.sql.functions.explode_outer;
public class Flattener {
public static void main(String[] args) {
String avroFiles = String.join(",", args); // @TODO: named parameters
SparkConf conf = new SparkConf().setAppName("Simple Application").set("spark.ui.port", "8080");
SparkSession spark = SparkSession.builder().appName("Simple Application").config(conf).getOrCreate();
SQLContext sqlContext = spark.sqlContext();
Dataset<Row> sessions = spark.read().format("avro").load(avroFiles).limit(1000);
//explode the hits to more rows, remove original array
sessions = sessions.withColumn("hit", explode_outer(col("hits"))).drop(col("hits"));
//sample the distinct indinces
Dataset<Row> r = result.sample(0.1).select(explode(col("hit.customDimensions"))).select(col("col.index")).distinct();
List<Long> indices = new LinkedList<Long>();
r.foreach(dr -> {
indices.add(dr.getLong(0));
});
Iterator<Long> l = indices.iterator();
// for each found index, extract the array element to its own column
while (l.hasNext()) {
Long i = l.next();
result.withColumn("hit_customDimension" + "_" + i.toString(), array_find("hit.customDimensions", "index", i));
}
//TODO: move hit column up one level
}
Проблема в том, что такой функции array_find
нет. Я нашел функцию фильтра (см. Раздел о Фильтрация по столбцу Array ), но, похоже, он фильтрует строки, а не элементы массива.
Я думаю, что для этого можно написать UDF, но, насколько я знаю, они могут ухудшить производительность. Производительность очень важна из-за наших изменчивых и больших наборов данных (несколько терабайт). Задачи не выглядят необычно, поэтому мне интересно, есть ли встроенный способ сделать это, я просто пропустил.