Вставить RDD с объектом JSON в таблицу HBASE с использованием Phoenix - PullRequest
0 голосов
/ 26 декабря 2018

Я создаю 'RDD' newData, получая данные из REST API, используя 'Scala'.Содержимое RDD является объектом 'JSON', как показано ниже:

JObject (List ((activity-hrt, JArray (List (JObject (List ((date, JString (2018-12-21))))), (значение, JObject (List ((customHrtRtZns, JArray (List ())), (hrtRtZns, JArray (List (JObject (List ((max, JInt (88))), (min, JInt (30)), (имя, JString (Вне диапазона)))), JObject (List ((max, JInt (123)), (min, JInt (88)), (name, JString (Fat Burn)))), JObject (List ((max, JInt(150)), (min, JInt (123)), (name, JString (Cardio)))), JObject (List ((max, JInt (220)), (min, JInt (150)), (name,JString (Peak)))))))))))))))))

Я хочу поместить эти данные RDD в таблицу 'HBASE', используя 'Phoenix'.Мне нужно также определить соответствующую структуру таблицы «Феникс».Ниже приведена структура таблицы 'Phoenix', которую я создал:

CREATE TABLE IF NOT EXISTS "HRT"(
    "id" INTEGER,
    "activitiesHrt"."dateTime" VARCHAR(32),
    "value"."customHrtRtZns" VARCHAR(32),
    "hrtRtZns"."max" INTEGER,
    "hrtRtZns"."min" INTEGER,
    "hrtRtZns"."name" VARCHAR(32),
    CONSTRAINT pk PRIMARY KEY ("userid")
);

Наряду со вставкой данных из RDD в вышеприведенную таблицу, мне также необходимо вставить порядковый порядковый номер в столбец ID для первичного ключа.Я попробовал следующий код:

import org.apache.phoenix.spark._

sc.parallelize(newData)
.saveToPhoenix(
"HEART",
Seq("id","date","customHrtRtZns","max","min",
"name"),

zkUrl = Some(phoenixServer)

Но получаю ошибку, приведенную ниже:

import spark.implicits._
<console>:116: error: type mismatch;
 found   : org.apache.spark.rdd.RDD[String]
 required: Seq[?]
               sc.parallelize(newData).saveToPhoenix(
                              ^
<console>:124: error: not found: value zkUrl
               zkUrl = Some(phoenixServer)
               ^
import spark.implicits._
<console>:116: error: type mismatch;
 found   : org.apache.spark.rdd.RDD[String]
 required: Seq[?]
               sc.parallelize(newData).saveToPhoenix(
                              ^
<console>:124: error: not found: value zkUrl
               zkUrl = Some(phoenixServer)
               ^

Я проверил вставку в таблицу выше, используя жестко закодированные значения, которые работают нормально.Любые предложения по вставке с использованием RDD вместе с порядковым номером в столбце PK.

...