Как сделать так, чтобы все данные, принадлежащие пользователю, отправлялись в один и тот же файл при использовании spark? - PullRequest
1 голос
/ 07 апреля 2019

У нас есть сценарий использования для подготовки задания запуска, которое будет считывать данные от нескольких провайдеров, содержащие информацию о пользователях, присутствующих в произвольном порядке, и записывать их обратно в файлы в S3. Теперь условие состоит в том, что все данные пользователя должны присутствовать в одном файле. Существует около 1 миллиона уникальных пользователей, и каждый из них имеет около 10 КБ данных, максимум. Мы думали о создании не более 1000 файлов, и пусть каждый файл содержит около 1000 записей пользователей.

Мы используем java dataframe apis для создания задания против spark 2.4.0. Я не могу обернуть голову, что было бы самым логичным способом сделать это? Должен ли я выполнить групповую операцию над идентификатором пользователя, а затем каким-то образом собрать строки, если я не достигну 1000 пользователей, а затем перевернуть (если это вообще возможно) или есть какой-то лучший способ. Любая помощь или намек в правильном направлении очень ценится.

Обновление :

Следуя предложению из ответа, я приступил к следующему фрагменту кода, и все же я видел, как пишется 200 файлов вместо 1000.

Properties props = PropLoader.getProps("PrepareData.properties");
SparkSession spark = SparkSession.builder().appName("prepareData").master("local[*]")
    .config("fs.s3n.awsAccessKeyId", props.getProperty(Constants.S3_KEY_ID_KEY))
    .config("fs.s3n.awsSecretAccessKey", props.getProperty(Constants.S3_SECERET_ACCESS_KEY)).getOrCreate();

Dataset<Row> dataSet = spark.read().option("header", true).csv(pathToRead);
dataSet.repartition(dataSet.col("idvalue")).coalesce(1000).write().parquet(pathToWrite);

spark.close();

Но вместо 1000, если я использую 100, я вижу 100 файлов. Затем я перешел по ссылке, которой поделился @Alexandros, и следующий фрагмент кода сгенерировал более 20000 файлов в их отдельных каталогах, а также время выполнения увеличилось до безумия.

dataSet.repartition(1000, dataSet.col("idvalue")).write().partitionBy("idvalue").parquet(pathToWrite);

1 Ответ

1 голос
/ 08 апреля 2019

Вы можете использовать функцию перераспределения, а затем объединить.

 Df.repartion(user_id).coalese(1000)

 Df.repartion(user_id,1000)

Первый гарантирует, что пустых разделов не будет, а во втором решении некоторые разделы могут быть пустыми.

См .: Spark SQL - Разница между df.repartition и DataFrameWriter partitionBy?

https://spark.apache.org/docs/1.6.3/api/java/org/apache/spark/sql/DataFrame.html#coalesce(int)

Обновление:

Чтобы это работало

dataSet.repartition(dataSet.col("idvalue")).coalesce(1000).write().parquet(pathToWrite);

spark.sql.shuffle.partitions (по умолчанию: 200).Из-за этого он не дает 1000 файлов, но работает на 100 файлов.Чтобы заставить его работать, вам нужно сначала репатриировать до 1000 разделов, что будет аналогично подходу 2.

dataSet.repartition(1000, dataSet.col("idvalue")).write().partitionBy("idvalue").parquet(pathToWrite);

Я думаю, что приведенный выше код создаст один миллион файлов или более вместо 1000.

dataSet.repartition(1000, dataSet.col("idvalue")).write().parquet(pathToWrite);

Будет создано 1000 файлов, но вам нужно будет создать сопоставление между идентификаторами и файлами, читая каждый файл после завершения записи файлов.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...