вот где я приземлился, примите это с пониманием, что coginito имеет некоторые ограничения, которые затрудняют истинное безошибочное ротацию пароля. Также знайте, что если вы можете сделать скрипт более эффективным, вам следует это сделать, потому что в лямбде вы, вероятно, используете более чем 350 пользователей из-за 5RPS в API администратора.
Предварительные условия: установите для функции lambda значение 5 параллелизма или Вы превысите лимит 5RPS. 1 изменяемое поле в ваших атрибутах cognito userpool для вставки даты. Пользовательский лямбда-файл zip, включающий pandas, сохраненный в s3.
import os
import sys
# this adds the parent directory of bin so we can find the module
parent_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), os.path.pardir))
sys.path.append(parent_dir)
#This addes venv lib/python2.7/site-packages/ to the search path
mod_path = os.path.abspath(parent_dir+"/lib/python"+str(sys.version_info[0])+"."+str(sys.version_info[1])+"/site-packages/")
sys.path.append(mod_path)
import boto3
import datetime
import pandas as pd
import time
current_path = os.path.dirname(os.path.realpath(__file__))
# Use this one for the parent directory
ENV_ROOT = os.path.abspath(os.path.join(current_path, os.path.pardir))
# Use this one for the current directory
#ENV_ROOT = os.path.abspath(os.path.join(current_path))
sys.path.append(ENV_ROOT)
#if __name__ == "__main__":
def lambda_handler(event, context):
user_pool_id = os.environ['USER_POOL_ID']
idp_client = boto3.client('cognito-idp')
users_list = []
page_token = None
dateToday = datetime.datetime.today().date()
def update_user(user) :
idp_client.admin_update_user_attributes(
UserPoolId = user_pool_id,
Username = user,
UserStatus = 'RESET_REQUIRED',
UserAttributes = [
{
'Name': 'custom:pwdCreateDate',
'Value': str(dateToday)
}
]
)
users = idp_client.list_users(
UserPoolId = user_pool_id
)
for user in users['Users']: users_list.append(user['Username'])
page_token = users['PaginationToken']
while 'PaginationToken' in users :
users = idp_client.list_users(
UserPoolId = user_pool_id,
PaginationToken = page_token
)
for user in users["Users"]: users_list.append(user["Username"])
if 'PaginationToken' in users :
page_token = users['PaginationToken']
attrPwdDates = []
for i in range(len(users_list)) :
userAttributes = idp_client.admin_get_user(
UserPoolId = user_pool_id,
Username = users_list[i]
)
for a in userAttributes['UserAttributes'] :
if a['Name'] == 'custom:pwdCreateDate' :
attrPwdDates.append(datetime.datetime.strptime(a['Value'], '%Y-%m-%d %H:%M:%S.%f').date())
time.sleep(1.0)
list_of_userattr_tuples = list(zip(users_list, attrPwdDates))
df1 = pd.DataFrame(list_of_userattr_tuples,columns = ['Username','Password_Last_Set'])
authPwdDates = []
for i in range(len(users_list)) :
authEvents = idp_client.admin_list_user_auth_events(
UserPoolId = user_pool_id,
Username = users_list[i]
)
for event in authEvents['AuthEvents'] :
if event['EventType'] == 'ForgotPassword' and event['EventResponse'] == 'Pass' :
authPwdDates.append(event['CreationDate'].date())
break
time.sleep(1.0)
list_of_userauth_tuples = list(zip(users_list, authPwdDates))
df2 = pd.DataFrame(list_of_userauth_tuples,columns = ['Username','Password_Last_Forgot'])
df3 = df1.merge(df2,how='left', on = 'Username')
df3[['Password_Last_Set','Password_Last_Forgot']] = df3[['Password_Last_Set','Password_Last_Forgot']].apply(pd.to_datetime)
cols = ['Password_Last_Set','Password_Last_Forgot']
df4 = df3.loc[df3[cols].max(axis=1)<=pd.Timestamp.now() - pd.Timedelta(90, unit='d'), 'Username']
for i,r in df4.iterrows() :
update_user(r['Username'])