Kafka JSON Данные со схемой равны нулю в структурированной потоковой передаче PySpark. Несоответствующий ввод на новой схеме - PullRequest
0 голосов
/ 04 мая 2020

Я пытаюсь прочитать сообщения Кафки в JSON в Spark Structured Streaming. Пример сообщений в Kafka выглядит следующим образом:

{
  "_id": {
    "$oid": "5eb292531c7d910b8c98dbce"
  },
  "Id": 37,
  "Timestamp": {
    "$date": 1582889068616
  },
  "TTNR": "R902170286",
  "SNR": 91177446,
  "State": 0,
  "I_A1": "FALSE",
  "I_B1": "FALSE",
  "I1": 0.0037385,
  "Mabs": -20.9814753,
  "p_HD1": 31.0069236,
  "pG": 27.640614,
  "pT": 1.7169713,
  "pXZ": 3.4712914,
  "T3": 25.2174444,
  "nan": 179.3099976,
  "Q1": 0,
  "a_01X": [
    62.7839925,
    62.7839925,
    62.7839925,
    62.7839925,
    62.7839925,
    62.7839925,
    62.7839925,
    62.7839925,
    62.7839925,
    62.7839925,
    62.7839925,
    62.7839925,
    62.7839925,
    62.7839925,
    62.7839925,
    62.7839925,
    62.7839925,
    62.7839925,
    62.7839925,
    62.7839925,
    62.7839925,
    62.7839925,
    62.7839925,
    62.7839925,
    62.7839925,
    62.7839925,
    62.7839925,
    62.7839925,
    62.7839925,
    62.7839925,
    62.7839925,
    62.7839925,
    62.7839925,
    62.7839925,
    62.7839925,
    62.7839925,
    62.7839925,
    62.7839925,
    62.7839925,
    62.7839925,
    62.7839925,
    62.7839925,
    62.7839925,
    62.7839925,
    62.7839925,
    62.7839925,
    62.7839925,
    62.7839925,
    62.7839925,
    62.7839925
  ]
}

После чтения потока в Kafka поле значения в виде строки выглядит следующим образом:

|value                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                      |
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|
{"_id":{"$oid":"5eb292531c7d910b8c98dbce"},"Id":37,"Timestamp":{"$date":1582889068616},"TTNR":"R902170286","SNR":91177446,"State":0,"I_A1":"FALSE","I_B1":"FALSE","I1":0.0037385,"Mabs":-20.9814753,"p_HD1":31.0069236,"pG":27.640614,"pT":1.7169713,"pXZ":3.4712914,"T3":25.2174444,"nan":179.3099976,"Q1":0,"a_01X":[62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925]}

|
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

Схема была определена для выберите некоторые поля следующим образом:

json_schema=StructType([ \
    StructField("_id",StructField("$oid",StringType())), \
    StructField("Id", DoubleType()), \
    StructField('Timestamp', StructField("$date", LongType())), \
    StructField("TTNR", StringType()), \
    StructField("SNR", DoubleType()), \
    StructField("State", LongType()), \
    StructField("I_A1", StringType()), \
    StructField("I_B1", StringType()), \
    StructField("I1", DoubleType()), \
    StructField("Mabs", DoubleType()), \
    StructField("p_HD1", DoubleType()), \
    StructField("pG", DoubleType()), \
    StructField("pT", DoubleType()), \
    StructField("pXZ", DoubleType()), \
    StructField("T3", DoubleType()), \
    StructField("nan", DoubleType()), \
    StructField("Q1", LongType()), \
    StructField("a_01X", ArrayType(DoubleType()))
    ])

(решено с ошибкой синтаксического анализа ) Но после попытки печати на консоль я получаю null значения:

data_stream_json = data_stream_value.select(from_json(col("value"), json_schema).alias("json_detail"))
data_stream_output = data_stream_json \
    .writeStream \
    .outputMode("append") \
    .format("console") \
    .start()

+----+----+----+----+
|  Id|TTNR| SNR|  Q1|
+----+----+----+----+
|null|null|null|null|
+----+----+----+----+

(новая ошибка) После изменения схемы возникла новая проблема при разборе строки:

