Создание новых столбцов по картам в столбце массива внутри кадра данных Spark - PullRequest
1 голос
/ 28 февраля 2020

Я хочу преобразовать данные Spark 2.4, импортированные из файлов AVRO (которые содержат данные отслеживания из Google Analytics).

Интересная часть схемы выглядит следующим образом:

root
 |-- visitorId: long (nullable = true)
 |-- visitNumber: long (nullable = true)
 |-- visitId: long (nullable = true)
 |-- visitStartTime: long (nullable = true)
 |-- date: string (nullable = true)
 |-- hits: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- hitNumber: long (nullable = true)
 |    |    |-- time: long (nullable = true)
 |    |    |-- hour: long (nullable = true)
 |    |    |-- minute: long (nullable = true)
 |    |    |-- customDimensions: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- index: long (nullable = true)
 |    |    |    |    |-- value: string (nullable = true)

Полученный набор данных должен быть почти плоским без глубоко вложенных структур. Массивы типа hits должны иметь свой собственный ряд, что легко достигается функцией explode. Массивы типа hits.customDimensions сложнее. Каждый элемент массива имеет поле index (которое не соответствует позиции массива), и для каждого возможного значения должен быть создан новый столбец. Окончательная схема должна выглядеть следующим образом:

root
 |-- visitorId: long (nullable = true)
 |-- visitNumber: long (nullable = true)
 |-- visitId: long (nullable = true)
 |-- visitStartTime: long (nullable = true)
 |-- hit_date: string (nullable = true)
 |-- hit_hitNumber: long (nullable = true)
 |-- hit_time: long (nullable = true)
 |-- hit_hour: long (nullable = true)
 |-- hit_minute: long (nullable = true)
 |-- hit_customDimension_1: string (nullable = true)
 |-- hit_customDimension_9: string (nullable = true)

В зависимости от фактических индексов, обнаруженных в данных, hit_customDimension_X может встречаться чаще.

Набор данных до сих пор преобразуется следующим образом:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.explode;
import static org.apache.spark.sql.functions.explode_outer;

public class Flattener {
  public static void main(String[] args) {
    String avroFiles = String.join(",", args); // @TODO: named parameters
    SparkConf conf = new SparkConf().setAppName("Simple Application").set("spark.ui.port", "8080");
    SparkSession spark = SparkSession.builder().appName("Simple Application").config(conf).getOrCreate();
    SQLContext sqlContext = spark.sqlContext();
    Dataset<Row> sessions = spark.read().format("avro").load(avroFiles).limit(1000);

    //explode the hits to more rows, remove original array
    sessions = sessions.withColumn("hit", explode_outer(col("hits"))).drop(col("hits"));

    //sample the distinct indinces
    Dataset<Row> r = result.sample(0.1).select(explode(col("hit.customDimensions"))).select(col("col.index")).distinct();
    List<Long> indices = new LinkedList<Long>();
    r.foreach(dr -> {
        indices.add(dr.getLong(0));
    });
    Iterator<Long> l = indices.iterator();
    // for each found index, extract the array element to its own column
    while (l.hasNext()) {
        Long i = l.next();
        result.withColumn("hit_customDimension" + "_" + i.toString(), array_find("hit.customDimensions", "index", i));
    }

    //TODO: move hit column up one level
}

Проблема в том, что такой функции array_find нет. Я нашел функцию фильтра (см. Раздел о Фильтрация по столбцу Array ), но, похоже, он фильтрует строки, а не элементы массива.

Я думаю, что для этого можно написать UDF, но, насколько я знаю, они могут ухудшить производительность. Производительность очень важна из-за наших изменчивых и больших наборов данных (несколько терабайт). Задачи не выглядят необычно, поэтому мне интересно, есть ли встроенный способ сделать это, я просто пропустил.

1 Ответ

1 голос
/ 28 февраля 2020

Кажется, вы ищете функцию SQL, которая извлекает элементы из массива по заданному индексу.

Эта функция уже присутствует в Spark API, но является своего рода «скрытой», поскольку она реализована не как отдельная функция, а как метод apply в классе Column. Пожалуйста, отметьте scalado c:

  /**
   * Extracts a value or values from a complex type.
   * The following types of extraction are supported:
   * <ul>
   * <li>Given an Array, an integer ordinal can be used to retrieve a single value.</li>
   * <li>Given a Map, a key of the correct type can be used to retrieve an individual value.</li>
   * <li>Given a Struct, a string fieldName can be used to extract that field.</li>
   * <li>Given an Array of Structs, a string fieldName can be used to extract filed
   *    of every struct in that array, and return an Array of fields.</li>
   * </ul>
   * @group expr_ops
   * @since 1.4.0
   */
  def apply(extraction: Any): Column

Итак, я бы предложил заменить ваш array_find(...) на col("hit.customDimensions")(i):

result.withColumn("hit_customDimension" + "_" + i.toString(), col("hit.customDimensions")(i));

[UPD]

Как верно указано в комментариях, customDimensions может быть разреженным массивом, где index не порядковое, а произвольное целое число.

В в этом случае преобразование Array в Map с самого начала выглядит наиболее естественным.

  1. Преобразование Array[Struct[Int, String]] в Map[Int, String]:
result.withColumn("hit_customDimensions_Map", 
  map_from_arrays(col("hit.customDimensions")("index"), col("hit.customDimensions")("value")))

И измените способ переноса customDimensions столбцов:
result.withColumn("hit_customDimension" + "_" + i.toString(), col("hit_customDimensions_Map")(i));
...