Изменить значение строки, используя несколько столбцов в Spark DataFrame - PullRequest
0 голосов
/ 05 февраля 2019

Я получил фрейм данных (df) этого формата.

df.show()
********************
X1 | x2  | X3 | ..... | Xn   | id_1 | id_2 | .... id_23
1  |  ok |good|         john | null | null |     |null
2  |rick |good|       | ryan | null | null |     |null
....

Я получил фрейм данных, в котором у меня много столбцов, и фрейм данных называется df.Мне нужно отредактировать столбцы в этом фрейме данных (df).У меня есть 2 карты, m1 (Integer-> Integer) и м2 (Integer-> String) отображения.

Мне нужно просмотреть каждую строку, взять значение столбца X1 и увидеть отображенное значение X1 в m1, которое будет в диапазоне [1,23], пусть оно будет 5, а также найти сопоставленное значениеX1 в м2, который будет что-то вроде X8.Мне нужно добавить значение в столбце X8 к id_5.У меня есть следующий код, но я не могу заставить его работать.

val dfEdited = df.map( (row) => {
  val mapValue = row.getAs("X1")
  row.getAs("id_"+m1.get(mapValue)) = row.getAs(m2.get(mapValue)
})

1 Ответ

0 голосов
/ 05 февраля 2019

То, что вы делаете в row.getAs("id_"+m1.get(mapValue)) = row.getAs(m2.get(mapValue), не имеет смысла.

Прежде всего, вы присваиваете значение результату операции getAs("id_"+m1.get(mapValue)), что дает вам неизменное значение.Во-вторых, вы неправильно используете метод getAs, так как вам нужно указать тип данных, возвращаемый таким методом.

Я не уверен, правильно ли я понял, что вы хотите сделать, я полагаю, выотсутствуют некоторые детали.В любом случае, вот что я получил, и он отлично работает.

Конечно, я прокомментировал каждую строку кода, чтобы вы могли легко ее понять.

// First of all we need to create a case class to wrap the content of each row.
case class Schema(X1: Int, X2: String, X3: String, X4: String, id_1: Option[String], id_2: Option[String], id_3: Option[String])


val dfEdited = ds.map( row => {
  // We use the getInt method to get the value of a field which is expected to be Int
  val mapValue = row.getInt(row.fieldIndex("X1"))

  // fieldIndex gives you the position inside the row fo the field you are looking for. 
  // Regarding m1(mapValue), NullPointer might be thrown if mapValue is not in that Map. 
  // You need to implement mechanisms to deal with it (for example, an if...else clause, or using the method getOrElse)
  val indexToModify = row.fieldIndex("id_" + m1(mapValue)) 

  // We convert the row to a sequence, and pair each element with its index.
  // Then, with the map method we generate a new sequence.
  // We replace the element situated in the position indexToModify.
  // In addition, if there are null values, we have to convert it to an object of type Option.
  // It is necessary for the next step.
  val seq = row.toSeq.zipWithIndex.map(x => if (x._2 == indexToModify) Some(m2(mapValue)) else if(x._1 == null) None else x._1)


  // Finally, you have to create the Schema object by using pattern matching
  seq match {
    case Seq(x1: Int, x2: String, x3: String, x4: String, id_1: Option[String], id_2: Option[String], id_3: Option[String]) => Schema(x1, x2,x3,x4, id_1, id_2, id_3)
  }
})

Некоторые комментарии:

  • Объект ds является набором данных.Наборы данных должны иметь структуру.Вы не можете изменять строки внутри метода карты и возвращать их, потому что Spark не будет знать, изменилась ли структура набора данных.По этой причине я возвращаю объект класса case, поскольку он предоставляет структуру для объекта набора данных.

  • Помните, что у вас могут быть проблемы с нулевыми значениями.Этот код может выдать вам нулевые указатели, если вы не установите механизмы для работы со случаями, в которых, например, значение X1 отсутствует в m1.

Надеюсь, что это работает.

...