Импортируйте данные в RocksDB, используя раздел Spark - PullRequest
0 голосов
/ 27 марта 2020

У меня проблема при использовании спарк-раздела для импорта данных в RocksDB. Я кратко опишу мою проблему следующим образом. У меня есть данные, хранящиеся в виде паркета, я использую spark, чтобы прочитать все эти данные, преобразовав данные каждой строки в avro-записи в виде столбца: соответствующие значения. В то время я использовал writebatch для записи всех вышеуказанных данных в Rocksdb в виде ключа: значение, соответствующее ключу, равно id, значение - это данные, преобразованные в avro. Проблема начинается здесь, общее количество записей о паркете составляет 21 миллион, однако, когда я импортирую в RocksDB, данные счета теряются на 500 тысяч записей соответственно. Поскольку пакет записи в RocksDB - Asyn c, я предполагаю, что при записи в RocksDB есть другой поток, чтобы начать запись, но когда Spark Partition завершил свою работу, поток, записанный в RocksDB, все еще не завершен. После этого Spark Partition отключается, поэтому пакет записи потока в RocksDB также удаляется, поэтому данные теряются. Как я могу контролировать поток для записи в RocksDB в разделе Spark, чтобы не потерять данные?

   val convertUDF: UserDefinedFunction = udf((col: Long) => {
      val packer = MessagePack.newDefaultBufferPacker()
      packer.packLong(col)
      packer.close()
      packer.toByteArray
    })
    var parquetFileDF: DataFrame = spark.read.
      parquet("/media/khanhdv/project/exam_rocksdb/rockdb_import/file/parquet/")
    parquetFileDF = parquetFileDF.withColumn("msg_pack_id",
      convertUDF(col("profile_facebook_id")))
    parquetFileDF.createOrReplaceTempView("sample")
    parquetFileDF = parquetFileDF.withColumn("avro_record",
      to_avro(struct(parquetFileDF.columns.map(column): _*)))
    parquetFileDF.orderBy(org.apache.spark.sql.functions.col("msg_pack_id").desc)

Код импорта данных в RocksDB

    parquetFileDF.repartition(1).rdd.foreachPartition(rows => {
      var options = new Options()
        .setCreateIfMissing(true)
        .setDbLogDir("/tmp")
        .setMaxLogFileSize(4 * SizeUnit.MB)
        .setWalDir("/tmp")
        .setInfoLogLevel(InfoLogLevel.ERROR_LEVEL)
        .setWriteBufferSize(256 * SizeUnit.MB)
        .setMaxWriteBufferNumber(4)
        .setMaxBackgroundCompactions(16)
        .setCompressionType(CompressionType.ZLIB_COMPRESSION)
        .setCompactionStyle(CompactionStyle.UNIVERSAL)
        .prepareForBulkLoad()
      var db = RocksDB.open(options, "db")
      try {
        val batch = new WriteBatch(25000000)
        val writeOptions = new WriteOptions()

        rows.foreach(row => {
          val m = row.getValuesMap(row.schema.fieldNames)
          val packer = MessagePack.newDefaultBufferPacker()
          packer.packString(Json(DefaultFormats).write(m))
          packer.close()
          batch.put(row.getAs("msg_pack_id"),
            row.getAs("avro_record"))
          if (batch.count() == 10000) {
            db.write(writeOptions, batch)
            batch.clear()
          }
        })
        if (batch.count() > 0) {
          db.write(writeOptions, batch)
        }
      }
      catch {
        case x: RocksDBException => {
          println(x)
        }
        case x: Exception => {
          println(x)
        }
      }


    })
...