Я пытаюсь проанализировать данные Stream kafka в формате JSON, чтобы я мог проанализировать входящие транснациональные данные в реальном времени для требуемой логики и в дальнейшем захотеть обновить их в таблице Hbase.
1. Входящий поток данных будет в этом формате.
2.Где мне нужно извлечь card_id
, amount
, postcode
и transaction_dt
{«card_id»: 348702330256514, «member_id»: 000037495066290, «сумма»:
9084849, «pos_id»: 614677375609919, «почтовый индекс»: 33946,
«Action_dt »:« 11-02-2018 00:00:00 »}
Создан Kafka Consumer с использованием кода, как указано ниже, однако не уверен, как я могу обработать его в Jason через RDD.
Logger.getLogger("org").setLevel(Level.OFF);
Logger.getLogger("akka").setLevel(Level.OFF);
SparkConf sparkConf = new SparkConf().setAppName("KafkaSparkStreamingDemo").setMaster("local");
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(10));
Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "100.xx.xxx.xxx:9092");
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "groupkafkaspark2");
kafkaParams.put("auto.offset.reset", "earliest");
kafkaParams.put("enable.auto.commit", true);
Collection<String> topics = Arrays.asList("transactions-topic-verified");
JavaDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(jssc,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams));
Мне нужно получить вышеуказанные 4 поля, а затем обработать их, просматривая предварительно созданную таблицу hbase с похожими данными.