pyspark.sql.utils.ParseException: u'\nmismatched input \'{\' expecting {\'SELECT\', \'FROM\', \'ADD\', \'AS\', \'ALL\', \'ANY\', \'DISTINCT\', \'WHERE\', \'GROUP\', \'BY\', \'GROUPING\', \'SETS\', \'CUBE\', \'ROLLUP\', \'ORDER\', \'HAVING\', \'LIMIT\', \'AT\', \'OR\', \'AND\', \'IN\', NOT, \'NO\', \'EXISTS\', \'BETWEEN\', \'LIKE\', RLIKE, \'IS\', \'NULL\', \'TRUE\', \'FALSE\', \'NULLS\', \'ASC\', \'DESC\', \'FOR\', \'INTERVAL\', \'CASE\', \'WHEN\', \'THEN\', \'ELSE\', \'END\', \'JOIN\', \'CROSS\', \'OUTER\', \'INNER\', \'LEFT\', \'SEMI\', \'RIGHT\', \'FULL\', \'NATURAL\', \'ON\', \'PIVOT\', \'LATERAL\', \'WINDOW\', \'OVER\', \'PARTITION\', \'RANGE\', \'ROWS\', \'UNBOUNDED\', \'PRECEDING\', \'FOLLOWING\', \'CURRENT\', \'FIRST\', \'AFTER\', \'LAST\', \'ROW\', \'WITH\', \'VALUES\', \'CREATE\', \'TABLE\', \'DIRECTORY\', \'VIEW\', \'REPLACE\', \'INSERT\', \'DELETE\', \'INTO\', \'DESCRIBE\', \'EXPLAIN\', \'FORMAT\', \'LOGICAL\', \'CODEGEN\', \'COST\', \'CAST\', \'SHOW\', \'TABLES\', \'COLUMNS\', \'COLUMN\', \'USE\', \'PARTITIONS\', \'FUNCTIONS\', \'DROP\', \'UNION\', \'EXCEPT\', \'MINUS\', \'INTERSECT\', \'TO\', \'TABLESAMPLE\', \'STRATIFY\', \'ALTER\', \'RENAME\', \'ARRAY\', \'MAP\', \'STRUCT\', \'COMMENT\', \'SET\', \'RESET\', \'DATA\', \'START\', \'TRANSACTION\', \'COMMIT\', \'ROLLBACK\', \'MACRO\', \'IGNORE\', \'BOTH\', \'LEADING\', \'TRAILING\', \'IF\', \'POSITION\', \'EXTRACT\', \'DIV\', \'PERCENT\', \'BUCKET\', \'OUT\', \'OF\', \'SORT\', \'CLUSTER\', \'DISTRIBUTE\', \'OVERWRITE\', \'TRANSFORM\', \'REDUCE\', \'SERDE\', \'SERDEPROPERTIES\', \'RECORDREADER\', \'RECORDWRITER\', \'DELIMITED\', \'FIELDS\', \'TERMINATED\', \'COLLECTION\', \'ITEMS\', \'KEYS\', \'ESCAPED\', \'LINES\', \'SEPARATED\', \'FUNCTION\', \'EXTENDED\', \'REFRESH\', \'CLEAR\', \'CACHE\', \'UNCACHE\', \'LAZY\', \'FORMATTED\', \'GLOBAL\', TEMPORARY, \'OPTIONS\', \'UNSET\', \'TBLPROPERTIES\', \'DBPROPERTIES\', \'BUCKETS\', \'SKEWED\', \'STORED\', \'DIRECTORIES\', \'LOCATION\', \'EXCHANGE\', \'ARCHIVE\', \'UNARCHIVE\', \'FILEFORMAT\', \'TOUCH\', \'COMPACT\', \'CONCATENATE\', \'CHANGE\', \'CASCADE\', \'RESTRICT\', \'CLUSTERED\', \'SORTED\', \'PURGE\', \'INPUTFORMAT\', \'OUTPUTFORMAT\', DATABASE, DATABASES, \'DFS\', \'TRUNCATE\', \'ANALYZE\', \'COMPUTE\', \'LIST\', \'STATISTICS\', \'PARTITIONED\', \'EXTERNAL\', \'DEFINED\', \'REVOKE\', \'GRANT\', \'LOCK\', \'UNLOCK\', \'MSCK\', \'REPAIR\', \'RECOVER\', \'EXPORT\', \'IMPORT\', \'LOAD\', \'ROLE\', \'ROLES\', \'COMPACTIONS\', \'PRINCIPALS\', \'TRANSACTIONS\', \'INDEX\', \'INDEXES\', \'LOCKS\', \'OPTION\', \'ANTI\', \'LOCAL\', \'INPATH\', IDENTIFIER, BACKQUOTED_IDENTIFIER}(line 1, pos 0)\n\n== SQL ==\n{"fields":[{"metadata":{},"name":"_id","nullable":true,"type":{"metadata":{},"name":"$oid","nullable":true,"type":"string"}},{"metadata":{},"name":"Id","nullable":true,"type":"double"},{"metadata":{},"name":"Timestamp","nullable":true,"type":{"metadata":{},"name":"$date","nullable":true,"type":"long"}},{"metadata":{},"name":"TTNR","nullable":true,"type":"string"},{"metadata":{},"name":"SNR","nullable":true,"type":"double"},{"metadata":{},"name":"State","nullable":true,"type":"long"},{"metadata":{},"name":"I_A1","nullable":true,"type":"string"},{"metadata":{},"name":"I_B1","nullable":true,"type":"string"},{"metadata":{},"name":"I1","nullable":true,"type":"double"},{"metadata":{},"name":"Mabs","nullable":true,"type":"double"},{"metadata":{},"name":"p_HD1","nullable":true,"type":"double"},{"metadata":{},"name":"pG","nullable":true,"type":"double"},{"metadata":{},"name":"pT","nullable":true,"type":"double"},{"metadata":{},"name":"pXZ","nullable":true,"type":"double"},{"metadata":{},"name":"T3","nullable":true,"type":"double"},{"metadata":{},"name":"nan","nullable":true,"type":"double"},{"metadata":{},"name":"Q1","nullable":true,"type":"long"},{"metadata":{},"name":"a_01X","nullable":true,"type":{"containsNull":true,"elementType":"double","type":"array"}}],"type":"struct"}\n^^^\n'

