Фильтрация DynamicFrame с помощью AWS Glue или PySpark - PullRequest
0 голосов
/ 09 мая 2018

В моем каталоге данных клея AWS есть таблица mytable. Эта таблица находится в локальном соединении с базой данных Oracle 'mydb'.

Я бы хотел отфильтровать результирующий DynamicFrame только по тем строкам, в которых столбец X_DATETIME_INSERT (который является отметкой времени) больше определенного времени (в данном случае, «2018-05-07 04:00:00»). После этого я пытаюсь подсчитать количество строк, чтобы убедиться, что их количество низкое (таблица содержит около 40 000 строк, но только несколько строк должны соответствовать критериям фильтра).

Вот мой текущий код:

import boto3
from datetime import datetime
import logging
import os
import pg8000
import pytz
import sys
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from base64 import b64decode
from pyspark.context import SparkContext
from pyspark.sql.functions import lit
## @params: [TempDir, JOB_NAME]
args = getResolvedOptions(sys.argv, ['TempDir','JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "mydb", table_name = "mytable", transformation_ctx = "datasource0")

# Try Glue native filtering    
filtered_df = Filter.apply(frame = datasource0, f = lambda x: x["X_DATETIME_INSERT"] > '2018-05-07 04:00:00')
filtered_df.count()

Этот код выполняется в течение 20 минут и время ожидания истекло. Я пробовал другие варианты:

df = datasource0.toDF()
df.where(df.X_DATETIME_INSERT > '2018-05-07 04:00:00').collect()

А

df.filter(df["X_DATETIME_INSERT"].gt(lit("'2018-05-07 04:00:00'")))

Которые потерпели неудачу. Что я делаю неправильно? Я опытный в Python, но плохо знаком с Glue и PySpark.

1 Ответ

0 голосов
/ 11 мая 2018

AWS Glue загружает весь набор данных из вашего JDBC-источника в папку temp s3 и затем применяет фильтрацию. Если ваши данные были в s3 вместо Oracle и разделены по некоторым ключам (т. Е. / Год / месяц / день), то вы могли бы использовать функцию предиката pushdown для загрузки подмножества данных:

val partitionPredicate = s"to_date(concat(year, '-', month, '-', day)) BETWEEN '${fromDate}' AND '${toDate}'"

val df = glueContext.getCatalogSource(
   database = "githubarchive_month",
   tableName = "data",
   pushDownPredicate = partitionPredicate).getDynamicFrame()

К сожалению, это пока не работает для источников данных JDBC.

...