Python Пул потоков быстрее, чем Go Подпрограммы при сканировании AWS S3? - PullRequest
0 голосов
/ 28 мая 2020

Недавно я начал разбираться в Golang параллелизме, в частности, об использовании каналов и пулов рабочих. Я хотел сравнить производительность между Go и Python (как это делали многие), потому что я в основном читал, что Go превосходит Python в отношении параллелизма. Поэтому я написал две программы для сканирования сегментов S3 учетной записи AWS и получения отчета об общем размере. Я выполнил это для учетной записи, в которой было более 75 корзин, на общую сумму более нескольких ТБ данных.

Я был удивлен, обнаружив, что моя реализация Python была почти в 2 раза быстрее, чем моя реализация Go. Это сбивает меня с толку, судя по всем тестам и литературе, которую я прочитал. Это наводит меня на мысль, что я неправильно реализовал свой код Go. Наблюдая за запуском обеих программ, я заметил, что реализация Go использует только до 15% моего процессора, а Python использует> 85%. Я пропускаю важный шаг с Go или я что-то упускаю в своей реализации? Заранее спасибо!

Python Код:

'''
Get the size of all objects in all buckets in S3
'''
import os
import sys
import boto3
import concurrent.futures

def get_s3_bucket_sizes(aws_access_key_id, aws_secret_access_key, aws_session_token=None):

    s3client = boto3.client('s3')

    # Create the dictionary which will be indexed by the bucket's
    # name and has an S3Bucket object as its contents
    buckets = {}

    total_size = 0.0

    #
    # Start gathering data...
    #

    # Get all of the buckets in the account
    _buckets = s3client.list_buckets()

    cnt = 1
    with concurrent.futures.ThreadPoolExecutor(max_workers=50) as executor:
        future_bucket_to_scan = {executor.submit(get_bucket_objects, s3client, bucket): bucket for bucket in _buckets["Buckets"]}

        for future in concurrent.futures.as_completed(future_bucket_to_scan):
            bucket_object = future_bucket_to_scan[future]

            try:
                ret = future.result()
            except Exception as exc:
                print('ERROR: %s' % (str(exc)))
            else:
                total_size += ret

    print(total_size)

def get_bucket_objects(s3client, bucket):

    name = bucket["Name"]

    # Get all of the objects in the bucket
    lsbuckets = s3client.list_objects(Bucket=name)

    size = 0
    while True:
        if "Contents" not in lsbuckets.keys():
            break

        for content in lsbuckets["Contents"]:            
            size += content["Size"]

        break

    return size

#
# Main
#
if __name__=='__main__':
    get_s3_bucket_sizes(os.environ.get("AWS_ACCESS_KEY_ID"), os.environ.get("AWS_SECRET_ACCESS_KEY"))

Go Код:

package main

import (
    "fmt"
    "sync"

    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/aws/awserr"
    "github.com/aws/aws-sdk-go/aws/session"
    "github.com/aws/aws-sdk-go/service/s3"
)

type S3_Bucket_Response struct {
    bucket string
    count  int64
    size   int64
    err    error
}

type S3_Bucket_Request struct {
    bucket string
    region string
}

func get_bucket_objects_async(wg *sync.WaitGroup, requests chan S3_Bucket_Request, responses chan S3_Bucket_Response) {

    var size  int64
    var count int64

    for request := range requests {
        bucket := request.bucket
        region := request.region

        // Create a new response
        response := new(S3_Bucket_Response)
        response.bucket = bucket

        sess, err := session.NewSession(&aws.Config{
            Region: aws.String(region), 
        })

        s3conn := s3.New(sess)

        resp, err := s3conn.ListObjectsV2(&s3.ListObjectsV2Input{
            Bucket: aws.String(bucket),
        })

        if err != nil {
            if awsErr, ok := err.(awserr.Error); ok {

                switch awsErr.Code() {
                case "NoSuchBucket":
                    response.err = fmt.Errorf("Bucket: (%s) is NoSuchBucket.  Must be in process of deleting.", bucket)
                case "AccessDenied":
                    response.err = fmt.Errorf("Bucket: (%s) is AccessDenied.  You should really be running this with full Admin Privaleges", bucket)
                }
            } else {
                response.err = fmt.Errorf("Listing Objects Unhandled Error: %s ", err)
            }

            responses <- *response
            continue
        } 

        contents := resp.Contents
        size      = 0
        count     = 0

        for i:=0; i<len(contents); i++ {
            size  += *contents[i].Size
            count += 1
        }

        response.size  = size
        response.count = count

        responses <- *response
    }

    wg.Done()
}

