Итерация строк набора данных Spark и применение операций в Java API - PullRequest
0 голосов
/ 08 апреля 2020

Новый в Spark (2.4.x) и использующий API Java ( не Scala !!!)

У меня есть Dataset что я прочитал из файла CSV. У него есть схема (именованные столбцы), например:

id (integer)  |  name (string)  |  color (string)  |  price (double)  |  enabled (boolean)

Пример строки:

23 | "hotmeatballsoup" | "blue" | 3.95 | true

В много (десятки тысяч) строк набор данных. Я хотел бы написать выражение, используя правильный Java / Spark API, который прокручивает каждую строку и применяет следующие две операции к каждой строке:

  1. Если цена null, по умолчанию 0.00; а затем
  2. Если значение столбца цвета "красный", добавьте 2.55 к цене

Так как я новичок в Spark, я даже не уверен, где начать! Моя лучшая попытка на данный момент определенно ошибочна, но я думаю, что это наименее отправная точка:

Dataset csvData = sparkSession.read()
    .format("csv")
    .load(fileToLoad.getAbsolutePath());

// ??? get rows somehow
Seq<Seq<String>> csvRows = csvData.getRows(???, ???);

// now how to loop through rows???
for (Seq<String> row : csvRows) {
    // how apply two operations specified above???
    if (row["price"] == null) {
        row["price"] = 0.00;
    }

    if (row["color"].equals("red")) {
        row["price"] = row["price"] + 2.55;
    }
}

Может ли кто-нибудь помочь мне подтолкнуть меня в правильном направлении?

1 Ответ

1 голос
/ 08 апреля 2020

Вы можете использовать spark sql api для достижения этой цели. Нулевые значения также могут быть заменены значениями, используя .fill() из DataFrameNaFunctions. В противном случае вы можете преобразовать Dataframe в набор данных и выполнить эти шаги в .map, но sql api лучше и эффективнее в этом случае.

+---+---------------+-----+-----+-------+
| id|           name|color|price|enabled|
+---+---------------+-----+-----+-------+
| 23|hotmeatballsoup| blue| 3.95|   true|
| 24|            abc|  red|  1.0|   true|
| 24|            abc|  red| null|   true|
+---+---------------+-----+-----+-------+

импорт sql функций до объявления класса:

import static org.apache.spark.sql.functions.*;

sql API:

df.select(
        col("id"), col("name"), col("color"),
        when(col("color").equalTo("red").and(col("price").isNotNull()), col("price").plus(2.55))
        .when(col("color").equalTo("red").and(col("price").isNull()), 2.55)
        .otherwise(col("price")).as("price")
        ,col("enabled")
).show();

или с использованием временного представления и sql запрос:

df.createOrReplaceTempView("df");
spark.sql("select id,name,color, case when color = 'red' and price is not null then (price + 2.55) when color = 'red' and price is null then 2.55 else price end as price, enabled from df").show();

вывод:

+---+---------------+-----+-----+-------+
| id|           name|color|price|enabled|
+---+---------------+-----+-----+-------+
| 23|hotmeatballsoup| blue| 3.95|   true|
| 24|            abc|  red| 3.55|   true|
| 24|            abc|  red| 2.55|   true|
+---+---------------+-----+-----+-------+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...