Использование метода Apache Spark RDD map (Java API) для получения неколонарного результата - PullRequest
0 голосов
/ 14 апреля 2020

Обратите внимание: Я верю Я прав, пытаясь использовать метод RDD map здесь, но если есть другой способ сделать sh то, что я ' ищу, у меня все уши!


Совершенно новый для Spark 2.4.x здесь, и с использованием Java ( не Scala) API.

Я пытаюсь обернуть свой мозг вокруг метода RDD map(...), особенно на Datasets и не ограничивается только RDD. Канонический пример его использования из официальных документов :

public class GetLength implements Function<String, Integer> {
  public Integer call(String s) { return s.length(); }
}

JavaRDD<String> lines = sc.textFile("data.txt");
JavaRDD<Integer> lineLengths = lines.map(new GetLength());

Так что в данном случае кажется, что после создания lines RDD у него есть один столбец (имя которого я не уверен), где каждое значение столбца является отдельной строкой файла, и что каждая строка в СДР также представляет отдельную строку файла. Значение lines - это матрица nx1, где n - количество строк / строк в файле.

Также кажется, что при выполнении функции GetLength она подается в каждую строку. единственный столбец в качестве входной строки и возвращает целое число, представляющее длину строки этой строки в виде нового значения столбца в другом наборе данных, который также равен nx1 (просто содержит информацию о длине строки вместо фактических строк / строк ).

ОК, так что я получаю этот тривиальный пример. Но что, если у нас есть nxm наборы данных, то есть множество строк и множество столбцов, и мы хотим написать функции, которые преобразуют их в другие nxm наборы данных?

Например, допустим, у меня есть следующий «входной» набор данных:

+-------------------------+
| price | color | fizz    |
+-------------------------+
| 2.99  | red   | hallo   |
| 13.94 | blue  | yahtzee |
| 7.19  | red   | trueth  |
...
| 4.15  | green | whatevs |
+-------------------------+

, где price - это числовой тип / тип с плавающей запятой, а color и fizz - строки. Итак, у нас есть набор данных в форме nx3; n строки и всегда 3 столбца в каждой строке.

Как написать функцию карты, которая также возвращает набор данных nx3 с теми же столбцами / именами столбцов / схемой, но разными значениями (на основе функция)?

Например, скажем, я хотел новый набор данных nx3 с той же схемой, но он добавил 2.0 к столбцу price, если значение строки color равно строке "red"?

Следовательно, с использованием произвольного набора данных выше, новый набор данных, выходящий из этой функции карты, будет выглядеть так:

+-------------------------+
| price | color | fizz    |
+-------------------------+
| 4.99  | red   | hallo   |  <== added 2.0 to price since color == "red"
| 13.94 | blue  | yahtzee |
| 9.19  | red   | trueth  |  <== added 2.0 to price since color == "red"
...
| 4.15  | green | whatevs |
+-------------------------+

Я искушен для сделать что-то вроде:

public class UpdatedPriceForRedColors implements Function2<String, Double, Double> {
  public Double call(String color, Double currentPrice) {

    if ("red".equals(color) {
        return currentPrice + 2.0;
    } else {
        return currentPrice;
    }
  }
}

JavaRDD<Double> updatedPrices = myDS.map(new UpdatedPriceForRedColors());

Однако здесь есть несколько проблем:

  1. updatedPrices теперь представляет собой только набор данных nx1, состоящий из правильно рассчитанных цен для каждой строки в myDS, тогда как я хочу что-то с тем же price/color/fizz, которое выглядит как 2-й произвольный набор данных выше
  2. Как UpdatedPriceForRedColors узнает, что его первый строковый аргумент - это столбец color, и не fizz столбец?
  3. Фу Кажется, что nction API имеет значение только от go до Function5 или Function6 (трудно определить, что доступно для Java API и что является эксклюзивным для Scala API). Это означает, что я могу писать только те функции, которые принимают по 5 или 6 аргументов, в то время как у меня могут быть наборы данных с 10+ столбцами в них, и мне вполне может понадобиться большинство этих значений столбцов, «введенных» в функцию, чтобы я мог вычислить возвращаемый результат. значение нового набора данных. Какие варианты у меня есть в этом случае?

1 Ответ

0 голосов
/ 18 апреля 2020

Прежде всего, RDD-типы всегда имеют один столбец , потому что RDD не имеют информации о схеме и, таким образом, вы привязаны к типу T в RDD<T>.

Опция 1 заключается в использовании Function<String,String>, который анализирует String в RDD<String>, выполняет logi c для управления внутренними элементами в строке и возвращает обновленную строку.

Если вы хотите, чтобы ваш RDD имел некоторую информацию о схеме, вы можете использовать RDD<Row>, который позволяет получить доступ к отдельным элементам внутри Row (опция 2) .

import org.apache.spark.sql.Row
JavaRDD<Row> rddRow = rddString.map(new Function<String, Row>() {
    @Override
    public Row call(String line) throws Exception {
      String[] parts = line.split("\\t");//tab separated values
      Row row = RowFactory.create(parts[0], parts[1], parts[2]);
      return row;
    }
  });

Теперь вы можете отобразить строки:

RDD<Row> updatedRdd = rddRow.map(new Function<Row, Row>() {
    @Override
    public Row call(Row row) throws Exception {
      Float price = row.get(0);
      String color = row.get(1);
      //Your logic here          
      Row row = RowFactory.create(/* three elements here, or whatever*/);
      return row;
    }
  });

Если вы сделаете go еще один шаг вперед, вы можете использовать истинный набор данных (как объяснено здесь ) и использовать Dataframe / API набора данных (вариант 3) .

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

StructType schema = DataTypes.createStructType(
    new StructField[]{
            DataTypes.createStructField("price", FloatType, false),
            DataTypes.createStructField("color", StringType, false),
            DataTypes.createStructField("fizz", StringType, false)
    });


JavaRDD<Row> rddRow = rddString.map(new Function<String, Row>() {
    @Override
    public Row call(String line) throws Exception {
      String[] parts = line.split("\\t");//tab separated values
      Row row = RowFactory.create(parts[0], parts[1], parts[2]);
      return row;
    }
  });

DataFrame df = sqlContext.createDataFrame(rowRDD, schema);

Наличие фрейма данных позволяет теперь использовать что-то вроде этого:

DataFrame df2 = df.withColumn("price", 
    when(col("color").equals("red"), col("price").add(2f))
        .otherwise(col("price")));

Отказ от ответственности: я не имею t проверил синтаксис и API java, как я привык к scala.

...