Python Источник имеет около 19 тыс. Записей, которые нужно извлекать отдельно, код начал сбой - PullRequest
0 голосов
/ 15 апреля 2020

Эй, изначально, когда я начал этот проект, в нем было всего около 1000 записей, которые ему нужно было по отдельности назвать и определить статус 200. (Амазон AWS Бото). Код работал, хотя и медленно. Я ввел многопоточность, чтобы сделать его go быстрее.

В источнике теперь около 19 тыс. Записей, на все записи требуется около часа, чтобы go. Мой менеджер обнаружил, что один из вызванных URL-адресов возвращался как 200 при запуске в скрипте, но возвращается как 403. При индивидуальном запуске.

Я подозреваю, что может быть ситуация тайм-аута, и мой код просто сдаться. Это не имеет смысла - но это не ловит очевидные ошибки, если бы эти записи делались по отдельности.

Я бы попробовал привести примеры моего кода, но, к сожалению, я даже не знаю, какую часть виновник.

help

обновление с кодом:

я думаю, что это как-то связано с тем, как я многопоточен, но я не совсем уверен, я пришел сюда, потому что я не даже не знаю, как именно я должен сформулировать мою проблему

analyzedArray = []
errorArray = []
errorArrayMessage = ''
errorSize = 0


def analyzedAssetsOnLoad(event, context):
    url = "urlSrc"
    response = urllib.request.urlopen(url)
    json_array = json.loads(response.read())
    entries = json_array["entries"]
    batchedEntries = batchEntries(entries, 1900)
    with concurrent.futures.ThreadPoolExecutor() as executor:
        executor.map(analyzeArray, batchedEntries)

    output = io.StringIO()

    w = csv.DictWriter(
        output, ["guid", "url-0", "status-0", "url-1", "status-1", "url-2", "status-2", "url-0-2sec", "status-0-2sec", "url-1-2sec", "status-1-2sec"])
    w.writeheader()
    for entry in analyzedArray:
        entry = checkEntries(entry)
        if entry["entry-0"]["status"] == "403" or entry["entry-1"]["status"] == '403' or entry["entry-2"]["status"] == '403' or entry["entry-0-2sec"]["status"] == "403" or entry["entry-1-2sec"]["status"] == '403':
            errorArray.append(entry)
        row = {
            "guid": entry["guid"],
            "url-0": entry["entry-0"]["url"],
            "status-0": entry["entry-0"]["status"],
            "url-1": entry["entry-1"]["url"],
            "status-1": entry["entry-1"]["status"],
            "url-2": entry["entry-2"]["url"],
            "status-2": entry["entry-2"]["status"],
            "url-0-2sec": entry["entry-0-2sec"]["url"],
            "status-0-2sec": entry["entry-0-2sec"]["status"],
            "url-1-2sec": entry["entry-1-2sec"]["url"],
            "status-1-2sec": entry["entry-1-2sec"]["status"]
        }
        w.writerow(row)
    getErrors = generateErrorBody(errorArray)
    errors = {"message": f"{getErrors['numberOfErrors']} mismatches found",
              "errorList": f" {getErrors['errorMessage']}",
              "numberOfErrors": getErrors["numberOfErrors"]}
    sendEmail(output.getvalue(), errors),





def batchEntries(allEntries, maxBatchSize):
    batched = []
    for i in range(0, len(allEntries), maxBatchSize):
        batched.append(allEntries[i:i + maxBatchSize])

    return batched


def getStatus(value, bucket):
    if value == "null":
        return
    client = boto3.client("s3")
    response = {}
    try:
        response = client.head_object(Bucket=bucket, Key=value)
    except ClientError as e:
        if e.response['Error']['Code'] == '403':
            response = {"ResponseMetadata": {"HTTPStatusCode": '403'}}

    return response


def attemptUrl(url):
    try:
        return request.urlopen(url)
    except HTTPError as e:
        return e


def analyzeArray(items):
    for item in items:
        myDict = {}
        myDict["guid"] = item["guid"]
        if "shortform" in item["guid"]:
            pass
        # checkValue(item)
        count = 0
        for index, content in enumerate(item["content"]):
            myContent = {}
            myContent["index"] = index
            if "https://${US_VOD_PROD}/" in content['url']:
                url = content["url"].replace("https://${US_VOD_PROD}/", "")
                bucket = "bucket1"
            if "https://${US_SF_PROD}/" in content['url']:
                url = content["url"].replace("https://${US_SF_PROD}/", "")
                bucket = "bucket2"
            if "none://use/externalContentUri/field" in content['url']:
                pass
            myContent["url"] = url
            resp = getStatus(url, bucket)
            myContent["status"] = resp["ResponseMetadata"]["HTTPStatusCode"]
            myDict["entry-%s" % index] = myContent
            if "/cmaf/mpeg_cenc/" in url or "/mpeg_6sec/" in url:
                additionalContent = {}
                additionalUrl = url.replace(
                    '/cmaf/mpeg_cenc/', "/cmaf/mpeg_cenc_2sec/")
                additionalUrl = url.replace(
                    '/mpeg_6sec/', "/mpeg_2sec/")
                additionalContent["url"] = additionalUrl
                additionalResp = getStatus(additionalUrl, bucket)
                additionalContent["status"] = additionalResp["ResponseMetadata"]["HTTPStatusCode"]
                myDict["entry-%s-2sec" % count] = additionalContent
                count = count+1
                print(myDict)

        analyzedArray.append(myDict)


def checkEntries(entry):
    object = entry
    if entry.get("entry-0") == None:
        object["entry-0"] = {"index": 0, "url": "null", "status": "0"}
    if entry.get("entry-1") == None:
        object["entry-1"] = {"index": 1, "url": "null", "status": "0"}
    if entry.get("entry-2") == None:
        object["entry-2"] = {"index": 2, "url": "null", "status": "0"}
    if entry.get("entry-0-2sec") == None:
        object["entry-0-2sec"] = {"index": 3, "url": "null", "status": "0"}
    if entry.get("entry-1-2sec") == None:
        object["entry-1-2sec"] = {"index": 4, "url": "null", "status": "0"}
    return object


def generateErrorBody(errors):
    errorMessage = ""
    flattenedErrors = []
    numberOfErrors = 0
    errorGUID = []
    for errorObject in errors:
        flattenedErrors.append(
            {"guid": errorObject["guid"], "url": errorObject["entry-0"]["url"], "status": errorObject["entry-0"]['status']})
        flattenedErrors.append(
            {"guid": errorObject["guid"], "url": errorObject["entry-1"]["url"], "status": errorObject["entry-1"]['status']})
        flattenedErrors.append(
            {"guid": errorObject["guid"], "url": errorObject["entry-2"]["url"], "status": errorObject["entry-2"]['status']})
        flattenedErrors.append(
            {"guid": errorObject["guid"], "url": errorObject["entry-0-2sec"]["url"], "status": errorObject["entry-0-2sec"]['status']})
        flattenedErrors.append(
            {"guid": errorObject["guid"], "url": errorObject["entry-1-2sec"]["url"], "status": errorObject["entry-1-2sec"]['status']})
    for error in flattenedErrors:
        if error["status"] == "0":
            continue
        else:
            errorGUID.append(error['guid'])
    errorGUID = list(dict.fromkeys(errorGUID))
    for guid in errorGUID:
        errorMessage += 'GUID: %s \n' % guid
    numberOfErrors = len(errorGUID)
    return {'errorMessage': errorMessage, 'numberOfErrors': numberOfErrors}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...