Как автоматизировать чтение Spark? - PullRequest
1 голос
/ 14 июля 2020

Мне нужно 150 раз прочитать из моего ведра S3

df1 = spark.read.json('s3://mybucket/f1')
df2 = spark.read.json('s3://mybucket/f2')
...
df150 = spark.read.json('s3://mybucket/f150')

Как автоматизировать этот процесс?

spark.read.json produces Spark Dataframe.

Если я попробую то, что предложил Оскар

import spark
your_dfs_list = [spark.read.json("s3://cw-mybuc/RECORDS/FULL_RECEIVED/2020/07/01/00"+str(x)) for x in range(1,38)]

AttributeError: module 'spark' has no attribute 'read'

Ответы [ 3 ]

4 голосов
/ 14 июля 2020
• 1000 или foreach позже) на фреймах данных, но это зависит от ваших предпочтений :)
(1 to 150).map(v => spark.read.json(s"s3://mybucket/f$v"))
2 голосов
/ 14 июля 2020

Шаг 1 : Создайте список всех файлов json.gz. В текущих версиях Spark сжатые файлы .gzip читаются автоматически, так что это не проблема. Если вы читаете все файлы в сегменте S3, вы можете использовать следующее решение :

from boto.s3.connection import S3Connection

fs = [] # Emtpy list of files

conn = S3Connection('access-key','secret-access-key')
bucket = conn.get_bucket('bucket')
for key in bucket.list():
    fs.append[key.name.encode('utf-8')]

Шаг 2 : итерации по каждому из файлов из (1) объединения каждого из результирующих фреймов данных по мере продвижения. Версия решения Godza должна помочь:

# Read first file
df = spark.read.json(fs[0]) 

# Then union the rest
for f in fs[1:]:
  df = df.union(spark.read.json(f))
2 голосов
/ 14 июля 2020

Думаю, вам стоит уточнить детали. Как часто вы хотите читать, в чем причина? Et c. Если вы дадите какой-то контекст, мы могли бы помочь лучше?

Судя по вашему фрагменту кода, проще всего было бы сделать al oop. и прочтите его в массиве.

list = []

for i in range(150):
  list.append(spark.read.json('s3://mybucket/f' + (i + 1)))

Однако, если вы предоставите более подробную информацию, я уверен, что этот ответ можно улучшить

Редактировать на основе комментариев

Если вы хотите использовать объединение в DataFrames, проще всего будет импортировать имплициты:

import spark.implicits._

var df = spark.emptyDataFrame

for i in range(150):
  df = df.union(spark.read.json('s3://mybucket/f' + (i + 1))))

Обратите внимание, что это должно работать с Spark 2.x и выше:

https://sparkbyexamples.com/spark/spark-how-to-create-an-empty-dataframe/ https://sparkbyexamples.com/spark/spark-dataframe-union-and-union-all/

...