Расширение функциональности org.apache.spark.sql.Row: Spark Scala - PullRequest
0 голосов
/ 19 декабря 2018

In Spark Row trait

 /** Returns true if there are any NULL values in this row. */
  def anyNull: Boolean = {
    val len = length
    var i = 0
    while (i < len) {
      if (isNullAt(i)) { return true }
      i += 1
    }
    false
  }

, который может использоваться для оценки любого значения, равного нулю в ряду.Точно так же я хочу оценить любое значение
1) Что является пустой строкой
2) Любая строка, которую я передам?

Есть ли способ сделать это, чтобы расширить функциональность org.apache.spark.sql.Row?

Ответы [ 3 ]

0 голосов
/ 19 декабря 2018

Я пытался выразиться проще:

Определение функции:

scala> import org.apache.spark.sql.Row
import org.apache.spark.sql.Row

scala> def anyNull(itm:Row, search: String): Boolean = {  if(itm.toSeq.contains(search)) (true) else (false) }
anyNull: (itm: org.apache.spark.sql.Row, search: String)Boolean

Использование для СДР [Строка]:

scala> val rdd1 = sc.parallelize(Seq(Row("1","a","A"),Row("2","b", "B"),Row("3","c","C")))
rdd1: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = ParallelCollectionRDD[130] at parallelize at <console>:25

scala> rdd1.foreach(println)
[1,a,A]
[2,b,B]
[3,c,C]

scala> rdd1.map(r => (r, anyNull(r,"b"))).foreach(println)
([2,b,B],true)
([3,c,C],false)
([1,a,A],false)

scala> rdd1.map(r => (r, anyNull(r,"a"))).foreach(println)
([1,a,A],true)
([3,c,C],false)
([2,b,B],false)

scala> rdd1.map(r => (r, anyNull(r,""))).foreach(println)
([3,c,C],false)
([2,b,B],false)
([1,a,A],false)

Использование для DF:

scala> val df = sc.parallelize(Seq(("1","a","A"),("2","b", "B"),("3","c","C"))).toDF("num", "smallcase", "uppercase")
df: org.apache.spark.sql.DataFrame = [num: string, smallcase: string ... 1 more field]

scala> df.show()
+---+---------+---------+
|num|smallcase|uppercase|
+---+---------+---------+
|  1|        a|        A|
|  2|        b|        B|
|  3|        c|        C|
+---+---------+---------+


scala> df.rdd.map(r => Row(r(0), r(1), r(2))).map(r => (r, anyNull(r,"b"))).foreach(println)
([2,b,B],true)
([1,a,A],false)
([3,c,C],false)

scala> df.rdd.map(r => Row(r(0), r(1), r(2))).map(r => (r, anyNull(r,""))).foreach(println)
([2,b,B],false)
([1,a,A],false)
([3,c,C],false)
0 голосов
/ 19 декабря 2018

Есть ли способ сделать это, чтобы расширить функциональность org.apache.spark.sql.Row?

Вы можете использовать класс scala implicit, т.е. RowExt какthis.

package utility

import org.apache.commons.lang3.StringUtils
import org.apache.spark.sql.Row

import scala.util.{Failure, Success, Try}

object MyRowExtensions {
  /**
    * RowExt.
    * @param row Row
    */
  implicit class RowExt(row: Row) {
    /**
      * anyEmptyString
      * @return Boolean
      */
    def anyEmptyString(): Boolean = {
      val len = row.length
      var i = 0
      while (i < len) {
        if (row.get(i) != null && StringUtils.isEmpty(row.get(i).toString)) {
          return true
        }
        i += 1
      }
      false
    }

    /**
      * anyStringEqual
      * @param matchStr String
      * @return Boolean
      */
    def anyStringEqual(matchStr: String): Boolean = {
      val len = row.length
      var i = 0
      while (i < len) {
        if (row.get(i) != null && row.get(i).toString == matchStr) {
          return true
        }
        i += 1
      }
      false
    }

    /**
      * anyStringMatched
      *
      * @param matchStr String
      * @return Boolean
      */
    def anyStringMatched(matchStr: String): Boolean = {
      val len = row.length
      var i = 0
      while (i < len) {
        if (row.get(i) != null && StringUtils.containsAny(row.get(i).toString, matchStr)) {
          return true
        }
        i += 1
      }
      false
    }

    /**
      * getAsOption
      * @param fieldName
      * @tparam T
      * @return Option[T]
      */
    def getAsOption[T](fieldName: String): Option[T] = {
      Try(row.getAs[T](fieldName)) match {
        case Success(value) => Some(value)
        case Failure(e) => None
      }
    }
  }
}

абонент будет выглядеть как ...

import utility.MyRowExtensions.RowExt

myDataFrame.map(r => r.anyStringMatched("myteststring")).show
myDataFrame.filter(_.anyEmptyString).show
0 голосов
/ 19 декабря 2018

Следуя не столь идиоматическому стилю, используемому в методе trait Row anyNull, вот метод для проверки, равен ли какой-либо элемент StringType в Row входной строке:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

def checkStringsInRow(row: Row, s: String): Boolean = {
  val stringFields = row.schema.zipWithIndex.
    collect{ case (field, idx) if field.dataType == StringType => idx }

  var i = 0
  while (i < stringFields.size) {
    if (row.getAs[String](stringFields(i)) == s) return true
    i += 1
  }
  false
}

Тестирование метода:

val df = Seq(
  (1, "a", 10L, "xx"),
  (2, "b", 10L, ""),
  (3, null, 10L, "zz")
).toDF("c1", "c2", "c3", "c4")

df.rdd.map(checkStringsInRow(_, "a")).collect
// res1: Array[Boolean] = Array(true, false, false)

df.rdd.map(checkStringsInRow(_, "")).collect
// res2: Array[Boolean] = Array(false, true, false)

df.rdd.map(checkStringsInRow(_, null)).collect
// res3: Array[Boolean] = Array(false, false, true)

Если вы предпочитаете использовать метод как Row метод, вы можете определить его в implicit class, как показано ниже:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

implicit class RowOps(row: Row) {
  def checkStringsInRow(s: String): Boolean = {
    val stringFields = row.schema.zipWithIndex.
      collect{ case (field, idx) if field.dataType == StringType => idx }

    var i = 0
    while (i < stringFields.size) {
      if (row.getAs[String](stringFields(i)) == s) return true
      i += 1
    }
    false
  }
}

df.rdd.map(_.checkStringsInRow("a")).collect
// res4: Array[Boolean] = Array(true, false, false)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...