извлекать теги из столбца Dataframe - PullRequest
0 голосов
/ 27 октября 2019

У меня есть датафрейм с данными из записной книжки Python для Azure Consumtion Databricks. Я показываю здесь только подмножество столбцов / строк.

[Row(ResourceRate='0.029995920244854', PreTaxCost='0.719902085876484',  
ResourceType='Microsoft.Compute/virtualMachines',  Tags=None, ),
 Row(ResourceRate='1.10999258782982',  PreTaxCost='26.6398221079157',  
ResourceType='Microsoft.Compute/virtualMachines',  
Tags='"{  ""project"": ""70023"",  ""service"": ""10043""}"')
 ]

Мне нужно извлечь теги из столбца Теги и представить их как столбцы (таблицы).
Кстати, я не уверен, где мне взять эти пары двойных кавычек. Вероятно, из исходной таблицы Beeing .csv. Но это, вероятно, легко решить в конце.

Я использую pyspark. Я пытался сделать что-то вроде этого Разделить столбец строки Spark Dataframe на несколько столбцов

from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.functions import split, posexplode, concat, expr, lit, col, first
df2 = df.withColumn("num", monotonically_increasing_id())
df3 = df2.select(
        "num",
        split("Tags", ", ").alias("Tags"),
        posexplode(split("Tags", ",")).alias("pos", "val")
    )
#display(df3)
df4 = df3.drop("val")\
    .select(
        "num",
        concat(lit("Tag"),col("pos").cast("string")).alias("name"),
        expr("Tags[pos]").alias("val")
    )
# display(df4)
df5 = df4.groupBy("num").pivot("name").agg(first("val"))
display(df5)

Это не совсем то, что я хочу.

num     Tag0
964     
1677    """project"": ""70023"", """service"": ""10024""
2040    """project"": ""70025"", """service"": ""10034""
2214    
...

Я бы предпочел получать теги в виде столбцов:

num     project        service       ResourceRate       PreTaxCost
964                                  0.029995920244854  0.719902085876484
677     70023          10024         1.10999258782982   26.6398221079157
2040    70025          10034         0.029995920244854  0.719902085876484
2214                                 0.029995920244854  0.719902085876484
...

Ответы [ 2 ]

1 голос
/ 28 октября 2019

IIUC, вы можете преобразовать Tags в столбец строк JSON ( trim ведущий и конечный " и regexp_replace удвоить " до одинокого ") и затем использовать json_tuple () , чтобы получить нужные поля. см. ниже код:

from pyspark.sql.functions import expr, json_tuple

df.withColumn('Tags', expr("""regexp_replace(trim(BOTH '"' FROM Tags), '""', '"')""")) \
  .select('*', json_tuple('Tags', 'project', 'service').alias('project','service'))\
  .show()                                                  
#+-----------------+-----------------+--------------------+--------------------+-------+-------+
#|       PreTaxCost|     ResourceRate|        ResourceType|                Tags|project|service|
#+-----------------+-----------------+--------------------+--------------------+-------+-------+
#|0.719902085876484|0.029995920244854|Microsoft.Compute...|                null|   null|   null|
#| 26.6398221079157| 1.10999258782982|Microsoft.Compute...|{ "project": "700...|  70023|  10043|
#+-----------------+-----------------+--------------------+--------------------+-------+-------+
0 голосов
/ 28 октября 2019

Вот пример кода, который пытается разбить теги на несколько столбцов:

from pyspark.sql import SparkSession
import pyspark.sql.functions as f


def columnList(r):
  val = str(r[0].tags)
  i = int(val.index("{") + 1)
  j = int(val.index("}"))
  val = val[i:j]
  vals = val.split(",")
  collist = []
  collist.append('id')
  for val in vals:
    keyval = val.split(":")
    key = keyval[0]
    collist.append(key.replace('"',""))
  return collist

def valueList(r):
  val = r[1]
  i = int(val.index("{")+1)
  j = int(val.index("}"))
  val = val[i:j]
  vals = val.split(",")
  valList = []
  valList.append(r[0])
  for val in vals:
      keyval = val.split(":")
      value = keyval[1]
      valList.append(value.replace('"',""))
  return valList

sc = SparkSession.builder.appName("example").\
config("spark.driver.memory","1g").\
config("spark.executor.cores",2).\
config("spark.max.cores",4).getOrCreate()

df = 

sc.read.format ("csv"). Option ("header", "true"). Option ("delimiter""," | "). load (" columns.csv ")

tagsdf = df.select("id","tags")


colList = columnList(tagsdf.rdd.take(1))
tagsdfrdd = tagsdf.rdd.map(lambda r : valueList(r))

dfwithnewcolumns = tagsdfrdd.toDF(colList)

newdf = df.drop("tags").join(dfwithnewcolumns,on=["id"])

newdf.show()

Пример идентификатора тестового файла | ResourceRate | PreTaxCost | ResourceType | Теги 1 | '1.10999258782982' | '26 .6398221079157 '|' Microsoft.Compute/ virtualMachines '|' "{" "project" ":" "70023" "," "service" ":" "10043" "}" '

Если у вас нет столбца id, то вы можете захотетьобъединить Rdds

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...