Spark SQL (Java) - Недорогой способ объединения X файлов? - PullRequest
0 голосов
/ 26 января 2019

В настоящее время я работаю над проектом, в котором я читаю 19 различных файлов паркета и присоединяюсь к ID.У некоторых из этих файлов есть несколько строк для каждого потребителя, у некоторых их нет.

У меня есть файл ключа, в котором есть 1 столбец, к которому я присоединяюсь, и другой (userName), который мне нужен, и мне нужны все столбцыдругие файлы.

Я создаю отдельную программу чтения для каждого файла паркета, которая читает файл и преобразует его в набор искровых данных со структурой, подобной этой:

GenericStructure1 record;
int id;

Затем я присоединяюсь ко всем этимсозданные наборы данных (представьте себе все 19):

keyDataset.join(dataSet1, dataSet1.col("id").equalTo(keyDataset.col("id")), "left_outer")
.join(dataSet19, dataSet19.col("id").equalTo(keyDataset.col("id")), "left_outer")
.groupBy(keyDataset.col("id"), keyDataset.col("userName"))
.agg(
    collect_set(dataSet1.col("record")).as("set1"),
    collect_set(dataSet19.col("record")).as("set19")
.select(
    keyDataset.col("id"),
    keyDataset.col("userName"),
    col("set1"),
    col("set19")
)
.as(Encoders.bean(Set.class));

, где Set.class выглядит примерно так:

public class Set implements Serializable {
    long id;
    String userName;
    List<GenericStructure1> set1;
    List<GenericStructure19> set19;
}

Это прекрасно работает для 100 записей, но когда я пытаюсь увеличитьдо одной части 5-миллиметрового файла паркета (что-то вроде 75K записей), он взбивает и прожигает память до тех пор, пока, в конце концов, не иссякнет.В производстве мне нужно, чтобы это работало на миллионах, поэтому факт, что он задыхается на 75K, является реальной проблемой.Единственное, я не вижу простого способа оптимизировать его, чтобы он мог справиться с такой нагрузкой.Кто-нибудь знает недорогой способ объединения большого количества данных, как показано выше?

1 Ответ

0 голосов
/ 27 января 2019

Я смог заставить его работать. В этом вопросе я упоминаю набор данных keyDataset, в котором есть все возможные ключи во всех различных наборах данных. Вместо того, чтобы пытаться соединить это со всеми другими файлами прямо из шлюза, я вместо этого транслирую keyDataset и присоединяюсь к этому после создания общего кадра данных для каждого набора данных.

Dataset<Row> set1RowDataset = set1Dataset
        .groupBy(keyDataset.col(joinColumn))
        .agg(collect_set(set1Dataset.col("record")).as("set1s"))
        .select(
                keyDataset.col("id"),
                col("set1"));

Как только я создаю 19 из них, я присоединяю универсальные наборы данных в их собственное объединение следующим образом:

broadcast(set1RowDataset)
        .join(set2RowDataset, "id")
        .join(set3RowDataset, "id")
        .join(set4RowDataset, "id")
        .join(set19RowDataset, "id")
        .as(Encoders.bean(Set.class));

С точки зрения производительности, я не уверен, какой удар я получаю, выполняя groupBy отдельно от объединения, но моя память остается нетронутой, и Spark больше не выливается так плохо на диск во время перемешивания. Я был в состоянии выполнить это на одной части локально, которая терпела неудачу прежде, как я упоминал выше. Я еще не пробовал это на кластере с полным файлом паркета, но это мой следующий шаг.

Я использовал это в качестве примера: Пример трансляции

...