Обработка ошибок с помощью Try match внутри udf - и запись строки, где это не удалось - PullRequest
0 голосов
/ 27 апреля 2018

Scala версии 2.11 и Spark 2.0.1.

У меня есть датафрейм, где я делаю некоторые операции внутри udf. Я хочу иметь возможность запускать операции и возвращать ошибку только в тех строках, где она не удалась. Я также хотел бы вернуть успех / неудачу в качестве дополнительного поля. Pass / Fail может быть в отдельном столбце.

Вот что я попробовал:

val df = Seq(("as", 1, "df"), ("1", 2, "3")).toDF("a", "b", "c")
val df1 = Seq(("1", 1, "3"), ("1", 2, "3")).toDF("a", "b", "c")

def myUdf = udf((i: String, j: Int, k: Int) => { 
   def test (ii:String, jj:Int, kk:Int): Try[Int] = {
     val q = i.toInt * j * k.toInt
     val m = q * i.toInt
     return (Try(q))
  }
  val q = Try(test(i, j, k)) match { 
    case Success(lines) => lines.toString
    case _ => "Failed"
  }
  q
})

# First Example
val df2 = df.withColumn("D", myUdf($"a", $"b", $"c")) <-- This fails

# Second Example 
val df3 = df1.withColumn("D", myUdf($"a", $"b", $"c"))
df3.show
  +---+---+---+----------+
  |  a|  b|  c|         D|
  +---+---+---+----------+
  |  1|  1|  3|Success(3)|
  |  1|  2|  3|Success(6)|
  +---+---+---+----------+

1) Как можно получить значения [0-9] для целочисленного регистра (вместо Success (3) и Success (6) - т.е. удалить Success и скобки - 3 и 6 могут быть символами )? Кроме того, как мне добавить успех / неудачу в каждую строку?

2) Можно ли использовать Try match, чтобы проверить, когда происходит сбой Udf, без обработки ошибок на каждом шаге. Как перейти к следующему вычислению, если оно не выполняется на одном шаге? Примечание. Внутри метода «test» выполняется множество вычислений.

3) Каковы возможные альтернативные методы глобальной проверки udf?

1 Ответ

0 голосов
/ 27 апреля 2018

Вы можете сделать это, используя Try, однако обратите внимание, что Try должен охватывать все тело метода test, а не только применяться к результату (вы также не должны использовать ключевое слово return Вот). После этого используйте match, чтобы получить результат.

def myUdf = udf((i: String, j: Int, k: String) => { 
  def test(ii: String, jj: Int, kk: String): Try[Int] = Try {
    val q = i.toInt * j * k.toInt
    val m = q * i.toInt
    q
  }

  test(i, j, k) match { 
    case Success(lines) => lines.toString
    case _ => "Failed"
  }
})

Обратите внимание, что k и kk имеют тип String, поскольку это то, что вы имеете в обоих тестовых фреймах данных. Если вы используете Int и значение столбца не может быть неявно преобразовано (например, "df"), эта строка не будет запускать udf, и вы получите null.

Результат с использованием двух фреймов данных:

+---+---+---+------+
|  a|  b|  c|     D|
+---+---+---+------+
| as|  1| df|Failed|
|  1|  2|  3|     6|
+---+---+---+------+

+---+---+---+---+
|  a|  b|  c|  D|
+---+---+---+---+
|  1|  1|  3|  3|
|  1|  2|  3|  6|
+---+---+---+---+
  1. Как можно видеть, это даст только значения или "Failed", в результате успех удаляется, то есть результат возвращается в виде строки.

  2. При сбое в методе test будет сгенерировано исключение, которое перехватывается Try. Это означает, что метод завершится с ошибкой и не продолжится до конца.

  3. Чтобы найти все строки, которые потерпели неудачу, используйте метод filter: df2.filter($"D" === "Failed").

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...