Как сгруппировать Spark DataFrame с моими компараторами равенства? - PullRequest
3 голосов
/ 13 марта 2019

Я хотел бы использовать оператор GroupBy в DataFrame с моими собственными компараторами равенства.

Давайте предположим, что я хочу выполнить что-то вроде:

df.groupBy("Year","Month").sum("Counter")

В этом фрейме данных:

Year    | Month      | Counter  
---------------------------
2012    | Jan        | 100          
12      | January    | 200       
12      | Janu       | 300       
2012    | Feb        | 400       
13      | Febr       | 500

Я должен реализовать два компаратора:

1) Для колонки Year: p.e. "2012" == "12"

2) Для столбца Месяц: п.е. "Ян" == "Январь" == "Яну"

Давайте предположим, что я уже реализовал эти два компаратора. Как я могу их вызвать? Как и в этом примере, я уже знаю, что мне нужно преобразовать мой DataFrame в RDD, чтобы можно было использовать мои компараторы.

Я думал об использовании RDD GroupBy .

Обратите внимание, что Мне действительно нужно сделать это с помощью компараторов . Я не могу использовать пользовательские функции, изменять данные или создавать новые столбцы. Будущая идея состоит в том, чтобы иметь столбцы зашифрованного текста, в которых у меня есть функции, которые позволяют мне сравнивать, совпадают ли два зашифрованных текста. Я хочу использовать их в моих компараторах.

Edit:

В данный момент я пытаюсь сделать это только с одним столбцом, например:

df.groupBy("Year").sum("Counter")

У меня есть класс Wrapper:

class ExampleWrapperYear (val year: Any) extends Serializable {
      // override hashCode and Equals methods
}

Затем я делаю это:

val rdd = df.rdd.keyBy(a => new ExampleWrapperYear(a(0))).groupByKey()

Мой вопрос здесь состоит в том, как сделать «сумму» и как использовать keyBy с несколькими столбцами, чтобы использовать ExampleWrapperYear и ExampleWrapperMonth.

Ответы [ 2 ]

1 голос
/ 15 марта 2019

Это решение должно работать. Вот классы case (мы можем назвать их как компараторы), которые реализуют hashCode и равны.

Вы можете изменять / обновлять hashCode и равные на основе разных шифротекстов

  case class Year(var year:Int){

    override def hashCode(): Int = {
      this.year = this.year match {
        case 2012 => 2012
        case 12 => 2012
        case 13 => 2013
        case _ => this.year
      }
      this.year.hashCode()
    }

    override def equals(that: Any): Boolean ={
      val year1 = 2000 + that.asInstanceOf[Year].year % 100
      val year2 = 2000 + this.year % 100
      if (year1 == year2)
        true
      else
        false
    }
  }

  case class Month(var month:String){

    override def hashCode(): Int = {
      this.month = this.month match {
        case "January" => "Jan"
        case "Janu" => "Jan"
        case "February" => "Feb"
        case "Febr" => "Feb"
        case _ => this.month
      }
      this.month.hashCode
    }

    override def equals(that: Any): Boolean ={
      val month1 = this.month match {
        case "January" => "Jan"
        case "Janu" => "Jan"
        case "February" => "Feb"
        case "Febr" => "Feb"
        case _ => this.month
      }
      val month2 = that.asInstanceOf[Month].month match {
        case "January" => "Jan"
        case "Janu" => "Jan"
        case "February" => "Feb"
        case "Febr" => "Feb"
        case _ => that.asInstanceOf[Month].month
      }
      if (month1.equals(month2))
        true
      else
        false
    }
  }

Вот важный компаратор для сгруппированных ключей, который просто использует отдельный компаратор цвета

  case class Key(var year:Year, var month:Month){

    override def hashCode(): Int ={
      this.year.hashCode() + this.month.hashCode()
    }

    override def equals(that: Any): Boolean ={
      if ( this.year.equals(that.asInstanceOf[Key].year) && this.month.equals(that.asInstanceOf[Key].month))
        true
      else
        false
    }
  }

  case class Record(year:Int,month:String,counter:Int)

  val df = spark.read.format("com.databricks.spark.csv")
      .option("header", "true")
      .option("inferSchema", "true")
      .load("data.csv").as[Record]

  df.rdd.groupBy[Key](
      (record:Record)=>Key(Year(record.year), Month(record.month)))
      .map(x=> Record(x._1.year.year, x._1.month.month, x._2.toList.map(_.counter).sum))
      .toDS().show()

, что дает

+----+-----+-------+
|year|month|counter|
+----+-----+-------+
|2012|  Feb|    800|
|2013|  Feb|    500|
|2012|  Jan|    700|
+----+-----+-------+

for this input in data.csv

Year,Month,Counter
2012,February,400
2012,Jan,100
12,January,200
12,Janu,300
2012,Feb,400
13,Febr,500
2012,Jan,100

Обратите внимание, что для классов дел Год и Месяц также обновлено значение до стандартного значения (в противном случае непредсказуемо, какое значение оно выберет).

1 голос
/ 13 марта 2019

Вы можете использовать udfs для реализации логики, чтобы сделать ее стандартным форматом год / месяц

  def toYear : (Integer) => Integer = (year:Integer)=>{
    2000 + year % 100 //assuming all years in 2000-2999 range
  }

  def toMonth : (String) => String = (month:String)=>{
    month match {
      case "January"=> "Jan"
      case "Janu"=> "Jan"
      case "February" => "Feb"
      case "Febr" => "Feb"
      case _ => month
    }
  }

  val toYearUdf = udf(toYear)
  val toMonthUdf = udf(toMonth)

  df.groupBy( toYearUdf(col("Year")), toMonthUdf(col("Month"))).sum("Counter").show()
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...