Предикат нажатия на клей AWS не работает должным образом - PullRequest
2 голосов
/ 10 мая 2019

Я пытаюсь оптимизировать работу Glue / PySpark с помощью предикатов push down.

start = date(2019, 2, 13) 
end = date(2019, 2, 27) 
print(">>> Generate data frame for ", start, " to ", end, "... ")
relaventDatesDf = spark.createDataFrame([
    Row(start=start, stop=end)
])
relaventDatesDf.createOrReplaceTempView("relaventDates")

relaventDatesDf = spark.sql("SELECT explode(generate_date_series(start, stop)) AS querydatetime FROM relaventDates")
relaventDatesDf.createOrReplaceTempView("relaventDates")
print("===LOG:Dates===")
relaventDatesDf.show()

flightsGDF = glueContext.create_dynamic_frame.from_catalog(database = "xxx", table_name = "flights", transformation_ctx="flights", push_down_predicate="""
    querydatetime BETWEEN '%s' AND '%s'
    AND querydestinationplace IN (%s)
""" % (start.strftime("%Y-%m-%d"), today.strftime("%Y-%m-%d"), ",".join(map(lambda s: str(s), arr))))

Однако, похоже, что Glue все еще пытается прочитать данные за пределами указанного диапазона дат?

INFO S3NativeFileSystem: Opening 's3://.../flights/querydestinationplace=12191/querydatetime=2019-03-01/part-00045-6cdebbb1-562c-43fa-915d-93b125aeee61.c000.snappy.parquet' for reading
INFO FileScanRDD: Reading File path: s3://.../flights/querydestinationplace=12191/querydatetime=2019-03-10/part-00021-34a13146-8fb2-43de-9df2-d8925cbe472d.c000.snappy.parquet, range: 0-11797922, partition values: [12191,17965]
WARN S3AbortableInputStream: Not all bytes were read from the S3ObjectInputStream, aborting HTTP connection. This is likely an error and may result in sub-optimal behavior. Request only the bytes you need via a ranged GET or drain the input stream after use.
INFO S3NativeFileSystem: Opening 's3://.../flights/querydestinationplace=12191/querydatetime=2019-03-10/part-00021-34a13146-8fb2-43de-9df2-d8925cbe472d.c000.snappy.parquet' for reading
WARN S3AbortableInputStream: Not all bytes were read from the S3ObjectInputStream, aborting HTTP connection. This is likely an error and may result in sub-optimal behavior. Request only the bytes you need via a ranged GET or drain the input stream after use.

Обратите внимание на querydatetime=2019-03-01 и querydatetime=2019-03-10 вне указанного диапазона 2019-02-13 - 2019-02-27.Вот почему есть следующая строка "прерывание HTTP-соединения"?Далее говорится: «Это, вероятно, ошибка и может привести к неоптимальному поведению», что-то не так?

Интересно, проблема в том, что он не поддерживает МЕЖДУ предикатом или IN?


Таблица создания DDL

CREATE EXTERNAL TABLE `flights`(
  `id` string, 
  `querytaskid` string, 
  `queryoriginplace` string, 
  `queryoutbounddate` string, 
  `queryinbounddate` string, 
  `querycabinclass` string, 
  `querycurrency` string, 
  `agent` string, 
  `quoteageinminutes` string, 
  `price` string, 
  `outboundlegid` string, 
  `inboundlegid` string, 
  `outdeparture` string, 
  `outarrival` string, 
  `outduration` string, 
  `outjourneymode` string, 
  `outstops` string, 
  `outcarriers` string, 
  `outoperatingcarriers` string, 
  `numberoutstops` string, 
  `numberoutcarriers` string, 
  `numberoutoperatingcarriers` string, 
  `indeparture` string, 
  `inarrival` string, 
  `induration` string, 
  `injourneymode` string, 
  `instops` string, 
  `incarriers` string, 
  `inoperatingcarriers` string, 
  `numberinstops` string, 
  `numberincarriers` string, 
  `numberinoperatingcarriers` string)
