Я использую API-интерфейс Kafka Consumer для копирования всех данных из раздела Kafka в таблицу Hive. Для этого я использую HDFS в качестве промежуточного шага. Я использую уникальный идентификатор группы и сбрасываю смещение на «самое раннее», чтобы получить все данные с начала и игнорировать коммиты после выполнения. Затем я перебираю записи в теме Кафки и сохраняю каждую запись во временный файл в HDFS. Затем я использую Spark для чтения данных из HDFS и сохраняю их в файл Parquet, используя дату в качестве имени файла.
Затем я создаю раздел с датой в таблице Hive и, наконец, загружаю файл в Parquet как раздел в Hive.
Как вы можете видеть из кода ниже, я использую несколько промежуточных шагов, что делает мой код далеко не оптимальным. Это самый рекомендуемый способ скопировать все данные из темы Кафки? Я провел некоторое исследование, и до сих пор это был обходной путь, который мне удалось получить к работе, однако, поскольку количество записей увеличивается с каждым днем, мое время выполнения достигает предела допустимого (с 20 минут до 6 часов за 2 недель).
Код здесь:
def start( lowerDate: String, upperDate: String )={
// Configurations for kafka consumer
val conf = ConfigFactory.parseResources("properties.conf")
val brokersip = conf.getString("enrichment.brokers.value")
val topics_in = conf.getString("enrichment.topics_in.value")
// Crea la sesion de Spark
val spark = SparkSession
.builder()
.master("yarn")
.appName("ParaTiUserXY")
.getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
import spark.implicits._
val properties = new Properties
properties.put("key.deserializer", classOf[StringDeserializer])
properties.put("value.deserializer", classOf[StringDeserializer])
properties.put("bootstrap.servers", brokersip)
properties.put("auto.offset.reset", "earliest")
properties.put("group.id", "ParaTiUserXYZZ12345")
//Schema para transformar los valores del topico de Kafka a JSON
val my_schema = new StructType()
.add("longitudCliente", StringType)
.add("latitudCliente", StringType)
.add("dni", StringType)
.add("alias", StringType)
.add("segmentoCliente", StringType)
.add("timestampCliente", StringType)
.add("dateCliente", StringType)
.add("timeCliente", StringType)
.add("tokenCliente", StringType)
.add("telefonoCliente", StringType)
val consumer = new KafkaConsumer[String, String](properties)
consumer.subscribe( util.Collections.singletonList("parati_rt_geoevents") )
val fs = {
val conf = new Configuration()
FileSystem.get(conf)
}
val temp_path:Path = new Path("hdfs:///tmp/s70956/tmpstgtopics")
if( fs.exists(temp_path)){
fs.delete(temp_path, true)
}
while(true)
{
val records=consumer.poll(100)
for (record<-records.asScala){
val data = record.value.toString
//println(data)
val dataos: FSDataOutputStream = fs.create(temp_path)
val bw: BufferedWriter = new BufferedWriter( new OutputStreamWriter(dataos, "UTF-8"))
bw.append(data)
bw.close
val data_schema = spark.read.schema(my_schema).json("hdfs:///tmp/s70956/tmpstgtopics")
val fechaCliente = data_schema.select("dateCliente").first.getString(0)
if( fechaCliente < upperDate && fechaCliente >= lowerDate){
data_schema.select("longitudCliente", "latitudCliente","dni", "alias",
"segmentoCliente", "timestampCliente", "dateCliente", "timeCliente",
"tokenCliente", "telefonoCliente")
.coalesce(1).write.mode(SaveMode.Append).parquet("/desa/landing/parati/xyuser/" + fechaCliente)
}
else if( fechaCliente < lowerDate){
//
}
else if( fechaCliente >= upperDate){
break;
}
}
}
consumer.close()
}