Мне нужна помощь с этим.

Ответы [ 3 ]

1 голос
/ 09 мая 2020

Я не знаю весь ваш код, но, увидев тот, который вы поместили здесь, мне кажется, что вам сначала нужно сначала преобразовать входные данные kafka в String, так как они изначально идут в HexaDecimal, а затем вы используете свою схему для этой строки .

1 голос
/ 04 мая 2020

Note Если у вас есть комплексное вложенное json, попробуйте использовать этот метод DataType.fromJson для преобразования схемы json в схему StructType и сохраните схему json вне кода. Любое изменение в схеме просто обновляет схему json и перезапускает ваше приложение, оно автоматически примет новую схему.

Я преобразовал json данные в строку схемы. Пожалуйста, проверьте код ниже.

scala> val jsonSchema = """{"type":"struct","fields":[{"name":"I1","type":"double","nullable":true,"metadata":{}},{"name":"I_A1","type":"string","nullable":true,"metadata":{}},{"name":"I_B1","type":"string","nullable":true,"metadata":{}},{"name":"Id","type":"long","nullable":true,"metadata":{}},{"name":"Mabs","type":"double","nullable":true,"metadata":{}},{"name":"Q1","type":"long","nullable":true,"metadata":{}},{"name":"SNR","type":"long","nullable":true,"metadata":{}},{"name":"State","type":"long","nullable":true,"metadata":{}},{"name":"T3","type":"double","nullable":true,"metadata":{}},{"name":"TTNR","type":"string","nullable":true,"metadata":{}},{"name":"Timestamp","type":{"type":"struct","fields":[{"name":"$date","type":"long","nullable":true,"metadata":{}}]},"nullable":true,"metadata":{}},{"name":"_id","type":{"type":"struct","fields":[{"name":"$oid","type":"string","nullable":true,"metadata":{}}]},"nullable":true,"metadata":{}},{"name":"a_01X","type":{"type":"array","elementType":"double","containsNull":true},"nullable":true,"metadata":{}},{"name":"nan","type":"double","nullable":true,"metadata":{}},{"name":"pG","type":"double","nullable":true,"metadata":{}},{"name":"pT","type":"double","nullable":true,"metadata":{}},{"name":"pXZ","type":"double","nullable":true,"metadata":{}},{"name":"p_HD1","type":"double","nullable":true,"metadata":{}}]}"""
jsonSchema: String = {"type":"struct","fields":[{"name":"I1","type":"double","nullable":true,"metadata":{}},{"name":"I_A1","type":"string","nullable":true,"metadata":{}},{"name":"I_B1","type":"string","nullable":true,"metadata":{}},{"name":"Id","type":"long","nullable":true,"metadata":{}},{"name":"Mabs","type":"double","nullable":true,"metadata":{}},{"name":"Q1","type":"long","nullable":true,"metadata":{}},{"name":"SNR","type":"long","nullable":true,"metadata":{}},{"name":"State","type":"long","nullable":true,"metadata":{}},{"name":"T3","type":"double","nullable":true,"metadata":{}},{"name":"TTNR","type":"string","nullable":true,"metadata":{}},{"name":"Timestamp","type":{"type":"struct","fields":[{"name":"$date","type":"long","nullable":true,"metadata":{}}]},"nullable":true,"metadata":{}},{"name":"_id","type":{"type":"struct","fields":[{"name":"$oid","type":"string","nullable":true,"metadata":{}}]},"nullable":true,"metadata":{}},{"name":"a_01X","type":{"type":"array","elementType":"double","containsNull":true},"nullable":true,"metadata":{}},{"name":"nan","type":"double","nullable":true,"metadata":{}},{"name":"pG","type":"double","nullable":true,"metadata":{}},{"name":"pT","type":"double","nullable":true,"metadata":{}},{"name":"pXZ","type":"double","nullable":true,"metadata":{}},{"name":"p_HD1","type":"double","nullable":true,"metadata":{}}]}

