Функция avro deserialize ожидает байты в списке и завершается ошибкой при применении к фрейму данных.Работает только с collect (), но драйверу / мастеру не хватает памяти
При использовании версии 2.3.3 spark с python 3.6.8 создается информационный фрейм из таблицы выбора Hive, в которой есть сообщение avro-сериализации.Затем я использую https://github.com/Landoop/python-serializers.git, поскольку он поддерживает авросериализацию с использованием реестра сливающейся схемы на python 3.x
Попытался применить функцию десериализации на фрейме данных, и это не удалось.Работает, только если я df.collect () и использую цикл for для десериализации каждой записи, или работает, если я конвертирую df в rdd.map и десериализовываю каждую строку.Оба эти случая работают только в тестовом режиме, в том числе OOM или вечный запуск на данных куста объемом 10 ГБ, работающих на 5-узловых 30-ГБ серверах 8cpu.
spark = SparkSession \
.builder \
....
.enableHiveSupport() \
.getOrCreate()
df = spark.sql("SELECT * FROM table1")
unbase_df = df.select(unbase64(df.mycolumn1))
client = SchemaRegistryClient(url='1.2.3.4:1234')
serializer = MessageSerializer(client)
##attempt 1##FAILS##
decode_df = df.select(serializer.decode_message('mycolumn1'))
###->ERROR ->
##attempt 2##FAILS##
decode_df_2 = df.select(serializer.decode_message(b'mycolumn1'))
##attempt 3##WORKS BUT OOM with high volume on master(drivermanager)##
unbase_collect = unbase_df.collect()
decode_list = [serializer.decode_message(msg.mycolumn1) for msg in unbase_collect]
##attempt 4##WORKS BUT RUNS FOR EVER##
def avrodecoder(row):
decoded_row = serializer.decode_message(row['mycolumn1'])
return decoded_row
decode_rdd = unbase_df.select("*").rdd.map(avrodecoder)
## After #3 or #4 works I convert back to dataframe with schema
schema = StructType([
StructField("1column", StructType([
.......
StructField("ncolumn", StringType()])
decode_df = spark.createDataFrame(decode_rdd,schema)
Сообщение об ошибке в случае #attempt 1
in decode_message(self, message)
185 raise SerializerError("message is too small to decode")
186
--> 187 with ContextBytesIO(message) as payload:
188 magic, schema_id = struct.unpack('>bI', payload.read(5))
189 if magic != MAGIC_BYTE:
TypeError: a bytes-like object is required, not 'str'```
Error message in case of #attempt 2
```.....python3.6/site-packages/datamountaineer/schemaregistry/serializers/MessageSerializer.py
in decode_message(self, message)
188 magic, schema_id = struct.unpack('>bI', payload.read(5))
189 if magic != MAGIC_BYTE:
--> 190 raise SerializerError("message does not start with magic byte")
191 decoder_func = self._get_decoder_func(schema_id, payload)
192 return decoder_func(payload)
SerializerError: the message does not start with a magic byte ```
- Как я могу выполнить автовысериализацию через реестр слитых схем непосредственно на фрейме данных
- Как я могу убедиться, что все преобразования выполняются только для рабочих / исполнителей
- Как я могу заставить его работать достаточно эффективно, чтобы он не работал в режиме OOM или не работал 5,6+ часов для <10 ГБ данных </li>
- Не понимаю логики, лежащей в основе графика 'Yarn Pending Memory'что в обоих рабочих случаях он поднимается до 7 + ТБ или даже выше