Следующее решение игнорирует мою собственную озабоченность, о которой я упоминал в вопросе
Я могу создать несколько строк и использовать их для создания кадра данных, но
набор данных, который я использую, имеет много полей (более 100) и создает сотни RDS и
тогда застежка-молния кажется неэффективной.
def create_uniform_rdd(nrow, ncol, schema = None):
random_rdd = RandomRDDs()
rdds = []
for each_col in range(ncol):
rdds.append(random_rdd.uniformRDD(sc, nrow).collect())
rdds = list(zip(*rdds))
if schema is None:
schema = StructType([StructField(str(i), FloatType(), False) for i in range(ncol)])
df = sqlContext.createDataFrame(rdds, schema)
return df
Мне пришлось сделать бит zip
, потому что кадры данных Spark ориентированы на строки. Я мог бы переключить ncol
с nrow
в цикле for
, но так как количество строк у меня намного больше, чем число столбцов
EDIT
Добавление сравнения времени метода Илии и моего метода
def create_uniform_rdd_vector(nrow, ncol, schema = None):
data = RandomRDDs.uniformVectorRDD(sc, nrow,ncol).map(lambda a : a.tolist()).toDF()
return data
def create_uniform_rdd(nrow, ncol, schema = None):
random_rdd = RandomRDDs()
rdds = []
for each_col in range(ncol):
rdds.append(random_rdd.uniformRDD(sc, nrow).collect())
rdds = list(zip(*rdds))
if schema is None:
schema = StructType([StructField(str(i), FloatType(), False) for i in range(ncol)])
df = sqlContext.createDataFrame(rdds, schema)
return df
def timer_func(func, niter = 10):
tic = time()
for i in range(1,niter+1):
nrow = i*1000
ncol = i*10
_ = func(nrow, ncol, schema = None)
tac = time()
return tac - tic
niter = 5
create_uniform_rdd_time = timer_func(create_uniform_rdd, niter) # 4.27 secs
create_uniform_rdd_vector_time = timer_func(create_uniform_rdd_vector, niter) # 1.31 secs