Добавление данных в пустой фрейм данных - PullRequest
0 голосов
/ 03 мая 2018

Я создаю пустой фрейм данных и позже пытаюсь добавить к нему другой фрейм данных. На самом деле я хочу динамически добавлять множество фреймов данных в первоначально пустой фрейм данных, в зависимости от количества поступающих RDD.

Функция union () работает нормально, если я присваиваю значение другому третьему фрейму данных.

val df3=df1.union(df2)

Но я хочу продолжать добавлять к исходному фрейму данных (пустому), который я создал, потому что я хочу хранить все RDD в одном фрейме данных. Однако приведенный ниже код не показывает правильные значения. Похоже, он просто не дописал

df1.union(df2)

df1.count() // this shows 0 although df2 has some data and that is shown if I assign to third datafram. 

Если я сделаю следующее (я получаю ошибку переназначения, так как df1 равен val. И если я изменяю его на тип var, я получаю многопоточность kafka, не безопасную ошибку.

df1=d1.union(df2) 

Есть идеи, как добавить все динамически созданные кадры данных в один изначально созданный фрейм данных?

Ответы [ 2 ]

0 голосов
/ 04 декабря 2018

Не уверен, что это то, что вы ищете!

# Import pyspark functions
from pyspark.sql.types import StructType, StructField, IntegerType, StringType 

# Define your schema
field = [StructField("Col1",StringType(), True), StructField("Col2", DoubleType(), True)]
schema = StructType(field)

# Your empty data frame
df = spark.createDataFrame(sc.emptyRDD(), schema)

l = []

for i in range(5):
# Build and append to the list dynamically
l = l + [([str(i), i])]

# Create a temporary data frame similar to your original schema
temp_df = spark.createDataFrame(l, schema)

# Do the union with the original data frame
df = df.union(temp_df)
0 голосов
/ 03 мая 2018

DataFrames и другие распределенные структуры данных являются неизменяемыми, поэтому методы, которые работают с ними, всегда возвращают новый объект. Нет добавления, нет изменений и нет ALTER TABLE эквивалента.

И если я изменяю его на var type, я получаю многопоточность kafka, небезопасную ошибку.

Без реального кода невозможно дать вам однозначный ответ, но вряд ли он связан с union кодом.

Существует ряд известных ошибок Spark, вызванных неправильной внутренней реализацией ( SPARK-19185 , SPARK-23623 для перечисления всего нескольких).

...