У нас есть сценарий использования для подготовки задания запуска, которое будет считывать данные от нескольких провайдеров, содержащие информацию о пользователях, присутствующих в произвольном порядке, и записывать их обратно в файлы в S3. Теперь условие состоит в том, что все данные пользователя должны присутствовать в одном файле. Существует около 1 миллиона уникальных пользователей, и каждый из них имеет около 10 КБ данных, максимум. Мы думали о создании не более 1000 файлов, и пусть каждый файл содержит около 1000 записей пользователей.
Мы используем java dataframe apis для создания задания против spark 2.4.0. Я не могу обернуть голову, что было бы самым логичным способом сделать это? Должен ли я выполнить групповую операцию над идентификатором пользователя, а затем каким-то образом собрать строки, если я не достигну 1000 пользователей, а затем перевернуть (если это вообще возможно) или есть какой-то лучший способ. Любая помощь или намек в правильном направлении очень ценится.
Обновление :
Следуя предложению из ответа, я приступил к следующему фрагменту кода, и все же я видел, как пишется 200 файлов вместо 1000.
Properties props = PropLoader.getProps("PrepareData.properties");
SparkSession spark = SparkSession.builder().appName("prepareData").master("local[*]")
.config("fs.s3n.awsAccessKeyId", props.getProperty(Constants.S3_KEY_ID_KEY))
.config("fs.s3n.awsSecretAccessKey", props.getProperty(Constants.S3_SECERET_ACCESS_KEY)).getOrCreate();
Dataset<Row> dataSet = spark.read().option("header", true).csv(pathToRead);
dataSet.repartition(dataSet.col("idvalue")).coalesce(1000).write().parquet(pathToWrite);
spark.close();
Но вместо 1000, если я использую 100, я вижу 100 файлов. Затем я перешел по ссылке, которой поделился @Alexandros, и следующий фрагмент кода сгенерировал более 20000 файлов в их отдельных каталогах, а также время выполнения увеличилось до безумия.
dataSet.repartition(1000, dataSet.col("idvalue")).write().partitionBy("idvalue").parquet(pathToWrite);