PARTITIONED BY ( 
  `querydestinationplace` string, 
  `querydatetime` string)
ROW FORMAT SERDE 
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' 
STORED AS INPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
  's3://pinfare-glue/flights/'
TBLPROPERTIES (
  'CrawlerSchemaDeserializerVersion'='1.0', 
  'CrawlerSchemaSerializerVersion'='1.0', 
  'UPDATED_BY_CRAWLER'='pinfare-parquet', 
  'averageRecordSize'='19', 
  'classification'='parquet', 
  'compressionType'='none', 
  'objectCount'='623609', 
  'recordCount'='4368434222', 
  'sizeKey'='86509997099', 
  'typeOfData'='file')

Ответы [ 4 ]

3 голосов
/ 14 мая 2019

Чтобы сдвинуть ваше условие, вам нужно изменить порядок столбцов в вашем разделе с помощью предложения определения таблицы

Условие, имеющее предикат "in" в первом столбце раздела, не может быть передано какВы ожидаете.

Позвольте мне, если это поможет.

3 голосов
/ 13 мая 2019

Одна из проблем, которые я вижу в коде, заключается в том, что вы используете «сегодня» вместо «конца» в предложении между. Хотя я не вижу переменную today, объявленную где-либо в вашем коде, я предполагаю, что она была инициализирована с сегодняшней датой.

В этом случае диапазон будет другим, и разделы, считываемые клеевой искрой, будут правильными.

1 голос
/ 19 мая 2019

Предикаты Pushdown в Glue DynamicFrame отлично работает с между и IN .

Пока у вас есть правильная последовательность столбцов разделов, определенных в определении таблицы и в запросе.

У меня есть таблица с тремя уровнями перегородок.

s3://bucket/flights/year=2018/month=01/day=01 -> 50 records
s3://bucket/flights/year=2018/month=02/day=02 -> 40 records
s3://bucket/flights/year=2018/month=03/day=03 -> 30 records

Чтение данных в динамическом фрейме

ds = glueContext.create_dynamic_frame.from_catalog(
    database = "abc",table_name = "pqr", transformation_ctx = "flights",
    push_down_predicate = "(year == '2018' and month between '02' and '03' and day in ('03'))"
    )
ds.count() 

Выход:

30 records

Итак, вы получите правильные результаты, если последовательность столбцов задана правильно. Также обратите внимание, что вам нужно указать '(цитата) IN('%s') в предложении IN.

Столбцы разделов в таблице:

querydestinationplace string, 
querydatetime string

Данные считываются в DynamicFrame:

flightsGDF = glueContext.create_dynamic_frame.from_catalog(database = "xxx", table_name = "flights", transformation_ctx="flights", 
    push_down_predicate=
    """querydestinationplace IN ('%s') AND 
       querydatetime BETWEEN '%s' AND '%s' 
    """ 
    % 
    ( ",".join(map(lambda s: str(s), arr)), 
        start.strftime("%Y-%m-%d"), today.strftime("%Y-%m-%d")))
0 голосов
/ 13 мая 2019

Попробуйте сделать конец, как это

start = str(date(2019, 2, 13))
end = str(date(2019, 2, 27)) 
# Set your push_down_predicate variable

pd_predicate = "querydatetime >= '" + start + "' and querydatetime < '" + end + "'"
#pd_predicate = "querydatetime between '" + start + "' AND '" + end + "'" # Or this one?

flightsGDF = glueContext.create_dynamic_frame.from_catalog(
    database = "xxx"
    , table_name = "flights"
    , transformation_ctx="flights"
    , push_down_predicate=pd_predicate)

pd_predicate будет строкой, которая будет работать как push_down_predicate.

Вот хорошее прочтение, если вам нравится.

https://aws.amazon.com/blogs/big-data/work-with-partitioned-data-in-aws-glue/

...