Префикс имени поля в Python - PullRequest
0 голосов
/ 22 декабря 2018

Я надеюсь, что вы можете помочь!

Я сделал следующее для преобразования SQL в код PySpark.Все это прекрасно работает и выдает результат, который я вставил ниже кода

Единственный проблемный бит:

.filter(((( dt == 20181211))&(hour == 13 )|( hour == 14)))

Здесь перед каждым полем должен стоять префиксdf1.- например, df1.dt.

Я не могу придумать разумного способа сделать это - я думал о наборе df1.после каждой скобки, но где у нас (((это не сработало бы.

Любые идеи ???

#
#
# INPUTS
#
#
query = 'select dt, hour, count(imsi), sum(service_dl_bytes), sum(service_ul_bytes), count(msisdn)'
from_statement = 'from udsapp.dpi_datasum'
where = '((where dt = 20181211)) and (hour = 13 or hour = 14)'
#
#
#OUTPUTS
#
#
#split items in the query at the comma
result = [x.strip() for x in query.split(',')]

all_and = []
all_or = []
features = []
sums = []
sum_fields = []
count_fields = []
counts = []

#extract features - if sum / count, then add to their own lists
#if not sum or count, then add to features list
for x in result:
  if 'sum' in x:
    sums.append(x)
  elif 'count' in x:
    counts.append(x)
  else:
    if 'select' in x:
      features.append(x.replace('select', '').strip())
    else:
      features.append(x.replace('select', '').strip())
#add quotes around each of the items in features list & make single string
extracted_features = ', '.join('\'{0}\''.format(x) for x in features)

#add '' around from db
froms = "'" + from_statement + "'"
#remove from & trim, to leave only db & table names
if 'from' in froms:
  new_from = froms.replace('from' , '').strip()

#extract sum fields for agg - split at the brackets to extract only field names
for x in sums:
  if '(' in x:
    z = x.split('(', 1)[-1]
    q = z.split(')', 1)[0]
    sum_fields.append(q)
#Add quotes around each of the sum fields & add to single string
extracted_sums = ', '.join('\'{0}\''.format(x) for x in sum_fields)

#extract count fields for agg - split at the brackets to extract only field names
for x in counts:
  if '(' in x:
    z = x.split('(', 1)[-1]
    q = z.split(')', 1)[0]
    count_fields.append(q)
#Add quotes around each of the count fields & add to single string
extracted_counts = ', '.join('\'{0}\''.format(x) for x in count_fields)

####Extract where attributes
if 'or' in where:
  where = where.replace(') or (', ')|(')
  where = where.replace(')or(', ')|(')
  where = where.replace('or', ')|(')
if 'and' in where:
  where = where.replace(') and (', ')&(')
  where = where.replace(')and(', ')&(')
  where = where.replace('and', ')&(')
if 'where' in where:
  where = where.replace('where', '').strip()
if '=' in where:
  where = where.replace('=', '==')

#####################################################
print("from pyspark.sql import SparkSession")
print("import pyspark.sql.functions as sqlfunc")
print("import argparse, sys")
print("from pyspark.sql import *")
print("from pyspark.sql.functions import *")
print("from datetime import datetime")
#create a context that supports hive
print("def create_session(appname):")
print("    spark_session = SparkSession\\")
print("        .builder\\")
print("        .appName(appname)\\")
print("        .master('yarn')\\")
print("        .config(\"hive.metastore.uris\",thrift://uds-far-mn1.dab.02.net:9083\").enableHiveSupport()\\")
print("        .getOrCreate()")
print("    return spark_session")

### START MAIN ###
print("if __name__ == '__main__':")
print("    spark_session = create_session('myspark)")
print("    df1 = spark_session.table(" + new_from.replace(' ', '').strip() + ")")
print("    df_agg = df1\\")
print("       .coalesce(1000)\\")

# PRINT WHERE STATEMENT
print("       .filter((" + where + "))\\")

# PRINT INITIAL SELECT STATEMENT - Depending on whether there are counts or sums, this will be a different select statement 
if len(sums) and len(counts) == 0:
  print("       .select(" + extracted_features + ")\\")
elif len(sums) ==0 and len(counts) > 0:
  print("       .select(" + extracted_features + "," + extracted_counts + ")\\")
elif len(sums) > 0 and len(counts) == 0:
  print("       .select(" + extracted_features + "," + extracted_sums + ")\\")
elif len(sums) > 0 and len(counts) > 0:
  print("       .select(" + extracted_features + "," + extracted_sums + "," + extracted_counts + ")\\")

##### PRINT AGGREGATIONS WITH CORRECT FORMATTING
sum_list = extracted_sums.split(",")
count_list = extracted_counts.split(",")

#If we have sums and counts, then print the agg statements for all. When we reach the final value in the list, don't add the training comma and slash
if len(sums) and len(counts) > 0:
    print("       .groupBy(" + extracted_features + ")\\")
    print("       .agg(")

    for x in sum_list:
      print("            sqlfunc.sum(df1." + x.replace('\'', '').replace(' ', '').strip() + ").alias(" + x + "),\\")

    for x in count_list:
      if x != count_list[len(count_list)-1]:
        print("            sqlfunc.count(df1." + x.replace('\'', '').replace(' ', '').strip() + ").alias(" + x + "),\\")
      else:
        print("            sqlfunc.count(df1." + x.replace('\'', '').replace(' ', '').strip() + ").alias(" + x + ")")

#If we have sums & no counts, then print the agg statements for all sums. When we reach the final value in the list, don't add the training comma and slash
if len(sums) > 0 and len(counts) == 0:
    print("       .groupBy(" + extracted_features + ")\\")
    print("       .agg(")

    for x in sum_list:
      if x != sum_list[len(sum_list)-1]:
        print("            sqlfunc.sum(df1." + x.replace('\'', '').replace(' ', '').strip() + ").alias(" + x+ "),\\")
      else:
        print("            sqlfunc.sum(df1." + x.replace('\'', '').replace(' ', '').strip() + ").alias(" + x+ ")")      

#If we have counts and no sums, then print the agg statements for all counts. When we reach the final value in the list, don't add the training comma and slash
if len(counts) > 0 and len(sums) == 0:
    print("       .groupBy(" + extracted_features + ")\\")
    print("       .agg(")

    for x in count_list:
      if x != count_list[len(count_list)-1]:
        print("            sqlfunc.count(df1." + x.replace('\'', '').replace(' ', '').strip() + ").alias(" + x + "),\\")
      else:
        print("            sqlfunc.count(df1." + x.replace('\'', '').replace(' ', '').strip() + ").alias(" + x + ")")  

#only print the closing brackets if we have aggregations
if len(counts) >0 or len(sums) >0:
  print("            )\\")

# PRINT THE FINAL SELECT
if len(sums) and len(counts) == 0:
  print("       .select(" + extracted_features + ")")
elif len(sums) ==0 and len(counts) > 0:
  print("       .select(" + extracted_features + "," + extracted_counts + ")")
elif len(sums) > 0 and len(counts) == 0:
  print("       .select(" + extracted_features + "," + extracted_sums + ")")
elif len(sums) > 0 and len(counts) > 0:
  print("       .select(" + extracted_features + "," + extracted_sums + "," + extracted_counts + ")")
print("df_agg.createOrReplaceTempView(\"temporarytable\")")
print("finaldf = spark_session.sql(\"INSERT INTO table . select * from temporarytable\")")

КОД ВЫХОДА

from pyspark.sql import SparkSession
import pyspark.sql.functions as sqlfunc
import argparse, sys
from pyspark.sql import *
from pyspark.sql.functions import *
from datetime import datetime
def create_session(appname):
    spark_session = SparkSession\
        .builder\
        .appName(appname)\
        .master('yarn')\
        .config("hive.metastore.uris","thrift://uds-far-mn1.dab.02.net:9083").enableHiveSupport()\
        .getOrCreate()
    return spark_session
if __name__ == '__main__':
    spark_session = create_session('myspark')
    df1 = spark_session.table('udsapp.dpi_datasum')
    df_agg = df1\
       .coalesce(1000)\
       .filter(((( dt == 20181211))&(hour == 13 )|( hour == 14)))\
       .select('dt', 'hour','service_dl_bytes', 'service_ul_bytes','imsi', 'msisdn')\
       .groupBy('dt', 'hour')\
       .agg(
            sqlfunc.sum(df1.service_dl_bytes).alias('service_dl_bytes'),\
            sqlfunc.sum(df1.service_ul_bytes).alias( 'service_ul_bytes'),\
            sqlfunc.count(df1.imsi).alias('imsi'),\
            sqlfunc.count(df1.msisdn).alias( 'msisdn')
            )\
       .select('dt', 'hour','service_dl_bytes', 'service_ul_bytes','imsi', 'msisdn')
df_agg.createOrReplaceTempView("temporarytable")
finaldf = spark_session.sql("INSERT INTO table . select * from temporarytable")
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...