Python ничего не делать, если пустой JSON объект - PullRequest
0 голосов
/ 04 февраля 2020

Я python нуб, ищу советы по передовому опыту и решение этой проблемы. Я могу успешно извлечь необходимые данные из API Okta, что приводит к выводу "PRECONDITION FAILED. Ошибки, сообщенные удаленным сервером: не удалось обновить. Ресурс изменен на сервере." это только половина того, что я хочу. Я хотел бы иметь возможность проверять этот вывод каждые 60 минут, и если он не существует, ничего не делать. Я думал, что это то, что я делал в коде ниже в функции «start», но, похоже, он не ведет себя так, как я хочу. Как сохранить json объекты в переменных событиях или выйти из приложения, если объект не отображается? Заранее спасибо!

import requests
import os
import json
import time
from datetime import datetime, timedelta

key = os.environ['OKTA_AUTH']
outcome = 'outcome.result eq "FAILURE"'
event_type = 'eventType eq "application.provision.user.deactivate"'
app_id = 'target.id eq "*******"'
all_params = f'{event_type} and {app_id} and {outcome}'
api_url = f'https://domain.okta.com/api/v1/logs'

last_hour_date_time = datetime.utcnow() - timedelta(minutes=60)
since = str(last_hour_date_time.strftime('%Y-%m-%dT%H:%M:%S.000Z'))

def auth_okta():
    url = api_url.format()
    print(url)
    params = {
        'filter': all_params,
        'since': since
    }
    response = requests.get(url, params=params,
                        headers={'Accept': 'application/json', 
                        'authorization': key})
    response_json = response.json()
    return response_json

def start():
    for event_data in auth_okta():
        events = event_data['outcome']['reason']
        if not events:
            print('nothing there')
        else:
            print(events)

start()

Ответы [ 3 ]

1 голос
/ 04 февраля 2020

Вы можете использовать sys.exit (), чтобы завершить все выполнение.

if events == None:
    sys.exit(0)
0 голосов
/ 06 февраля 2020

кажется, что моя первоначальная структура кода создавала слишком много списков, и я закончил реструктуризацию всего скрипта Я также взял несколько советов от Грега.

import requests
import sys
import os
import json
import time
from datetime import datetime, timedelta

key = os.environ['OKTA_AUTH']
outcome = 'outcome.result eq "FAILURE"'
event_type = 'eventType eq "application.provision.user.deactivate"'
target_type = 'target.type eq "User"'
app_id = 'target.id eq "APP-ID-HERE"'
all_params = f'{event_type} and {target_type} and {app_id} and {outcome}'
api_url = f'https://DOMAIN.okta.com/api/v1/logs'
slack_url = "SLACK WEBHOOK URL"
last_hour_date_time = datetime.utcnow() - timedelta(days=1)
since = str(last_hour_date_time.strftime('%Y-%m-%dT%H:%M:%S.000Z'))
unique_set=[]


class Events:
    def okta_auth(self):
        event_list=[]
        url = api_url.format()
        params = {
            'filter': all_params,
            'since': since
        }
        response = requests.get(url, params=params,
                            headers={'Accept': 'application/json', 
                            'authorization': key})
        response_json = response.json()
        for event_data in response_json:
            events = event_data['outcome']['reason']
            targets = event_data['target']
            parse = list(map(lambda x: x['alternateId'], targets))
            target_list=[]
            event_list.append(events)
            target_list.append(parse[1])
            for item in target_list:
                if item not in unique_set:
                    unique_set.append(item)
        if event_list != []:
            self.post_slack()
        else:
            sys.exit(0)

    def post_slack(self):
        url = slack_url.format()
        payload =  "{\"text\": \" Twilio Flex provisioing failure. Please check the following users: \n %s \"}" % '\n'.join(unique_set)        
        requests.post(url, headers={'Accept': 'application/json'}, data=payload)

if __name__ == "__main__":
    Events().okta_auth()

0 голосов
/ 04 февраля 2020

Попробуйте использовать команду .get для своих event_data:

for event_data in auth_okta():
    events = None
    while events is None:
        events = event_data.get("outcome", None).get("reason", None)
        print("Nothing found... waiting")
        time.sleep(3600) #wait one hour
    print(events)
...