Запись данных темы Кафки в HDFS - PullRequest
0 голосов
/ 12 марта 2019

Я пытаюсь перенести данные кафки в HDFS.Я вижу данные темы kafka в окне kafka-consumer-console.

Вот мой код.Не вызывать writeToWebHDFS(record) сам метод.Пока Before calling HDFS печатает.writeToWebHDFS метод содержит новый URL-адрес целевой зоны и код написания.

val stream = KafkaUtils.createDirectStream [String, String] (ssc, PreferConsistent, Subscribe [String, String] (themes, kafkaParams))stream.map (запись => (record.value (). ToString)). Печатьprint ("+++++++++++++ Перед вызовом HDFS +++++++++++++++++++++++") val uploadFile = stream.map(запись => writeToWebHDFS (запись))

writeToWebHDFS фрагмент кода

def writeToWebHDFS (запись:> org.apache.kafka.clients.consumer.ConsumerRecord [String, String]) = {

val res = Http ("https://hdfsurl:port/gateway/webhdfs/webhdfs/v1/opt/sandboxes/user/test/" + record.key (). toString (). toLowerCase (). replaceAll (" ", "") + ".txt? op = CREATE & overwrite = true"). put ("") .option (HttpOptions.allowUnsafeSSL) .auth ("user_mail_id", "* pwd "). asString ()

val location = res.headers.get ("Location"). Get (0) val upload = Http (location.toString ()). Put (record.value ()) .timeout (30000,30000) .option (HttpOptions.allowUnsafeSSL) .auth ("user_mail_id", "* pwd "). AsString

print ("Закончена загрузка в HDFS")}

Подскажите пожалуйста, как вызвать функцию writeToWebHDFS

1 Ответ

3 голосов
/ 12 марта 2019

Я бы посоветовал вместо того, чтобы заново изобретать Колесо, использовать разъем HDFS. Вы получите более подробную информацию здесь https://github.com/confluentinc/kafka-connect-hdfs

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...