Я выполняю код для создания стека облачной информации, удаления стека, обновления стека с определенной переменной параметра, используя файл json, используя python.
Я определил 3 различных оператора в функции init (). Мне нужно поставить условие if: если я хочу создать стек, он вызовет «create_products», обновит стек, вызовет «update_products» elif, удалит стек, обновит «delete_products»
from __future__ import print_function
import subprocess
import json
import yaml
import sys
import os
import re
import boto3
from glob import glob
def createstack(productName, productId, paramlist):
try:
client = boto3.client('servicecatalog', region_name='us-east-1')
ProvisioningArtifactId = client.list_provisioning_artifacts(ProductId=productId)
ArtifactId = ProvisioningArtifactId['ProvisioningArtifactDetails'][0]['Id']
response = client.provision_product(ProvisionedProductName=productName, ProductId=productId, ProvisioningArtifactId=ArtifactId, ProvisioningParameters=paramlist)
print(response)
except Exception as e:
error = "An error occurred processing this request: " + str(e)
print(error)
def updatestack(productName, productId, paramlist):
try:
client = boto3.client('servicecatalog', region_name='us-east-1')
ProvisioningArtifactId = client.list_provisioning_artifacts(ProductId=productId)
ArtifactId = ProvisioningArtifactId['ProvisioningArtifactDetails'][0]['Id']
response = client.update_provisioned_product(ProvisionedProductName=productName, ProductId=productId, ProvisioningArtifactId=ArtifactId, ProvisioningParameters=paramlist)
print(response)
except Exception as e:
error = "An error occurred processing this request: " + str(e)
print(error)
def deletestack(productName):
try:
client = boto3.client('servicecatalog', region_name='us-east-1')
response = client.terminate_provisioned_product(ProvisionedProductName=productName)
print(response)
return response
except Exception as e:
error = "An error occurred processing this request: " + str(e)
return(error)
def init():
#global args
with open("list_provisional_product.json") as f:
product_list = json.load(f)
with open("testing-pipeline-params.json") as f:
baselist = json.load(f)
for product in product_list["update_products"]:
for provisioned_product_name in product["provisioned_product_names"]:
updatestack(productName=provisioned_product_name, productId=product["product_id"], paramlist=baselist[provisioned_product_name])
for product in product_list["delete_products"]:
for provisioned_product_name in product["provisioned_product_names"]:
deletestack(productName=provisioned_product_name)
for product in product_list["create_products"]:
for provisioned_product_name in product["provisioned_product_names"]:
createstack(productName=provisioned_product_name, productId=product["product_id"], paramlist=baselist[provisioned_product_name])
def main():
init()
if __name__== "__main__":
main()