Не удается распаковать потоковые данные с помощью UDF в Azure Databricks - Python - PullRequest
0 голосов
/ 06 мая 2019

Я пытаюсь прочитать сжатые сообщения Azure EventHub GZIP, используя Azure DataBricks и python (PySpark), но использование UDF не работает с данными BinaryType.

Ну, вот часть, где я проверяю, что находится в теле

df = eventHubStream.withColumn("body", eventHubStream["body"]).select("body")
display(df, truncate=False)

И это показывает хорошо сжатые данные, как показано ниже: H4sIAKeM0FwC/3VS22rbQBB9z1cIQ6ElWN37JW8baeMKZEmRNk4LhcXUppg2cYncy...

Однако, когда я пытаюсь отправить данные в мой UDF, они не ведут себя так, как ожидалось. Функция буквально ничего не делает, но вывод выглядит так, как будто она была преобразована:

import zlib
from pyspark.sql.types import StringType

def streamDecompress(val: BinaryType()):
  #return zlib.decompress(val)
  return val

func_udf = udf(lambda x: streamDecompress(x), StringType())

df = eventHubStream.withColumn("body", func_udf(eventHubStream["body"])).select("body")
display(df, truncate=False)

Вот вывод:

[B@49d3f786

Итак, как и ожидалось, происходит сбой при попытке распаковать с помощью zlib.

Кто-нибудь знает, как мне это сделать?

1 Ответ

0 голосов
/ 07 мая 2019

Ну, это было намного проще, чем я думал.Я пытался отобразить байтоподобные данные, ха-ха.

Код ниже решил проблему:

import zlib

def streamDecompress(val):   
  return str(zlib.decompress(val, 15+32))

func_udf = udf(lambda x: streamDecompress(x))

df = eventHubStream.withColumn("body", func_udf(eventHubStream["body"])).select('body')

display(df, truncate=False)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...