java .io.InterruptedIOException при вызове next () над объектом ResultScanner - PullRequest
1 голос
/ 26 января 2020

Я пытаюсь прочитать и проанализировать сканер ResultScanner, но при вызове next() я получаю исключение.

Это соответствующий фрагмент моего кода:

    var scan: Scan = new Scan()
    val keyRegEx : RegexStringComparator = new RegexStringComparator("^.*"+"123123123123")
    val rowFilter : RowFilter = new RowFilter(CompareOp.EQUAL, keyRegEx)

    scan.setFilter(rowFilter)
    scan.setCaching(3000)

    // Apply the scan to the Table
    val scanner = table.getScanner(scan)

    val scanOutput: Seq[(String, String)] = iterateScannerAddingRowkey[T](scanner, Seq())

  def iterateScannerAddingRowkey[T](scanner: ResultScanner, acc: Seq[(String,String)])(implicit m: Manifest[T]) : Seq[(String,String)] = {
    // **Line below is triggering the exception**
    val result = scanner.next()
    if (result == null) acc
    else {
      val rowKey = result.rawCells().head.toString.split("/")(0)
      // Parsing the rawCells content into a JSONObject
      val response : JSONObject = getJson[T](result.rawCells())
      iterateScannerAddingRowkey[T](scanner, Seq((rowKey, response.toString)) ++ acc)
    }
  }

И это исключение:

java .lang.RuntimeException: java .io.InterruptedIOException at org. apache .had oop .hbase.client.AbstractClientScanner $ 1.hasNext ( AbstractClientScanner. java: 97) на com.myproject.framework.hbase.HBaseUtils.iterateScannerAddingRowkey (HBaseUtils. scala: 85) на com.myproject.framework.hbase.HBaseAPI.hbaseGetRowByRegEx * 10) H). в com.myproject.core.ComparePrefixVsRegex $ .main (приложение. scala: 46) в com.myproject.core.ComparePrefixVsRegex.main (приложение. scala) в sun.reflect.NativeMethodAccessorImpl.invoke0 (родной метод) sun.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl. java: 62) в sun.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl. java: 43) в java. *: 498) в орг. apache .spark.de ploy.yarn.ApplicationMaster $$ anon $ 3.run (ApplicationMaster. scala: 686) Вызывается: java .io.InterruptedIOException в org. apache .had oop .hbase.client.ScannerCallableWithReplicas.call ( ScannerCallableWithReplicas. java: 203) в орг. apache .had oop .hbase.client.ScannerCallableWithReplicas.call (ScannerCallableWithReplicas. java: 61) в орг. apache .hb * 1042. client.RpcRetringCaller.callWithoutRetries (RpcRetringCaller. java: 200) в org. apache .had oop .hbase.client.ClientScanner.call (ClientScanner. java: 320) в org. apache .had oop .hbase.client.ClientScanner.loadCache (ClientScanner. java: 401) в орг. apache .had oop .hbase.client.ClientScanner.next (ClientScanner. java: 364) в орг. . apache .had oop .hbase.client.AbstractClientScanner $ 1.hasNext (AbstractClientScanner. java: 94)

Как я понимаю, в этом случае scanner было пустым next() вернется null

Кто-нибудь знает, что мне не хватает?

1 Ответ

1 голос
/ 27 января 2020

После нескольких попыток я смог решить проблему.

Основной причиной было то, что размер запрашиваемой таблицы действительно велик, поэтому при обработке сканера я достиг тайм-аута. Чтобы решить эту проблему, я сделал два изменения:

Я увеличил время ожидания искрового вещания

val spark = SparkSession
              .builder
              .config("spark.sql.broadcastTimeout", "36000")
              .getOrCreate()

И добавил в скан выбор столбцов, которые хочу прочитать, чтобы уменьшить размер результат:

scan.addColumn(Bytes.toBytes("CF"), Bytes.toBytes("C1"))
scan.addColumn(Bytes.toBytes("CF"), Bytes.toBytes("C2"))
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...