Как передать aws s3 cp в gzip для использования с $ QUERY |утилита psql - PullRequest
0 голосов
/ 05 июня 2018

У меня есть следующая команда

"$QUERY" | psql -h $DB_HOST -p $DB_PORT -U $DB_USERNAME $DB_NAME

Где $QUERY - это команда, которая загружает файлы из корзины, распаковывает их и помещает в базу данных.Это выглядит следующим образом:

COPY my_table
FROM PROGRAM 'readarray -t files <<<"$(aws s3 ls ${BUCKET_PATH} | tr [:space:] "\n")"; for (( n = ${#files[@]} - 1; n >= 0; n--));  do if [[ ${files[$n]} =~ .csv.gz$ ]]; then aws s3 cp ${BUCKET_PATH}${files[$n]} >(gzip -d -c); break; fi done'
WITH DELIMITER ',' CSV

Вот отформатированный код bash:

#!/usr/bin/env bash
raw_files=`aws s3 ls ${BUCKET_PATH} | tr [:space:] "\n"`
readarray -t files <<<"$raw_files"
for (( n = ${#files[@]} - 1; n >= 0; n--));  do
    if [[ ${files[$n]} =~ .csv.gz$ ]];
        then aws s3 cp ${BUCKET_PATH}${files[$n]} >(gzip -d -c);
        break; # for test purposes to be no load all files, jsut one
    fi
done

версия aws-CLI

#: aws --version 
#: aws-cli/1.11.13 Python/3.5.2 Linux/4.13.0-43-generic botocore/1.4.70

Этот скриптработает.Но когда я пытаюсь использовать его с psql, он терпит неудачу, и я не могу понять, почему.

Как я могу это исправить?

Вот скрипт, который загружает данные из корзины s3 и объединяет ихв толстый файл:

#!/usr/bin/env bash
bucket_path=$1
limit_files=$2
target_file_name=$3
echo "Source bucket $bucket_path"
if [ -z $target_file_name ]; then
    target_file_name="fat.csv.gz"
    echo "Default target file $target_file_name"
fi
echo "Total files $(aws s3 ls $bucket_path | wc -l)"
readarray -t files <<<"$(aws s3 ls $bucket_path | tr [:space:] "\n")"
for (( n = ${#files[@]} - 1, i=1; n >= 0; n--));  do
    if [[ ${files[$n]} =~ .csv.gz$ ]]; then
        aws s3 cp --quiet $bucket_path${files[$n]} >(cat >> "$target_file_name");
        echo "$((i++)), ${files[$n]}, current size: $(du -sh $target_file_name)"
        if [ ! -z $limit_files ] && [ $i -gt $limit_files ]; then
            echo "Final size $(du -sh $target_file_name)"
            exit 0
        fi
    fi
done
exit 0

Он работает правильно.

Но когда я пытаюсь передать этот файл fat.csv.gz в psql db, используя следующий код

echo "COPY my_table
FROM PROGRAM 'gzip -d -c fat.csv.gz'
WITH DELIMITER ',' CSV" | psql -h $DB_HOST -p $DB_PORT -U $DB_USERNAME $DB_NAME

Я получаю сообщение об ошибке:

ОШИБКА: должен быть суперпользователем для КОПИРОВАНИЯ в или из файла

Это похоже на специфику работы pg (я думаю, это связано спо соображениям безопасности) - ссылка

Итак, проблема сейчас в том, что я не знаю, как переделать свой скрипт, чтобы он был конвейером fat.csv.gz.Я не могу получить такую ​​привилегию и должен найти обходной путь.

1 Ответ

0 голосов
/ 11 июня 2018

Я наконец написал следующий скрипт bash, который загружает файлы из s3, объединяет их в архивы размером 50 МБ и передает в pg в подпроцессе.Надеюсь, это кому-нибудь пригодится:

get_current_timestamp() (
  date '+%s.%N'
)

execute_sql() (
    write_log "Importing data from s3 to pg..."
    import_data_from_s3 "$EVENTS_PATH"
    write_log "Importing data from s3 to pg...done"
)

columns() (
    local columns=`echo "SELECT array_to_string(
                            array(SELECT column_name::text
                                  FROM information_schema.columns
                                  WHERE table_name ILIKE '${TMP_TABLE}'
                                    AND column_name NOT ILIKE '${DATE_FIELD}'), ',')" | \
                    psql --tuples-only -h $DB_HOST -p $DB_PORT -U $DB_USERNAME $DB_NAME`
    echo -n "${columns}"
)

get_timestamp_difference() (
  FROM=$1
  TO=$2
  echo $FROM $TO | awk '{
    diff = $2-$1
    if (diff >= 86400) {
      printf "%i days ", diff/86400
    }
    if (diff >= 3600) {
      printf "%i hours ", (diff/3600)%24
    }
    if (diff >= 60) {
      printf "%i mins ", (diff/60)%60
    }
    printf "%f secs", diff%60
  }'
)

pretty_size() (
    if [ ! -z $1 ]; then
        local size=$1;
    else
        local size=`cat <&0`;
    fi
    echo "${size}" | \
    awk '{ \
            split( "B KB MB GB" , v ); \
            s=1; \
            while( $1>=1024 ) { \
                $1/=1024; s++ \
            } \
            printf "%.1f%s", $1, v[s] \
        }' | \
    add_missing_eol >&1
)

import_data_from_s3() (
    local bucket_path=$1
    local limit_files=$2
    local target_file_name=$3

    write_log "Source bucket $bucket_path"
    if [ -z ${target_file_name} ]; then
        target_file_name="fat.csv.gz"
        write_log "Default target file $target_file_name"
    fi

    if [ ! -z ${limit_files} ]; then
        write_log "Import ${limit_files} files"
    else
        write_log "Import all files"
    fi

    write_log "Total files $(aws s3 ls $bucket_path | wc -l)"
    readarray -t files <<<"$(aws s3 ls $bucket_path | tr [:space:] "\n")"

    write_log "Remove old data files..."
    find . -maxdepth 1 -type f -name "*${target_file_name}" -execdir rm -f {} +;
    write_log "Remove old data files...done"

    TMP_TABLE_COLUMNS=$(columns)
    write_log "Importing columns: ${DW_EVENTS_TMP_TABLE_COLUMNS}"

    declare -A pids
    local total_data_amount=0
    local file_size_bytes=0
    local file_size_bytes=0
    local size_limit=$((50*1024*1024))

    for (( n = ${#files[@]} - 1, file_counter=1, fat_file_counter=1; n >= 0; n--));  do
        if [[ ! ${files[$n]} =~ .csv.gz$ ]]; then continue; fi

        file="${fat_file_counter}-${target_file_name}"
        aws s3 cp --quiet ${bucket_path}${files[$n]} >(cat >> "${file}");

        file_size_bytes=$(stat -c%s "$file")
        if [ $file_size_bytes -gt $size_limit ]; then
            import_zip "${file}" "$(pretty_size ${file_size_bytes})" & pids["${file}"]=$!;
            total_data_amount=$((total_data_amount+file_size_bytes))
            write_log "Files read: ${file_counter}, total size(zipped): $(pretty_size ${total_data_amount})"
            ((fat_file_counter++))
        fi

        # write_log "${file_counter}, ${files[$n]}, current size: $(du -sh $file)"
        if [ ! -z ${limit_files} ] && [ ${file_counter} -gt ${limit_files} ]; then
            write_log "Final size $(du -sh ${file})"
            if [ ! ${pids["${file}"]+0} ]; then
                import_zip "${file}" "$(pretty_size ${file_size_bytes})" & pids["${file}"]=$!;
            fi
            break;
        fi
        ((file_counter++))
    done

    # import rest file that can less than limit size
    if [ ! ${pids["${file}"]+0} ]; then
        import_zip "${file}" "$(pretty_size ${file_size_bytes})" & pids["${file}"]=$!;
    fi

    write_log "Waiting for all pids: ${pids[*]}"
    for pid in ${pids[*]}; do
        wait $pid
    done
    write_log "All sub process have finished. Total size(zipped): $(pretty_size ${total_data_amount})"
)

import_zip() (
    local file=$1
    local size=$2
    local start_time=`get_current_timestamp`
    write_log "pid: $!, size: ${size}, importing ${file}...";
    gzip -d -c ${file} | \
       psql --quiet -h ${DB_HOST} -p ${DB_PORT} -U ${DB_USERNAME} ${DB_NAME} \
           -c "COPY ${TMP_TABLE}(${TMP_TABLE_COLUMNS})
               FROM STDIN
               WITH DELIMITER ',' CSV";
    rm $file;
    local end_time=`get_current_timestamp`
    write_log "pid: $!, time: `get_timestamp_difference ${start_time} ${end_time}`, size: ${size}, importing ${file}...done";
)
...