Как преобразовать поток Kafka в формат Json в Spark для анализа данных на языке Java - PullRequest
0 голосов
/ 18 июня 2019

Я пытаюсь проанализировать данные Stream kafka в формате JSON, чтобы я мог проанализировать входящие транснациональные данные в реальном времени для требуемой логики и в дальнейшем захотеть обновить их в таблице Hbase.

1. Входящий поток данных будет в этом формате.

2.Где мне нужно извлечь card_id, amount, postcode и transaction_dt

{«card_id»: 348702330256514, «member_id»: 000037495066290, «сумма»: 9084849, «pos_id»: 614677375609919, «почтовый индекс»: 33946, «Action_dt »:« 11-02-2018 00:00:00 »}

Создан Kafka Consumer с использованием кода, как указано ниже, однако не уверен, как я могу обработать его в Jason через RDD.

    Logger.getLogger("org").setLevel(Level.OFF);
    Logger.getLogger("akka").setLevel(Level.OFF);

    SparkConf sparkConf = new SparkConf().setAppName("KafkaSparkStreamingDemo").setMaster("local");

    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(10));

    Map<String, Object> kafkaParams = new HashMap<>();
    kafkaParams.put("bootstrap.servers", "100.xx.xxx.xxx:9092");
    kafkaParams.put("key.deserializer", StringDeserializer.class);
    kafkaParams.put("value.deserializer", StringDeserializer.class);
    kafkaParams.put("group.id", "groupkafkaspark2");
    kafkaParams.put("auto.offset.reset", "earliest");
    kafkaParams.put("enable.auto.commit", true);

    Collection<String> topics = Arrays.asList("transactions-topic-verified");

    JavaDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(jssc,
            LocationStrategies.PreferConsistent(),
            ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams));

Мне нужно получить вышеуказанные 4 поля, а затем обработать их, просматривая предварительно созданную таблицу hbase с похожими данными.

1 Ответ

0 голосов
/ 22 июня 2019

Это можно сделать с помощью классов JSONObject и JSONPasrer.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...