Какой самый рекомендуемый способ скопировать все данные из раздела Kafka в приемник (файл или таблица Hive)? - PullRequest
0 голосов
/ 10 мая 2018

Я использую API-интерфейс Kafka Consumer для копирования всех данных из раздела Kafka в таблицу Hive. Для этого я использую HDFS в качестве промежуточного шага. Я использую уникальный идентификатор группы и сбрасываю смещение на «самое раннее», чтобы получить все данные с начала и игнорировать коммиты после выполнения. Затем я перебираю записи в теме Кафки и сохраняю каждую запись во временный файл в HDFS. Затем я использую Spark для чтения данных из HDFS и сохраняю их в файл Parquet, используя дату в качестве имени файла. Затем я создаю раздел с датой в таблице Hive и, наконец, загружаю файл в Parquet как раздел в Hive.

Как вы можете видеть из кода ниже, я использую несколько промежуточных шагов, что делает мой код далеко не оптимальным. Это самый рекомендуемый способ скопировать все данные из темы Кафки? Я провел некоторое исследование, и до сих пор это был обходной путь, который мне удалось получить к работе, однако, поскольку количество записей увеличивается с каждым днем, мое время выполнения достигает предела допустимого (с 20 минут до 6 часов за 2 недель).

Код здесь:

def start( lowerDate: String, upperDate: String )={

    // Configurations for kafka consumer
    val conf = ConfigFactory.parseResources("properties.conf")
    val brokersip = conf.getString("enrichment.brokers.value")
    val topics_in = conf.getString("enrichment.topics_in.value")

    // Crea la sesion de Spark
    val spark = SparkSession
      .builder()
      .master("yarn")
      .appName("ParaTiUserXY")
      .getOrCreate()

    spark.sparkContext.setLogLevel("ERROR")
    import spark.implicits._

    val properties = new Properties
    properties.put("key.deserializer", classOf[StringDeserializer])
    properties.put("value.deserializer", classOf[StringDeserializer])
    properties.put("bootstrap.servers", brokersip)
    properties.put("auto.offset.reset", "earliest")
    properties.put("group.id", "ParaTiUserXYZZ12345")


    //Schema para transformar los valores del topico de Kafka a JSON
    val my_schema = new StructType()
        .add("longitudCliente", StringType)
        .add("latitudCliente", StringType)
        .add("dni", StringType)
        .add("alias", StringType)
        .add("segmentoCliente", StringType)
        .add("timestampCliente", StringType)
        .add("dateCliente", StringType)
        .add("timeCliente", StringType)
        .add("tokenCliente", StringType)
        .add("telefonoCliente", StringType)

    val consumer = new KafkaConsumer[String, String](properties)
    consumer.subscribe( util.Collections.singletonList("parati_rt_geoevents")   )

    val fs = {
      val conf = new Configuration()
      FileSystem.get(conf)
    }


    val temp_path:Path = new Path("hdfs:///tmp/s70956/tmpstgtopics")
    if( fs.exists(temp_path)){
      fs.delete(temp_path, true)
    }

    while(true)
    {
        val records=consumer.poll(100)
        for (record<-records.asScala){
            val data = record.value.toString
            //println(data)
            val dataos: FSDataOutputStream = fs.create(temp_path)
            val bw: BufferedWriter = new BufferedWriter( new OutputStreamWriter(dataos, "UTF-8"))
            bw.append(data)
            bw.close
            val data_schema = spark.read.schema(my_schema).json("hdfs:///tmp/s70956/tmpstgtopics")
            val fechaCliente = data_schema.select("dateCliente").first.getString(0)

            if( fechaCliente < upperDate && fechaCliente >= lowerDate){
                data_schema.select("longitudCliente", "latitudCliente","dni", "alias", 
                "segmentoCliente", "timestampCliente", "dateCliente", "timeCliente",
                "tokenCliente", "telefonoCliente")
                   .coalesce(1).write.mode(SaveMode.Append).parquet("/desa/landing/parati/xyuser/" + fechaCliente)

            }
            else if( fechaCliente < lowerDate){
                //
            }
            else if( fechaCliente >= upperDate){
              break;
            }

        }
   }

      consumer.close()

}
...