Как вы получаете партии строк из Spark, используя pyspark - PullRequest
0 голосов
/ 12 марта 2020

У меня есть Spark RDD с более чем 6 миллиардами строк данных, которые я хочу использовать для обучения модели глубокого обучения с использованием train_on_batch. Я не могу поместить все строки в память, поэтому я хотел бы получить около 10 Кб за раз, чтобы пакетировать по кускам по 64 или 128 (в зависимости от размера модели). В настоящее время я использую rdd.sample (), но я не думаю, что это гарантирует, что я получу все строки. Есть ли лучший способ для разделения данных, чтобы сделать их более управляемыми, чтобы я мог написать функцию генератора для получения пакетов? Мой код ниже:

data_df = spark.read.parquet(PARQUET_FILE)
print(f'RDD Count: {data_df.count()}') # 6B+
data_sample = data_df.sample(True, 0.0000015).take(6400) 
sample_df = data_sample.toPandas()

def get_batch():
  for row in sample_df.itertuples():
    # TODO: put together a batch size of BATCH_SIZE
    yield row

for i in range(10):
    print(next(get_batch()))

Ответы [ 2 ]

1 голос
/ 18 марта 2020

Я не верю, что искра позволяет вам компенсировать или разбивать на страницы ваши данные.

Но вы можете добавить индекс, а затем разбить на страницы, сначала:

    from pyspark.sql.functions import lit
    data_df = spark.read.parquet(PARQUET_FILE)
    count = data_df.count()
    chunk_size = 10000

    # Just adding a column for the ids
    df_new_schema = data_df.withColumn('pres_id', lit(1))

    # Adding the ids to the rdd 
    rdd_with_index = data_df.rdd.zipWithIndex().map(lambda (row,rowId): (list(row) + [rowId+1]))

    # Creating a dataframe with index
    df_with_index = spark.createDataFrame(chunk_rdd,schema=df_new_schema.schema)

    # Iterating into the chunks
    for chunk_size in range(0,count+1 ,chunk_size):
        initial_page = page_num*chunk_size
        final_page = initial_page + chunk_size 
        where_query = ('pres_id > {0} and pres_id <= {1}').format(initial_page,final_page)
        chunk_df = df_with_index.where(where_query).toPandas()
        train_on_batch(chunk_df) # <== Your function here        

Это не оптимально он плохо использует искру из-за использования pandas фрейма данных, но решит вашу проблему.

Не забудьте сбросить идентификатор, если это повлияет на вашу функцию.

0 голосов
/ 21 марта 2020

Попробуйте это:

 from pyspark.sql import functions as F
 sample_dict = {}

 # Read the parquet file
 df = spark.read.parquet("parquet file")

 # add the partition_number as a column
 df = df.withColumn('partition_num', F.spark_partition_id())
 df.persist()

 total_partition = [int(row.partition_num) for row in 
 df.select('partition_num').distinct().collect()]

 for each_df in total_partition:
     sample_dict[each_df] = df.where(df.partition_num == each_df) 
...