Я пытаюсь перенести данные кафки в HDFS.Я вижу данные темы kafka в окне kafka-consumer-console.
Вот мой код.Не вызывать writeToWebHDFS(record)
сам метод.Пока Before calling HDFS
печатает.writeToWebHDFS
метод содержит новый URL-адрес целевой зоны и код написания.
val stream = KafkaUtils.createDirectStream [String, String] (ssc, PreferConsistent, Subscribe [String, String] (themes, kafkaParams))stream.map (запись => (record.value (). ToString)). Печатьprint ("+++++++++++++ Перед вызовом HDFS +++++++++++++++++++++++") val uploadFile = stream.map(запись => writeToWebHDFS (запись))
writeToWebHDFS
фрагмент кода
def writeToWebHDFS (запись:> org.apache.kafka.clients.consumer.ConsumerRecord [String, String]) = {
val res = Http ("https://hdfsurl:port/gateway/webhdfs/webhdfs/v1/opt/sandboxes/user/test/" + record.key (). toString (). toLowerCase (). replaceAll (" ", "") + ".txt? op = CREATE & overwrite = true"). put ("") .option (HttpOptions.allowUnsafeSSL) .auth ("user_mail_id", "* pwd "). asString ()
val location = res.headers.get ("Location"). Get (0) val upload = Http (location.toString ()). Put (record.value ()) .timeout (30000,30000) .option (HttpOptions.allowUnsafeSSL) .auth ("user_mail_id", "* pwd "). AsString
print ("Закончена загрузка в HDFS")}
Подскажите пожалуйста, как вызвать функцию writeToWebHDFS