Я немного запутался:
У меня есть скрипт ниже. Я преобразовал искровой DF в Pandas DF для выполнения своих функций.
Теперь у меня есть выходной фрейм данных DF6, и это именно то, что мне нужно.
Теперь мне нужно записать данные обратно в HDFS (что Пандас не может сделать), поэтому мне нужно преобразовать кадр данных Pandas обратно в Spark и записать его в каталог.
Для этого я использовал приведенную ниже функцию, но, к сожалению, она не работает
data_spark = spark_session.createDataFrame(df6)
data_spark.show()
Я получаю ошибку:
Traceback (most recent call last):
File "/home/keenek1/domainscript/another1.py", line 338, in <module>
spark_df.show()
File "/usr/hdp/current/spark2-client/python/lib/pyspark.zip/pyspark/sql/dataframe.py", line 350, in show
File "/usr/hdp/current/spark2-client/python/lib/py4j-0.10.6-src.zip/py4j/java_gateway.py", line 1160, in __call__
File "/usr/hdp/current/spark2-client/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
File "/usr/hdp/current/spark2-client/python/lib/py4j-0.10.6-src.zip/py4j/protocol.py", line 320, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o119.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 5.0 failed 4 times, most recent failure: Lost task 0.3 in stage 5.0 (TID 2080, .net, executor 81): org.apache.spark.SparkException:
Error from python worker:
/usr/bin/python: No module named pyspark
Очевидно, есть модуль под названием PySpark, потому что я получаю правильный вывод df6, напечатанный в stdout. Это означает, что Spark создал фрейм данных, преобразовал его в Pandas и просто борется с последней функцией.
Что я могу с этим поделать?
from pyspark.sql import SparkSession
import pyspark.sql.functions as sqlfunc
from pyspark.sql.types import *
import argparse, sys
from pyspark.sql import *
import pyspark.sql.functions as sqlfunc
import pandas as pd
import time
from datetime import datetime
import os
import glob
def create_session(appname):
spark_session = SparkSession\
.builder\
.appName(appname)\
.master('yarn')\
.config("hive.metastore.uris", "thrift://domain.net:9083")\
.enableHiveSupport()\
.getOrCreate()
return spark_session
### START MAIN ###
if __name__ == '__main__':
spark_session = create_session('testing_files')
#import file into dataframe
start = time.time()
#--------------------------------------------------------------------------------------------
#-----------------------------CALCUALTE DATES AND TIMES FOR QUERY----------------------------
#--------------------------------------------------------------------------------------------
dt_now = datetime.now()
target_hour = int(dt_now.strftime('%s')) - 60*60*12
today_date = datetime.fromtimestamp(target_hour).strftime('%Y%m%d')
hour = datetime.fromtimestamp(target_hour).strftime('%H')
#--------------------------------------------------------------------------------------------
#-----------------------------------CREATE DF FROM FILES ------------------------------------
#--------------------------------------------------------------------------------------------
schema = [\
StructField('dmy',StringType(), True),\
StructField('hh',StringType(), True),\
very long list of fields....
]
final_structure = StructType(schema)
df = spark_session.read\
.option("header","false")\
.option("delimiter", "\t")\
.csv('hdfs://nameservice/data/data/dt=20181022/hour=11/*/*', final_structure)\
.select('domain', 'optimisedsize')
df2 = df.filter(df.domain != '----').groupby('domain').agg(sqlfunc.sum(df.optimisedsize).alias('sdsf'))
df2.show()
df3 = df2.toPandas()
#--------------------------------------------------------------------------------------------
#-----------------------------DEFINE REQUIRED LOOKUP LISTS-----------------------------------
#--------------------------------------------------------------------------------------------
tld = ('co.uk', 'com', 'org', 'gov.uk', 'co', 'net', 'news', 'it', 'in' 'es', 'tw', 'pe', 'io', 'ca', 'cat', 'com.au',
'com.ar', 'com.mt', 'com.co', 'ws', 'to', 'es', 'de', 'us', 'br', 'im', 'gr', 'cc', 'cn', 'org.uk', 'me', 'ovh', 'be',
'tv', 'tech', '..', 'life', 'com.mx', 'pl', 'uk', 'ru', 'cz', 'st', 'info', 'mobi', 'today', 'eu', 'fi', 'jp', 'life',
'1', '2', '3', '4', '5', '6', '7', '8', '9', '0', 'earth', 'ninja', 'ie', 'im', 'ai', 'at', 'ch', 'ly', 'market', 'click',
'fr', 'nl', 'se')
cdns = ('akamai', 'akamaized', 'maxcdn', 'cloudflare')
cleandomain = []
#--------------------------------------------------------------------------------------------
#-----------------------------SPLIT DOMAIN AT EVERY DOT--------------------------------------
#--------------------------------------------------------------------------------------------
index = df3.domain.str.split('.').tolist()
#--------------------------------------------------------------------------------------------
#------------------DEFINE FUNCTION FOR DOMAIN MANIPULATION-----------------------------------
#--------------------------------------------------------------------------------------------
def domfunction():
#if it isn't a string, then print the value directly in the cleandomain list
try:
if str(x[-1]).isdigit():
try:
cleandomain.append(str(x[0])+'.'+str(x[1])+'.*.*')
except IndexError:
cleandomain.append(str(x))
#if its in the CDN list, take a subdomain as well
elif len(x) > 3 and str(x[len(x)-2]).rstrip() in cdns:
try:
cleandomain.append(str(x[len(x)-3])+'.'+str(x[len(x)-2])+'.'+str(x[len(x)-1]))
except IndexError:
cleandomain.append(str(x))
elif len(x) > 3 and str(x[len(x)-3]).rstrip() in cdns:
try:
cleandomain.append(str(x[len(x)-4])+'.'+str(x[len(x)-3])+'.'+str(x[len(x)-2])+'.'+ str(x[len(x)-1]))
except IndexError:
cleandomain.append(str(x))
#if its in the TLD list, do this
elif len(x) > 2 and str(x[len(x)-2]).rstrip()+'.'+ str(x[len(x)-1]).rstrip() in tld:
try:
cleandomain.append(str(x[len(x)-3])+'.'+str(x[len(x)-2])+'.'+ str(x[len(x)-1]))
except IndexError:
cleandomain.append(str(x))
elif len(x) > 2 and str(x[len(x)-1]) in tld:
try:
cleandomain.append(str(x[len(x)-2])+'.'+ str(x[len(x)-1]))
except IndexError:
cleandomain.append(str(x))
#if its not in the TLD list, do this
else:
cleandomain.append(str(x))
except IndexError:
cleandomain.append(str(x))
except TypeError:
cleandomain.append(str(x))
#--------------------------------------------------------------------------------------------
#-------------LOOP OVER ITEMS WITHIN THE INDEX & CONCAT REQUIRED ELEMENTS--------------------
#--------------------------------------------------------------------------------------------
for x in index:
domfunction()
#--------------------------------------------------------------------------------------------
#-------------------------------CONFIGURE OUTPUTS--------------------------------------------
#--------------------------------------------------------------------------------------------
#add the column to the dataframe
se = pd.Series(cleandomain)
df3['newdomain2'] = se.values
#select only the new domain column & usage & group by
df5 = df3.groupby(['newdomain2'],as_index = False)[['sdsf']].sum()
df6 = df5.sort_values(['sdsf'], ascending=["true"])
print(df6)
spark_df = spark_session.createDataFrame(df6)
spark_df.show()
spark_df.coalesce(100).write.format("com.databricks.spark.csv").option("header", "false").option('sep', '\t').mode('append').save('hdfs://nameservice/user/keenek1/domainlookup')
data_spark = spark_session.createDataFrame(df6)
data_spark.show()
print(df6)
end = time.time()
print("RunTime:")
print(end-start)