Какая проблема с потоками в Scala не позволяет эффективно загружать файлы S3? - PullRequest
0 голосов
/ 25 сентября 2018

Моя loadRaw() функция читает S3Stream и вставляет данные в таблицу orderItems.Чтобы проверить метод, я скачал файлы S3, содержащие необработанные данные за 1 день.Данные загружаются, но очень неэффективно, и между ними waiting for child threads.Что я делаю не так в этом коде?-

def loadRaw(file: String, fileHost: String, batchSize: Int, bucket: String): Unit = {
val dataStream = fileHost match {
  case "s3" => S3Stream(s3Client, bucket, file)
  case "local" => LazyFileStream(file)
}
route match {
  case 1 => {
    dataStream.map(line => readJsonFromString(line)).grouped(memsqlBatchSize).foreach { recordSet =>
      val dbRecords = recordSet.filter(f => OrderEvent.isValidCreateOrder(f)).map(OrderEvent.orderItemRowsFromCreateOrder(_).flatten.grouped(batchSize))
      try {
        dbRecords.foreach { batchedList =>
          batchedList.foreach { record =>
            try {
              OrderEvent.setColumnValues(orderItemsInsert, record.toMap)
              orderItemsInsert.addBatch()
            } catch {
              case e: Exception =>
                logger.error(s"Error parsing order items. \n - Exception: ${e.getMessage}\n Event: ${eventToString(record.toMap)}")
                ArgosLogger.sendError(ErrorType.PARSING_ERROR, Some(record.toMap), Some(e))
            }
          }
          orderItemsInsert.executeBatch()
          orderItemsInsert.clearBatch()
        }
        connection.commit
      } catch {
        case e: Exception =>
          Try(connection.rollback())
          throw e;
      }
      /* finally {
        connection.commit
        orderItemsInsert.clearBatch
        logger.debug(s"committed: ${dbRecords.length.toString}")
      }*/
    }
  }
  case _ => {}

Здесь orderItemInsert - это уже подготовленный оператор SQL, который выполняется после того, как данные заданы с использованием setColumnValues().

  case "s3load" =>
    val files = if (Cli.s3loader.loadManifest.isEmpty) Cli.s3loader.loadFiles.split(",").toList
    else scala.io.Source.fromFile(Cli.s3loader.loadManifest).getLines.toList
    val fileQueue = new java.util.concurrent.ConcurrentLinkedQueue[String](files)

    val pool = Executors.newFixedThreadPool(Cli.s3loader.parallelism)
    1 to Cli.s3loader.parallelism map {
      index =>
        pool.submit(MemSQLS3Loader(
          Cli.s3loader.memsqlHost,
          Cli.s3loader.targetTable,
          Cli.s3loader.memsqlUserName,
          Cli.s3loader.memsqlPassword,
          Cli.s3loader.memsqlBatchSize,
          Cli.s3loader.bucket,
          Cli.s3loader.route,
          fileQueue)
        )
    }
    while (!fileQueue.isEmpty) {
      logger.info("waiting for child threads")
      Thread.sleep(5000L)
    }
    logger.info("starting pool termination")
    pool.shutdown()
    try {
      pool.awaitTermination(60L, TimeUnit.SECONDS)
      logger.info("pool terminated")
    } catch {
      case e: Throwable =>
        throw (e)
    }

Консоль для большей наглядности - enter image description here

...