Оптимизация и пакетное соединение паркета / JDBC - PullRequest
0 голосов
/ 04 октября 2018

Я выполняю операцию соединения из данных паркета S3 в таблицу JDBC (Postgres), используя столбец в данных паркета для первичного ключа таблицы JDBC.Мне нужна небольшая часть (но все же большое количество - десятки или сотни тысяч строк в целом) из таблицы JDBC, а затем мне нужно интеллектуально разделить данные для использования в исполнителях.

I 'Я все еще новичок в области разработки данных в целом и Spark в частности, так что простите (и предположите!) мое невежество.Меня меньше волнует время обработки, чем использование памяти;Я должен приспособить использование памяти к пределам Amazon Glue.

Какой хороший способ сделать это?

Мои существующие мысли:

Я мог бы, теоретически,построить запрос SQL как:

select * from t1 where id = key1 UNION
select * from t1 where id = key2 UNION...

Но это кажется глупым.Этот вопрос: Выбор нескольких строк по идентификатору, есть ли более быстрый способ, чем WHERE IN , дает мне идею записать ключи, которые я хочу получить, во временную таблицу, соединить их с исходной таблицей и вытянутьрезультат;который выглядит как «правильный» способ сделать выше.Но это также может показаться достаточно распространенной проблемой, поскольку есть готовое решение, которое я еще не нашел.

Существует также возможность переключения между минимальными и максимальными значениями UUID, но затемвопрос заключается в том, сколько дополнительных строк я извлекаю, и поскольку UUID, AFAIK, случайным образом распределены по возможным значениям UUID, я ожидаю, что получится много дополнительных строк (строк, которые будут опущены при объединении).Тем не менее, это может быть полезным способом разделения данных JDBC.

Мне также до сих пор неясно, как данные JDBC попадают к исполнителям;что он, возможно, проходит (полностью) через процесс драйвера.

Итак, чтобы попытаться формализовать это на вопросы:

  1. Существует ли существующий рецепт для такого использования?
  2. Какие функции Spark я должен рассмотреть для достижения этой цели?
  3. Каков фактический поток данных Spark для данных, поступающих из соединения JDBC?

Ответы [ 2 ]

0 голосов
/ 18 декабря 2018

Spark не предназначен для выполнения каких-либо эффективных действий с драйвером, лучше его избегать.

В вашем случае я бы рекомендовал сначала загрузить данные из S3 в какой-нибудь DF.Сохраните этот фрейм данных, так как он понадобится позже.

Затем вы можете разрешить уникальные значения для ваших ключей из S3, используя комбинацию map (row ->). Diver ()

Затем разделите над ключами, имеющими разумное количество ключей в каждом разделе для выполнения одного запроса к JDBC.Вы также можете сохранить приведенный выше результат и выполнить операцию count () , а затем repartition () .Например, если в одном разделе не более 1000 элементов.

Затем, используя mapPartitions , составьте запрос, подобный «SELECT * FROM table WHERE key in».

Затем с помощью spark flatMap необходимо выполнить фактический выбор.Я не знаю автоматического способа для этого с фреймами данных, поэтому, вероятно, вам нужно будет использовать JDBC напрямую для выполнения выбора и сопоставления данных.Вы можете инициализировать среду Spring на рабочей машине и использовать расширения данных Spring для простой загрузки данных из БД в список некоторых сущностей.

Теперь у вас есть DatasSet с необходимыми данными из Postgres в кластере.Из него можно создать фрейм данных с помощью toDF () .Вероятно, потребуется дополнительное сопоставление для столбцов здесь или для сопоставления данных с типом Row на предыдущем шаге.

Итак, теперь у вас есть 2 обязательных фрейма данных, один из них сохраняется исходным с данными изS3 и другие с данными из Postgres, вы можете присоединиться к ним стандартным способом, используя Dataframe.join .

Примечание: Не забудьте сохранить сохраняющиеся наборы данных и кадры, используя .persist () , когда они будут использоваться повторно.В противном случае он будет повторять все шаги для извлечения данных каждый раз.

0 голосов
/ 26 октября 2018

Кажется, что наилучший из возможных способов сделать это (пока что) - записать идентификаторы строк, которые вы хотите извлечь, во временную таблицу в БД, выполнить объединение с основной таблицей, а затем прочитать результат.(как описано в связанном ответе).

Теоретически, это вполне выполнимо в Spark;что-то вроде

// PSUEDOCODE!
df.select("row_id").write.jdbc(<target db>, "ids_to_fetch")
databaseConnection.execute("create table output from (select * from ids_to_fetch join target_table on row_id = id)")
df = df.join(
  spark.read.jdbc(<target db>, "output")
)

Это, вероятно, самый эффективный способ сделать это, потому что (AFAIK) он будет обрабатывать как запись идентификаторов , так и чтение таблицы соединения висполнители, вместо того, чтобы пытаться что-то делать в драйвере.

Однако сейчас я не могу записать временную таблицу в целевую базу данных, поэтому я создаю серию операторов select where in вводитель, а затем потянув результаты тех.

...