Я читаю проприетарный двоичный формат (rosbags) с помощью spark, который требует некоторой десериализации. Как только это будет сделано, я получу данные с фиксированным количеством различных схем. Я хочу написать выходные файлы, по одному для каждой из различных схем.
Мне удалось создать фрейм данных для каждой схемы путем фильтрации по типу и десериализации, но все исходные данные читаются повторно.
В следующем примере демонстрируется проблема с использованием json.loads()
для ввода и collect()
для вывода.
import json
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.types import *
def transform(data):
print("transforming", data)
return json.loads(data[1])
def filter_by_type(data, type_):
print("filtering %s == %s" % (data[0], type_))
return data[0] == type_
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
sc.setLogLevel("ERROR")
dd = sc.parallelize([
['type1', '"string1"'],
['type2', '2'],
])
print(spark.createDataFrame(dd.filter(lambda x: filter_by_type(x, "type1")).map(transform), StringType()).collect())
print(spark.createDataFrame(dd.filter(lambda x: filter_by_type(x, "type2")).map(transform), IntegerType()).collect())
выход
filtering type1 == type1
transforming ['type1', '"string1"']
filtering type2 == type1
[Row(value='string1')]
filtering type1 == type2
filtering type2 == type2
transforming ['type2', '2']
[Row(value=2)]