Оптимальный способ сохранить Spark SQL DataFrame в S3, используя информацию, хранящуюся в них - PullRequest
0 голосов
/ 02 января 2019

У меня есть фреймы данных с такими данными, как:

        channel  eventId1               eventId2               eventTs  eventTs2  serialNumber  someCode
        Web-DTB akefTEdZhXt8EqzLKXNt1Wjg    akTEdZhXt8EqzLKXNt1Wjg  1545502751154   1545502766731   4   rfs
        Web-DTB 3ycLHHrbEkBJ.piYNyI7u55w    3ycLHHEkBJ.piYNyI7u55w  1545502766247   1545502767800   4   njs
        Web-DTB 3ycL4rHHEkBJ.piYNyI7u55w    3ycLHHEkBJ.piYNyI7u55w  1545502766247   1545502767800   4   null

Мне нужно сохранить эти данные в путь S3 в виде:

  s3://test/data/ABC/hb/eventTs/[eventTs]/uploadTime_[eventTs2]/*.json.gz

Как я могу продолжить это, так как мне нужноизвлекать данные из разделов для записи в путь S3: (путь s3 является функцией eventTs и eventTs2, присутствующих в фреймах данных)

df.write.partitionBy("eventTs","eventTs2").format("json").save("s3://test/data/ABC/hb????")

Я полагаю, что могу перебрать каждую строку в фрейме данных, извлечьПуть и сохранить в S3, но не хотите этого делать.

Есть ли способ сгруппировать по фреймам данных на eventTs и eventTs2, а затем сохранить фреймы данных в полный S3 путь ?Есть ли что-то более оптимальное?

1 Ответ

0 голосов
/ 02 января 2019

Spark поддерживает разделы, как у нас в Hive. Если число отдельных элементов для eventTs, eventTs2 меньше, разделение будет хорошим способом решения этой проблемы.

Проверьте scala doc для получения дополнительной информации о partitionBy.

Пример использования:

val someDF = Seq((1, "bat", "marvel"), (2, "mouse", "disney"), (3, "horse", "animal"), (1, "batman", "marvel"), (2, "tom", "disney") ).toDF("id", "name", "place")
someDF.write.partitionBy("id", "name").orc("/tmp/somedf")

Если вы напишите фрейм данных с paritionBy для «id» и «name», будет создана следующая структура каталогов.

/tmp/somedf/id=1/name=bat
/tmp/somedf/id=1/name=batman

/tmp/somedf/id=2/name=mouse
/tmp/somedf/id=2/name=tom

/tmp/somedf/id=3/name=horse

Первый и второй разделы становятся каталогами, и все строки, где id равен 1, а name is bat, будут сохранены в структуре каталогов /tmp/somedf/id=1/name=bat, порядок разделов, определенный в partitionBy, определяет порядок каталогов.

В вашем случае разделы будут на eventTs и eventTS2.

val someDF = Seq(
        ("Web-DTB","akefTEdZhXt8EqzLKXNt1Wjg","akTEdZhXt8EqzLKXNt1Wjg","1545502751154","1545502766731",4,"rfs"),
        ("Web-DTB","3ycLHHrbEkBJ.piYNyI7u55w","3ycLHHEkBJ.piYNyI7u55w","1545502766247","1545502767800",4,"njs"),
        ("Web-DTB","3ycL4rHHEkBJ.piYNyI7u55w","3ycLHHEkBJ.piYNyI7u55w","1545502766247","1545502767800",4,"null"))
    .toDF("channel" , "eventId1", "eventId2", "eventTs",  "eventTs2",  "serialNumber",  "someCode")
someDF.write("eventTs", "eventTs2").orc("/tmp/someDF")

Создание структуры каталогов следующим образом.

/tmp/someDF/eventTs=1545502766247/eventTs2=1545502767800
/tmp/someDF/eventTs=1545502751154/eventTs2=1545502766731
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...