Захват и запись строки внутри фрейма данных с использованием строки foreach - PullRequest
0 голосов
/ 29 мая 2019

Попытка перехватить и записать строковое значение после замены содержимого, полученного из определенных полей, в каждой строке кадра данных с использованием scala.Но так как он развернут на кластере, не может захватывать какие-либо записи.Кто-нибудь может предложить решение?

Предполагая, что TEST_DB.finalresult имеет 2 поля input1 и input2:

val finalresult=spark.sql("select * from TEST_DB.finalresult")

finalResult.foreach { row => 
    val param1=row.getAs("input1").asInstanceOf[String]
    val param2=row.getAs("input2").asInstanceOf[String]

    val string = """new values of param1 and param2 are -> """ + param1 + """,""" + param2
    // how to append modified string to csv file continously for each microbatch in hdfs ??
}

1 Ответ

3 голосов
/ 29 мая 2019

В вашем коде вы создаете требуемую переменную string, но она нигде не сохраняется, поэтому вы не можете видеть результат.

Вы можете потенциально в каждом foreach открытии открыть нужный файл CSV и добавить новую строку, но я хотел бы предложить другое решение.

Если можете, старайтесь всегда использовать встроенную функциональность Spark, так как он (как правило) более оптимизирован и лучше обрабатывает нулевые входные данные. Вы можете достичь того же:

import org.apache.spark.sql.functions.{lit, concat, col}

val modifiedFinalResult = finalResult.select(
 concat(
  lit("new values of param1 and param2 are -> "),
  col("input1"),
  lit(","),
  col("input2")
 ).alias("string")
)

В переменной modifiedFinalResult у вас будет искровой фрейм данных с одним столбцом с именем string, который представляет собой тот же вывод, что и ваша переменная string в вашем коде. После этого вы можете напрямую сохранить фрейм данных в виде одного CSV-файла (используя функцию перераспределения):

modifiedFinalResult.repartition(1).write.format("csv").save("path/to/your/csv/output")

PS: также предложение на будущее, старайтесь избегать именования переменных после типов данных.

ОБНОВЛЕНИЕ: Исправлена ​​проблема пустых строк с использованием «concat_ws» вместо concat и coalesce для каждого поля. Кажется, что некоторые значения, которые были нулевыми, преобразовывали всю объединенную строку в нулевое значение после преобразования. Тем не менее, это решение работает сейчас!

...