У меня проблема при использовании спарк-раздела для импорта данных в RocksDB. Я кратко опишу мою проблему следующим образом. У меня есть данные, хранящиеся в виде паркета, я использую spark, чтобы прочитать все эти данные, преобразовав данные каждой строки в avro-записи в виде столбца: соответствующие значения. В то время я использовал writebatch для записи всех вышеуказанных данных в Rocksdb в виде ключа: значение, соответствующее ключу, равно id, значение - это данные, преобразованные в avro. Проблема начинается здесь, общее количество записей о паркете составляет 21 миллион, однако, когда я импортирую в RocksDB, данные счета теряются на 500 тысяч записей соответственно. Поскольку пакет записи в RocksDB - Asyn c, я предполагаю, что при записи в RocksDB есть другой поток, чтобы начать запись, но когда Spark Partition завершил свою работу, поток, записанный в RocksDB, все еще не завершен. После этого Spark Partition отключается, поэтому пакет записи потока в RocksDB также удаляется, поэтому данные теряются. Как я могу контролировать поток для записи в RocksDB в разделе Spark, чтобы не потерять данные?
val convertUDF: UserDefinedFunction = udf((col: Long) => {
val packer = MessagePack.newDefaultBufferPacker()
packer.packLong(col)
packer.close()
packer.toByteArray
})
var parquetFileDF: DataFrame = spark.read.
parquet("/media/khanhdv/project/exam_rocksdb/rockdb_import/file/parquet/")
parquetFileDF = parquetFileDF.withColumn("msg_pack_id",
convertUDF(col("profile_facebook_id")))
parquetFileDF.createOrReplaceTempView("sample")
parquetFileDF = parquetFileDF.withColumn("avro_record",
to_avro(struct(parquetFileDF.columns.map(column): _*)))
parquetFileDF.orderBy(org.apache.spark.sql.functions.col("msg_pack_id").desc)
Код импорта данных в RocksDB
parquetFileDF.repartition(1).rdd.foreachPartition(rows => {
var options = new Options()
.setCreateIfMissing(true)
.setDbLogDir("/tmp")
.setMaxLogFileSize(4 * SizeUnit.MB)
.setWalDir("/tmp")
.setInfoLogLevel(InfoLogLevel.ERROR_LEVEL)
.setWriteBufferSize(256 * SizeUnit.MB)
.setMaxWriteBufferNumber(4)
.setMaxBackgroundCompactions(16)
.setCompressionType(CompressionType.ZLIB_COMPRESSION)
.setCompactionStyle(CompactionStyle.UNIVERSAL)
.prepareForBulkLoad()
var db = RocksDB.open(options, "db")
try {
val batch = new WriteBatch(25000000)
val writeOptions = new WriteOptions()
rows.foreach(row => {
val m = row.getValuesMap(row.schema.fieldNames)
val packer = MessagePack.newDefaultBufferPacker()
packer.packString(Json(DefaultFormats).write(m))
packer.close()
batch.put(row.getAs("msg_pack_id"),
row.getAs("avro_record"))
if (batch.count() == 10000) {
db.write(writeOptions, batch)
batch.clear()
}
})
if (batch.count() > 0) {
db.write(writeOptions, batch)
}
}
catch {
case x: RocksDBException => {
println(x)
}
case x: Exception => {
println(x)
}
}
})