Сначала создадим фрейм данных (DataFrame) с данными из вашего примера:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import ArrayType, MapType, StringType, StructField, StructType

spark = SparkSession.builder.appName('explode_example').getOrCreate()

# One row per person: (name, city, list of sentiment entries). Each entry mixes
# plain string values ("score", "text") with a nested dict ("sentiment").
data = [
    ("harry", "london", [
        {"score": "0.999926", "sentiment": {"score": "-0.640237"}, "text": "happy"},
        {"score": "0.609836", "sentiment": {"score": "-0.607594"}, "text": "sad"},
        {"score": "0.58564", "sentiment": {"score": "-0.6833"}, "text": "mad"},
    ]),
    ("sally", "london", [
        {"score": "0.999926", "sentiment": {"score": "-0.640237"}, "text": "sad"},
        {"score": "0.609836", "sentiment": {"score": "-0.607594"}, "text": "mad"},
        {"score": "0.58564", "sentiment": {"score": "-0.6833"}, "text": "agitated"},
    ]),
    ("gary", "london", [
        {"score": "0.999926", "sentiment": {"score": "-0.640237"}, "text": "excited"},
        {"score": "0.609836", "sentiment": {"score": "-0.607594"}, "text": "down"},
        {"score": "0.58564", "sentiment": {"score": "-0.6833"}, "text": "agitated"},
    ]),
    ("mary", "manchester", [
        {"score": "0.999926", "sentiment": {"score": "-0.640237"}, "text": "sad"},
        {"score": "0.609836", "sentiment": {"score": "-0.607594"}, "text": "low"},
        {"score": "0.58564", "sentiment": {"score": "-0.6833"}, "text": "content"},
    ]),
    ("gerry", "manchester", [
        {"score": "0.999926", "sentiment": {"score": "-0.640237"}, "text": "ecstatic"},
        {"score": "0.609836", "sentiment": {"score": "-0.607594"}, "text": "good"},
        {"score": "0.58564", "sentiment": {"score": "-0.6833"}, "text": "bad"},
    ]),
]

# Declare the schema instead of letting Spark infer it from the dicts. Because
# each entry mixes string values with a nested dict, the inferred type depends
# on how the Spark version reconciles map value types: older releases take the
# first pair (giving map<string,string>), while newer ones may try to merge all
# pairs and fail on string vs. dict. Declaring map<string,string> explicitly
# keeps the behaviour stable; the nested "sentiment" dict is then stored as its
# string rendering (e.g. "{score=-0.640237}").
schema = StructType([
    StructField("name", StringType()),
    StructField("city", StringType()),
    StructField("sentiment", ArrayType(MapType(StringType(), StringType()))),
])
df = spark.createDataFrame(data=data, schema=schema)
Получим следующий фрейм данных:
# Print up to 20 rows (show()'s default) without truncating long cell values
df.show(n=20, truncate=False)
+-----+----------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|name |city |sentiment |
+-----+----------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|harry|london |[[sentiment -> {score=-0.640237}, score -> 0.999926, text -> happy], [sentiment -> {score=-0.607594}, score -> 0.609836, text -> sad], [sentiment -> {score=-0.6833}, score -> 0.58564, text -> mad]] |
|sally|london |[[sentiment -> {score=-0.640237}, score -> 0.999926, text -> sad], [sentiment -> {score=-0.607594}, score -> 0.609836, text -> mad], [sentiment -> {score=-0.6833}, score -> 0.58564, text -> agitated]] |
|gary |london |[[sentiment -> {score=-0.640237}, score -> 0.999926, text -> excited], [sentiment -> {score=-0.607594}, score -> 0.609836, text -> down], [sentiment -> {score=-0.6833}, score -> 0.58564, text -> agitated]]|
|mary |manchester|[[sentiment -> {score=-0.640237}, score -> 0.999926, text -> sad], [sentiment -> {score=-0.607594}, score -> 0.609836, text -> low], [sentiment -> {score=-0.6833}, score -> 0.58564, text -> content]] |
|gerry|manchester|[[sentiment -> {score=-0.640237}, score -> 0.999926, text -> ecstatic], [sentiment -> {score=-0.607594}, score -> 0.609836, text -> good], [sentiment -> {score=-0.6833}, score -> 0.58564, text -> bad]] |
+-----+----------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
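Теперь, когда фрейм данных готов, нужно развернуть столбец `sentiment` с помощью функции `explode`: каждый элемент массива станет отдельной строкой, а значения `name` и `city` будут повторены для каждой из них: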
from pyspark.sql.functions import explode
# One output row per element of the "sentiment" array, carrying name and city
# along. "col" is explode's default output name for an array; it is spelled
# out because the next step refers to the column by that name.
df_exp = df.select("name", "city", explode("sentiment").alias("col"))
И результат:
# Print up to 20 rows (show()'s default) without truncating the map values
df_exp.show(n=20, truncate=False)
+-----+----------+---------------------------------------------------------------------+
|name |city |col |
+-----+----------+---------------------------------------------------------------------+
|harry|london |[sentiment -> {score=-0.640237}, score -> 0.999926, text -> happy] |
|harry|london |[sentiment -> {score=-0.607594}, score -> 0.609836, text -> sad] |
|harry|london |[sentiment -> {score=-0.6833}, score -> 0.58564, text -> mad] |
|sally|london |[sentiment -> {score=-0.640237}, score -> 0.999926, text -> sad] |
|sally|london |[sentiment -> {score=-0.607594}, score -> 0.609836, text -> mad] |
|sally|london |[sentiment -> {score=-0.6833}, score -> 0.58564, text -> agitated] |
|gary |london |[sentiment -> {score=-0.640237}, score -> 0.999926, text -> excited] |
|gary |london |[sentiment -> {score=-0.607594}, score -> 0.609836, text -> down] |
|gary |london |[sentiment -> {score=-0.6833}, score -> 0.58564, text -> agitated] |
|mary |manchester|[sentiment -> {score=-0.640237}, score -> 0.999926, text -> sad] |
|mary |manchester|[sentiment -> {score=-0.607594}, score -> 0.609836, text -> low] |
|mary |manchester|[sentiment -> {score=-0.6833}, score -> 0.58564, text -> content] |
|gerry|manchester|[sentiment -> {score=-0.640237}, score -> 0.999926, text -> ecstatic]|
|gerry|manchester|[sentiment -> {score=-0.607594}, score -> 0.609836, text -> good] |
|gerry|manchester|[sentiment -> {score=-0.6833}, score -> 0.58564, text -> bad] |
+-----+----------+---------------------------------------------------------------------+
Наконец, извлечём значение по ключу `text` в отдельный столбец, отфильтруем строки по городу и оставим три нужных столбца:
# Pull the "text" entry out of each exploded map element into its own column
df_exp = df_exp.withColumn("text", df_exp["col"].getItem("text"))
# Keep only the London rows, then project the three result columns
result = df_exp.filter(df_exp["city"] == "london").select("name", "city", "text")
И результат будет:
# Print up to 20 rows (show()'s default) without truncating cell values
result.show(n=20, truncate=False)
+-----+------+--------+
|name |city |text |
+-----+------+--------+
|harry|london|happy |
|harry|london|sad |
|harry|london|mad |
|sally|london|sad |
|sally|london|mad |
|sally|london|agitated|
|gary |london|excited |
|gary |london|down |
|gary |london|agitated|
+-----+------+--------+