Добавить раздел на склеенную таблицу через API на AWS? - PullRequest
0 голосов
/ 01 июня 2018

У меня есть корзина S3, которая постоянно заполняется новыми данными, я использую Athena и Glue для запроса этих данных, дело в том, что если glue не знает, что создан новый раздел, он не ищет егонужно искать там.Если я выполняю вызов API для запуска сканера Glue каждый раз, когда мне нужен новый раздел, он слишком дорогой, поэтому лучшее решение для этого - сообщить glue, что добавлен новый раздел, т.е. создать новый раздел в его таблице свойств.Я просмотрел документацию AWS, но не повезло, я использую Java с AWS.Любая помощь?

Ответы [ 3 ]

0 голосов
/ 08 сентября 2018

Вы можете использовать batch_create_partition () glue api для регистрации новых разделов.Это не требует каких-либо дорогостоящих операций, таких как MSCK REPAIR TABLE или повторного сканирования.

У меня был похожий сценарий использования, для которого я написал скрипт на python, который выполняет следующее -

Шаг 1 - Извлечение информации из таблицы и анализ необходимой информации из нее, которая требуетсязарегистрировать разделы.

# Fetching table information from glue catalog
    logger.info("Fetching table info for {}.{}".format(l_database, l_table))
    try:
        response = l_client.get_table(
            CatalogId=l_catalog_id,
            DatabaseName=l_database,
            Name=l_table
        )
    except Exception as error:
        logger.error("Exception while fetching table info for {}.{} - {}"
                     .format(l_database, l_table, error))
        sys.exit(-1)

    # Parsing table info required to create partitions from table
    input_format = response['Table']['StorageDescriptor']['InputFormat']
    output_format = response['Table']['StorageDescriptor']['OutputFormat']
    table_location = response['Table']['StorageDescriptor']['Location']
    serde_info = response['Table']['StorageDescriptor']['SerdeInfo']
    partition_keys = response['Table']['PartitionKeys']

Шаг 2 - Создать словарь списков, где каждый список содержит информацию для создания отдельного раздела.Все списки будут иметь одинаковую структуру, но значения их разделов будут меняться (год, месяц, день, час)

def generate_partition_input_list(start_date, num_of_days, table_location,
                                  input_format, output_format, serde_info):
    input_list = []  # Initializing empty list
    today = datetime.utcnow().date()
    if start_date > today:  # To handle scenarios if any future partitions are created manually
        start_date = today
    end_date = today + timedelta(days=num_of_days)  # Getting end date till which partitions needs to be created
    logger.info("Partitions to be created from {} to {}".format(start_date, end_date))
    for input_date in date_range(start_date, end_date):
        # Formatting partition values by padding required zeroes and converting into string
        year = str(input_date)[0:4].zfill(4)
        month = str(input_date)[5:7].zfill(2)
        day = str(input_date)[8:10].zfill(2)
        for hour in range(24):  # Looping over 24 hours to generate partition input for 24 hours for a day
            hour = str('{:02d}'.format(hour))  # Padding zero to make sure that hour is in two digits
            part_location = "{}{}/{}/{}/{}/".format(table_location, year, month, day, hour)
            input_dict = {
                'Values': [
                    year, month, day, hour
                ],
                'StorageDescriptor': {
                    'Location': part_location,
                    'InputFormat': input_format,
                    'OutputFormat': output_format,
                    'SerdeInfo': serde_info
                }
            }
            input_list.append(input_dict.copy())
    return input_list

Шаг 3 - Вызов API batch_create_partition ()

        for each_input in break_list_into_chunks(partition_input_list, 100):
        create_partition_response = client.batch_create_partition(
            CatalogId=catalog_id,
            DatabaseName=l_database,
            TableName=l_table,
            PartitionInputList=each_input
        )

В одном вызове API существует ограничение в 100 разделов, поэтому, если вы создаете более 100 разделов, вам нужно разбить список на куски и выполнить итерации по нему.

https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/glue.html#Glue.Client.batch_create_partition

0 голосов
/ 23 января 2019

Этот вопрос старый, но я хотел сказать, что кто-то может иметь s3:ObjectCreated:Put уведомления, запускающие функцию Lambda, которая регистрирует новые разделы, когда данные поступают на S3.Я бы даже расширил эту функцию для обработки устаревших на основе удалений объектов и так далее.Вот сообщение в блоге AWS, в котором подробно описываются уведомления о событиях S3: https://aws.amazon.com/blogs/aws/s3-event-notification/

0 голосов
/ 01 июня 2018
  1. Вы можете настроить запуск своего каталога клея каждые 5 минут
  2. Вы можете создать лямбда-функцию, которая будет либо запускаться по расписанию, либо запускаться по событию изВаше ведро (например, событие putObject) и эта функция могут вызывать athena для обнаружения разделов :

    import boto3
    
    athena = boto3.client('athena')
    
    def lambda_handler(event, context):
        athena.start_query_execution(
            QueryString = "MSCK REPAIR TABLE mytable",
            ResultConfiguration = {
                'OutputLocation': "s3://some-bucket/_athena_results"
            }
    
  3. Используйте Athena для добавления разделов вручную.Вы также можете запускать sql запросы через API, как в моем лямбда-примере.

    Пример из Руководство Афины :

    ALTER TABLE orders ADD
      PARTITION (dt = '2016-05-14', country = 'IN') LOCATION 's3://mystorage/path/to/INDIA_14_May_2016'
      PARTITION (dt = '2016-05-15', country = 'IN') LOCATION 's3://mystorage/path/to/INDIA_15_May_2016';
    
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...