func main() {

    var err  error
    var size int64
    var resp *s3.ListBucketsOutput
    var wg sync.WaitGroup

    sess, _ := session.NewSession()
    s3conn  := s3.New(sess)

    // Get account bucket listing
    if resp, err = s3conn.ListBuckets(&s3.ListBucketsInput{});err != nil {
        fmt.Println("Error listing buckets: %s", err)
        return 
    }

    buckets := resp.Buckets
    size = 0

    // Create the buffered channels
    requests  := make(chan S3_Bucket_Request , len(buckets))
    responses := make(chan S3_Bucket_Response, len(buckets))

    for i := range buckets {

        bucket := *buckets[i].Name

        resp2, err := s3conn.GetBucketLocation(&s3.GetBucketLocationInput{                                                           
            Bucket: aws.String(bucket),                                                                                                       
        })         

        if err != nil {
            fmt.Printf("Could not get bucket location for bucket (%s): %s", bucket, err)
            continue
        }

        wg.Add(1)
        go get_bucket_objects_async(&wg, requests, responses)

        region := "us-east-1"
        if resp2.LocationConstraint != nil {
            region = *resp2.LocationConstraint
        }

        request := new(S3_Bucket_Request)
        request.bucket = bucket
        request.region = region

        requests <- *request        
    }

    // Close requests channel and wait for responses
    close(requests)
    wg.Wait()
    close(responses)

    cnt := 1
    // Process the results as they come in
    for response := range responses {

        fmt.Printf("Bucket: (%s) complete!  Buckets remaining: %d\n", response.bucket, len(buckets)-cnt)

        // Did the bucket request have errors?
        if response.err != nil {
            fmt.Println(response.err)
            continue
        }

        cnt  += 1
        size += response.size
    }

    fmt.Println(size)
    return 
}


1 Ответ

2 голосов
/ 28 мая 2020

Извините, у меня не было возможности полностью просмотреть это, но я отвечу: решения не кажутся эквивалентными с точки зрения параллелизма . Выскакивают 3 вещи:

  • Потоковая безопасность клиента boto s3. Этот поток безопасен? Вы можете это подтвердить? В этой статье Reddit предполагается, что он не потокобезопасен.
  • Python использует размер рабочего пула 50, но go не ограничен. ( можно использовать семфор , чтобы добавить верхнюю границу 50 к текущему коду)
  • Я не очень хорошо знаком с boto, но похоже, что go выполняет дополнительный ввод-вывод вызывать основной поток для каждого сегмента (GetBucketLocation) по сравнению с python.

Мои следующие вопросы:

  • Правильно ли каждое решение и можно ли вы доказываете это? (суммируются ли оба байта с одинаковым количеством байтов и соответствуют ли они консоли s3?)
  • Вы уверены, что параллельная структура такая же, т.е. суммирование по основному потоку, одинаковые размеры пула, одинаковое количество операций ввода-вывода на одного рабочего.
  • Эквивалентны ли значения по умолчанию для клиентов? Т.е. имеет ли python размер пула соединений по умолчанию? Go нет, поэтому он будет создавать соединения для каждого запрос (я только что столкнулся с этим на прошлой неделе)
...