scala> val schema = DataType.fromJson(jsonSchema).asInstanceOf[StructType]
schema: org.apache.spark.sql.types.StructType = StructType(StructField(I1,DoubleType,true), StructField(I_A1,StringType,true), StructField(I_B1,StringType,true), StructField(Id,LongType,true), StructField(Mabs,DoubleType,true), StructField(Q1,LongType,true), StructField(SNR,LongType,true), StructField(State,LongType,true), StructField(T3,DoubleType,true), StructField(TTNR,StringType,true), StructField(Timestamp,StructType(StructField($date,LongType,true)),true), StructField(_id,StructType(StructField($oid,StringType,true)),true), StructField(a_01X,ArrayType(DoubleType,true),true), StructField(nan,DoubleType,true), StructField(pG,DoubleType,true), StructField(pT,DoubleType,true), StructField(pXZ,DoubleType,true), StructField(p_HD1,DoubleType,true))



0 голосов
/ 10 мая 2020

Я понял это.

Хитрость заключалась в том, чтобы изменить мой сериализатор Kafka с AVRO на строковый формат. Хотя AVRO сохраняет схему, она также ввела некоторые предшествующие символы, такие как символ новой строки (см. Ниже), который было трудно удалить и проанализировать как json в моем случае.

+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|value                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                   |
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|
{"_id":{"$oid":"5e58f86d5afd84039c135405"},"Id":1,"Timestamp":{"$date":1582889068580},"TTNR":"R902170286","SNR":92177446,"State":0,"I_A1":"FALSE","I_B1":"FALSE","I1":0.0036622,"Mabs":-20.5236976,"p_HD1":30.985062,"pG":27.7779473,"pT":1.727958,"pXZ":3.4487671,"T3":25.2296518,"nan":215.3000031,"Q1":0,"a_01X":[62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925]}
|
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

Получение моего ввода в виде строки ввело больше полей, которые было легче удалить. Мне пришлось определить схему большего размера, но разбор прошел успешно.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...