Преобразование потокового JSON в DataFrame - PullRequest
0 голосов
/ 19 февраля 2019

Вопрос: Как я могу преобразовать строку JSON в DataFrame, а также выбрать только те ключи, которые мне нужны?

Я только начал использовать Spark на прошлой неделе и все еще учусь, поэтому, пожалуйста, потерпите меня.

Я использую Spark (2.4) Структурированная потоковая передача.Приложение spark получает данные (через сокет) из потокового твиттера и отправляет данные в виде полной строки JSON.Ниже приведен один из фреймов данных.Каждая строка представляет собой полный твит в формате JSON.

+--------------------+
|               value|
+--------------------+
|{"created_at":"Tu...|
|{"created_at":"Tu...|
|{"created_at":"Tu...|
+--------------------+

Как предложил Venkata, я сделал это, переведя на python (полные коды приведены ниже)

schema = StructType().add('created_at', StringType(), False).add('id_str', StringType(), False)
df = lines.selectExpr('CAST(value AS STRING)').select(from_json('value', schema).alias('temp')).select('temp.*')

Это возвращаемое значение

+------------------------------+-------------------+
|created_at                    |id_str             |
+------------------------------+-------------------+
|Wed Feb 20 04:51:18 +0000 2019|1098082646511443968|
|Wed Feb 20 04:51:18 +0000 2019|1098082646285082630|
|Wed Feb 20 04:51:18 +0000 2019|1098082646444441600|
|Wed Feb 20 04:51:18 +0000 2019|1098082646557642752|
|Wed Feb 20 04:51:18 +0000 2019|1098082646494797824|
|Wed Feb 20 04:51:19 +0000 2019|1098082646817681408|
+------------------------------+-------------------+

Как видно, в DataFrame были включены только те 2 ключа, которые мне нужны.

Надеюсь, это поможет любому новичку.

Полные коды

from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json
from pyspark.sql.types import StructType, StringType


spark = SparkSession.builder.appName("StructuredNetworkWordCount").getOrCreate()
sc = spark.sparkContext

lines = spark.readStream.format('socket').option('host', '127.0.0.1').option('port', 9999).load()

schema = StructType().add('created_at', StringType(), False).add('id_str', StringType(), False)
df = lines.selectExpr('CAST(value AS STRING)').select(from_json('value', schema).alias('temp')).select('temp.*')

query = df.writeStream.format('console').option('truncate', 'False').start()

# this part is only used to print out the query when running as an app. Not needed if using jupyter
import time
time.sleep(10)
lines.stop()

1 Ответ

0 голосов
/ 19 февраля 2019

Вот пример кода, который вы можете использовать для преобразования из json в DataFrame.

val schema = new StructType().add("id", StringType).add("pin",StringType)

val dataFrame= data.
selectExpr("CAST(value AS STRING)").as[String].
select(from_json($"value",schema).
alias("tmp")).
select("tmp.*")
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...