I'm really struggling with this problem.
Below, I take a domain such as something.facebook.com and turn it into facebook.com using my own UDF.
I get this error:
UnicodeEncodeError: 'ascii' codec can't encode characters in position 64-65: ordinal not in range(128)
I've tried a few workarounds, but I genuinely don't understand why this causes a problem.
I'd really appreciate any pointers :)
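From what I can tell, this is the classic Python 2 pattern where a unicode object is implicitly run through the ascii codec (the workers here are on Python 2 under Spark 2 / py4j 0.10.6, per the traceback below). A minimal stand-alone sketch that reproduces the same class of error, with a made-up non-ASCII domain:

# Hypothetical repro, assuming Python 2 (as on the Spark workers):
domain = u'\u0444\u0435\u0439\u0441\u0431\u0443\u043a.com'  # an internationalized, non-ASCII domain
str(domain)  # implicit ascii encode -> UnicodeEncodeError: 'ascii' codec can't encode characters...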
toplevel = ['.co.uk', '.co.nz', '.com', '.net', '.uk', '.org', '.ie', '.it', '.gov.uk', '.news', '.co.in',
'.io', '.tw', '.es', '.pe', '.ca', '.de', '.to', '.us', '.br', '.im', '.ws', '.gr', '.cc', '.cn', '.me', '.be',
'.tv', '.ru', '.cz', '.st', '.eu', '.fi', '.jp', '.ai', '.at', '.ch', '.ly', '.fr', '.nl', '.se', '.cat', '.com.au',
'.com.ar', '.com.mt', '.com.co', '.org.uk', '.com.mx', '.tech', '.life', '.mobi', '.info', '.ninja', '.today', '.earth', '.click']
def cleanup(domain):
    print(domain)
    if domain is None or domain == '':
        domain = 'empty'
        return domain
    for tld in toplevel:
        if tld in str(domain):
            splitdomain = domain.split('.')
            ext = tld.count('.')
            if ext == 1:
                cdomain = domain.split('.')[-2].encode('utf-8') + '.' + domain.split('.')[-1].encode('utf-8')
                return cdomain
            elif ext == 2:
                cdomain = domain.split('.')[-3].encode('utf-8') + '.' + domain.split('.')[-2].encode('utf-8') + '.' + domain.split('.')[-1].encode('utf-8')
                return cdomain
        elif domain == '':
            cdomain = 'empty'
            return cdomain
        else:
            return domain
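For comparison, here is a sketch of the same truncation logic that never pushes the value through str() or encode(), keeping it unicode end to end (cleanup_unicode and the endswith test are my own substitutions, not the original code; I'm assuming a StringType UDF can return unicode as-is):

def cleanup_unicode(domain):
    # Hypothetical rewrite: no str()/encode(), so nothing is forced through the ascii codec.
    if not domain:
        return 'empty'
    for tld in toplevel:
        if domain.endswith(tld):            # suffix match instead of 'tld in domain'
            keep = tld.count('.') + 1       # labels to keep: 2 for '.com', 3 for '.co.uk'
            return '.'.join(domain.split('.')[-keep:])
    return domain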
'''
#IPFR DOMS
'''
output = ipfr_logs.withColumn('capital',udfdict(ipfr_logs.domain)).createOrReplaceTempView('ipfrdoms')
ipfr_table_output = spark_session.sql('insert overwrite table design.ipfr_tld partition(dt=' + yday_date + ') select dt, hour, vservername, loc, cast(capital as string), count(distinct(emsisdn)) as users, sum(bytesdl) as size from ipfrdoms group by dt, hour, vservername, loc, capital')
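For completeness, udfdict is registered earlier in the script, roughly along these lines (a sketch; the exact registration lives elsewhere in web_ipfr_complete_new.py):

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# The UDF wrapping cleanup; StringType() means every return value
# goes through Spark's string serialization path.
udfdict = udf(cleanup, StringType())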
Here is the full traceback:
Traceback (most recent call last):
File "/data/keenek1/py_files/2020_scripts/web_ipfr_complete_new.py", line 177, in <module>
ipfr_table_output = spark_session.sql('insert overwrite table design.ipfr_tld partition(dt=' + yday_date + ') select dt, hour, vservername, loc, cast(capital as string), count(distinct(emsisdn)) as users, sum(bytesdl) as size from ipfrdoms group by dt, hour, vservername, loc, capital')
File "/usr/hdp/current/spark2-client/python/lib/pyspark.zip/pyspark/sql/session.py", line 714, in sql
File "/usr/hdp/current/spark2-client/python/lib/py4j-0.10.6-src.zip/py4j/java_gateway.py", line 1160, in __call__
File "/usr/hdp/current/spark2-client/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
File "/usr/hdp/current/spark2-client/python/lib/py4j-0.10.6-src.zip/py4j/protocol.py", line 320, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o51.sql.
: org.apache.spark.SparkException: Job aborted.
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:224)
at org.apache.spark.sql.hive.execution.SaveAsHiveFile$class.saveAsHiveFile(SaveAsHiveFile.scala:87)
at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.saveAsHiveFile(InsertIntoHiveTable.scala:66)
at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.processInsert(InsertIntoHiveTable.scala:195)
at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.run(InsertIntoHiveTable.scala:99)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:115)
at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:190)
at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:190)
at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3253)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3252)
at org.apache.spark.sql.Dataset.<init>(Dataset.scala:190)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:75)
at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:638)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 685 in stage 0.0 failed 4 times, most recent failure: Lost task 685.3 in stage 0.0 (TID 4945, uds-far-dn112.dab.02.net, executor 22): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/usr/hdp/current/spark2-client/python/pyspark/worker.py", line 229, in main
process()
File "/usr/hdp/current/spark2-client/python/pyspark/worker.py", line 224, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/usr/hdp/current/spark2-client/python/pyspark/worker.py", line 149, in <lambda>
func = lambda _, it: map(mapper, it)
File "<string>", line 1, in <lambda>
File "/usr/hdp/current/spark2-client/python/pyspark/worker.py", line 74, in <lambda>
return lambda *a: f(*a)
File "/data/keenek1/py_files/2020_scripts/web_ipfr_complete_new.py", line 147, in cleanup
UnicodeEncodeError: 'ascii' codec can't encode characters in position 0-3: ordinal not in range(128)