Unicode encoding in a Python function - PullRequest
0 votes
/ 06 May 2020

I'm really struggling with this problem.

Below I take a domain, for example something.facebook.com, and turn it into facebook.com using my UDF.

I get this error:

UnicodeEncodeError: 'ascii' codec can't encode characters in position 64-65: ordinal not in range(128)

I've tried a few workarounds, but I genuinely don't understand why this causes a problem.

I'd really appreciate any pointers :)

toplevel = ['.co.uk', '.co.nz', '.com', '.net', '.uk', '.org', '.ie', '.it', '.gov.uk', '.news', '.co.in',
  '.io', '.tw', '.es', '.pe', '.ca', '.de', '.to', '.us', '.br', '.im', '.ws', '.gr', '.cc', '.cn', '.me', '.be',
  '.tv', '.ru', '.cz', '.st', '.eu', '.fi', '.jp', '.ai', '.at', '.ch', '.ly', '.fr', '.nl', '.se', '.cat', '.com.au',
  '.com.ar', '.com.mt', '.com.co', '.org.uk', '.com.mx', '.tech', '.life', '.mobi', '.info', '.ninja', '.today', '.earth', '.click']

def cleanup(domain):
    print(domain)
    if domain is None or domain == '':
        domain = 'empty'
        return domain
    for tld in toplevel:
        if tld in str(domain):
            splitdomain = domain.split('.')
            ext = tld.count('.')
            if ext == 1:
                cdomain = domain.split('.')[-2].encode('utf-8') + '.' + domain.split('.')[-1].encode('utf-8')
                return cdomain
            elif ext == 2:
                cdomain = domain.split('.')[-3].encode('utf-8') + '.' + domain.split('.')[-2].encode('utf-8') + '.' + domain.split('.')[-1].encode('utf-8')
                return cdomain
            elif domain == '':
                cdomain = 'empty'
                return cdomain
            else:
                return domain
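
For completeness: udfdict is not defined in the snippet above. Presumably it just wraps cleanup as a string-returning PySpark UDF, roughly like this (the wrapper name and the StringType return type are assumptions on my part):

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# assumed registration of cleanup() as the udfdict UDF used below
udfdict = udf(cleanup, StringType())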
# IPFR DOMS
output = ipfr_logs.withColumn('capital',udfdict(ipfr_logs.domain)).createOrReplaceTempView('ipfrdoms')
ipfr_table_output = spark_session.sql('insert overwrite table design.ipfr_tld partition(dt=' + yday_date + ') select dt, hour, vservername, loc, cast(capital as string), count(distinct(emsisdn)) as users, sum(bytesdl) as size from ipfrdoms group by dt, hour, vservername, loc, capital')
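
For reference, the same exception is easy to reproduce outside Spark in plain Python 2, which the Spark 2 and py4j paths in the traceback below suggest the executors run. Spark hands string column values to the UDF as unicode objects, and str() converts unicode to a byte string via the ascii codec, which cannot represent characters like u'\xfc' (the example domain here is hypothetical):

# Python 2 only: str() builds a byte string using the ascii codec
domain = u'm\xfcnchen.facebook.com'

try:
    str(domain)                   # same call as 'if tld in str(domain)' above
except UnicodeEncodeError as e:
    print(e)                      # 'ascii' codec can't encode character u'\xfc' ...

print(domain.encode('utf-8'))     # encoding unicode to UTF-8 is fine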

Here is the full traceback:

Traceback (most recent call last):
  File "/data/keenek1/py_files/2020_scripts/web_ipfr_complete_new.py", line 177, in <module>
    ipfr_table_output = spark_session.sql('insert overwrite table design.ipfr_tld partition(dt=' + yday_date + ') select dt, hour, vservername, loc, cast(capital as string), count(distinct(emsisdn)) as users, sum(bytesdl) as size from ipfrdoms group by dt, hour, vservername, loc, capital')
  File "/usr/hdp/current/spark2-client/python/lib/pyspark.zip/pyspark/sql/session.py", line 714, in sql
  File "/usr/hdp/current/spark2-client/python/lib/py4j-0.10.6-src.zip/py4j/java_gateway.py", line 1160, in __call__
  File "/usr/hdp/current/spark2-client/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
  File "/usr/hdp/current/spark2-client/python/lib/py4j-0.10.6-src.zip/py4j/protocol.py", line 320, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o51.sql.
: org.apache.spark.SparkException: Job aborted.
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:224)
        at org.apache.spark.sql.hive.execution.SaveAsHiveFile$class.saveAsHiveFile(SaveAsHiveFile.scala:87)
        at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.saveAsHiveFile(InsertIntoHiveTable.scala:66)
        at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.processInsert(InsertIntoHiveTable.scala:195)
        at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.run(InsertIntoHiveTable.scala:99)
        at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
        at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
        at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:115)
        at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:190)
        at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:190)
        at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3253)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
        at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3252)
        at org.apache.spark.sql.Dataset.<init>(Dataset.scala:190)
        at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:75)
        at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:638)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:214)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 685 in stage 0.0 failed 4 times, most recent failure: Lost task 685.3 in stage 0.0 (TID 4945, uds-far-dn112.dab.02.net, executor 22): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/hdp/current/spark2-client/python/pyspark/worker.py", line 229, in main
    process()
  File "/usr/hdp/current/spark2-client/python/pyspark/worker.py", line 224, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/hdp/current/spark2-client/python/pyspark/worker.py", line 149, in <lambda>
    func = lambda _, it: map(mapper, it)
  File "<string>", line 1, in <lambda>
  File "/usr/hdp/current/spark2-client/python/pyspark/worker.py", line 74, in <lambda>
    return lambda *a: f(*a)
  File "/data/keenek1/py_files/2020_scripts/web_ipfr_complete_new.py", line 147, in cleanup
UnicodeEncodeError: 'ascii' codec can't encode characters in position 0-3: ordinal not in range(128)

2 Answers

2 votes
/ 07 May 2020

This seems to solve the problem. Interestingly, 'unicode error' never shows up in the output.

def cleanup(domain):
    try:
        if domain is None or domain == '':
            domain = 'empty'
            return str(domain)
        for tld in toplevel:
            if tld in domain:
                splitdomain = domain.split('.')
                ext = tld.count('.')
                if ext == 1:
                    cdomain = splitdomain[-2] + '.' + splitdomain[-1]
                    return str(cdomain)
                elif ext == 2:
                    cdomain = splitdomain[-3] + '.' + splitdomain[-2] + '.' + splitdomain[-1]
                    return str(cdomain)
                elif domain == '':
                    cdomain = 'empty'
                    return str(cdomain)
                else:
                    return str(domain)
    except UnicodeEncodeError:
        domain = 'unicode error'
        return domain
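
For reference, a quick local check of this version under Python 2 (the inputs are hypothetical). It also hints at why 'unicode error' never shows up in the output: non-ASCII characters usually sit in the subdomain, which is discarded before str() runs. A domain whose kept part is non-ASCII would still hit the except branch:

print(cleanup(u'something.facebook.com'))   # facebook.com
print(cleanup(u'm\xfcnchen.facebook.com'))  # facebook.com - subdomain stripped before str()
print(cleanup(u'm\xfcnchen.de'))            # unicode error - str() still fails here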
0 votes
/ 07 May 2020

Try this:

def cleanup(domain):
    # Handle an empty entry
    if domain is None or domain == '':
        domain = 'empty'
        return domain

    listSub = domain.split('.')
    result = listSub[1]

    # Keep the part we need: www.facebook.com -> facebook.com
    for part in listSub[2:]:
        result = result + '.' + part

    return result.encode('utf-8')
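
Under Python 2, a quick check of this variant (inputs are hypothetical). Two things to note: it always drops exactly the first label, so multi-part TLDs and bare two-label domains lose their registrable name, and it returns a UTF-8 byte string rather than unicode:

print(cleanup('www.facebook.com'))   # facebook.com
print(cleanup('bbc.co.uk'))          # co.uk - the registrable name is lost
print(cleanup('facebook.com'))       # com - a two-label domain loses it too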
...