java.lang.UnsupportedOperationExceptionfieldIndex для строки без схемы не определен: исключение для row.getAs [String] - PullRequest
0 голосов
/ 20 ноября 2018

Следующий код вызывает исключение, вызванное: java.lang.UnsupportedOperationException: fieldIndex для строки без схемы не определено.Это происходит, когда на фрейме данных, который был возвращен после вызова groupByKey и flatMap на фрейме данных с использованием ExpressionEncoder, groupedByKey и flatMap, вызывается.

Логический поток: originalDf-> groupByKey-> flatMap-> groupByKey-> flatMap-> show

   import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{ IntegerType, StructField, StructType}

import scala.collection.mutable.ListBuffer



  object Test {

    def main(args: Array[String]): Unit = {

      val values = List(List("1", "One") ,List("1", "Two") ,List("2", "Three"),List("2","4")).map(x =>(x(0), x(1)))
      val session = SparkSession.builder.config("spark.master", "local").getOrCreate
      import session.implicits._
      val dataFrame = values.toDF


      dataFrame.show()
      dataFrame.printSchema()

      val newSchema = StructType(dataFrame.schema.fields
        ++ Array(
        StructField("Count", IntegerType, false)
      )
      )

      val expr = RowEncoder.apply(newSchema)

      val tranform =  dataFrame.groupByKey(row => row.getAs[String]("_1")).flatMapGroups((key, inputItr) => {
        val inputSeq = inputItr.toSeq

        val length = inputSeq.size
        var listBuff = new ListBuffer[Row]()
        var counter : Int= 0
        for(i <- 0 until(length))
        {
          counter+=1

        }

        for(i <- 0 until length ) {
          var x = inputSeq(i)
          listBuff += Row.fromSeq(x.toSeq ++ Array[Int](counter))
        }
        listBuff.iterator
      })(expr)

      tranform.show

      val newSchema1 = StructType(tranform.schema.fields
        ++ Array(
        StructField("Count1", IntegerType, false)
      )
      )
      val expr1 = RowEncoder.apply(newSchema1)
      val tranform2 =  tranform.groupByKey(row => row.getAs[String]("_1")).flatMapGroups((key, inputItr) => {
        val inputSeq = inputItr.toSeq

        val length = inputSeq.size
        var listBuff = new ListBuffer[Row]()
        var counter : Int= 0
        for(i <- 0 until(length))
        {
          counter+=1

        }

        for(i <- 0 until length ) {
          var x = inputSeq(i)
          listBuff += Row.fromSeq(x.toSeq ++ Array[Int](counter))
        }
        listBuff.iterator
      })(expr1)

      tranform2.show
    }
}

Ниже приведена трассировка стека

18/11/21 19:39:03 WARN TaskSetManager: Lost task 144.0 in stage 11.0 (TID 400, localhost, executor driver): java.lang.UnsupportedOperationException: fieldIndex on a Row without schema is undefined.
at org.apache.spark.sql.Row$class.fieldIndex(Row.scala:342)
at org.apache.spark.sql.catalyst.expressions.GenericRow.fieldIndex(rows.scala:166)
at org.apache.spark.sql.Row$class.getAs(Row.scala:333)
at org.apache.spark.sql.catalyst.expressions.GenericRow.getAs(rows.scala:166)
at com.quantuting.sparkutils.main.Test$$anonfun$4.apply(Test.scala:59)
at com.quantuting.sparkutils.main.Test$$anonfun$4.apply(Test.scala:59)
at org.apache.spark.sql.execution.AppendColumnsWithObjectExec$$anonfun$9$$anonfun$apply$3.apply(objects.scala:300)
at org.apache.spark.sql.execution.AppendColumnsWithObjectExec$$anonfun$9$$anonfun$apply$3.apply(objects.scala:298)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:149)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

Как исправить этот код?

Ответы [ 2 ]

0 голосов
/ 20 января 2019

Решение, полученное от JIRA для проекта Spark: https://issues.apache.org/jira/browse/SPARK-26436

Эта проблема вызвана тем, как вы создаете строку:

listBuff += Row.fromSeq(x.toSeq ++ Array[Int](counter))

Row.fromSeq создает GenericRow и GenericRow's fieldIndexне реализован, потому что GenericRow не имеет схемы.

Изменение строки для создания GenericRowWithSchema может решить эту проблему:

listBuff += new GenericRowWithSchema((x.toSeq ++ Array[Int](counter)).toArray, newSchema)
0 голосов
/ 26 декабря 2018

Обнаруженной проблемы можно избежать, заменив fieldname версию метода getAs [T] (используется в функции для groupByKey):

groupByKey(row => row.getAs[String]("_1"))

наfield-position версия:

groupByKey(row => row.getAs[String](fieldIndexMap("_1")))

где fieldIndexMap сопоставляет имена полей с соответствующими им индексами полей:

val fieldIndexMap = tranform.schema.fieldNames.zipWithIndex.toMap

В качестве примечания, ваша функция для flatMapGroups можно упростить до следующего вида:

val tranform2 = tranform.groupByKey(_.getAs[String](fieldIndexMap("_1"))).
  flatMapGroups((key, inputItr) => {
    val inputSeq = inputItr.toSeq
    val length = inputSeq.size
    inputSeq.map(r => Row.fromSeq(r.toSeq :+ length))
  })(expr1)

Несоответствующее поведение между применением оригинальных groupByKey/flatMapGroups методов к «dataFrame» и «tranform», по-видимому, связано с тем, как методы обрабатывают DataFrame иDataset[Row].

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...