-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathoci-python-script-threaded-template.py
More file actions
167 lines (135 loc) · 6.57 KB
/
oci-python-script-threaded-template.py
File metadata and controls
167 lines (135 loc) · 6.57 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
# OCI Python Script template
# Copyright (c) 2023, Oracle and/or its affiliates. All rights reserved.
# This software is dual-licensed to you under the Universal Permissive License (UPL) 1.0 as shown at https://oss.oracle.com/licenses/upl or Apache License 2.0 as shown at http://www.apache.org/licenses/LICENSE-2.0. You may choose either license.
# This script provides ....<basic documenation>.....
# Usage: python oci-python-xxx-yyy.py
# Only import required code from OCI
from oci import config
from oci.exceptions import ClientError,ServiceError
from oci.auth.signers import InstancePrincipalsSecurityTokenSigner
from oci import retry
# OCI Clients and models (import as necessary)
from oci.database import DatabaseClient
from oci.resource_search import ResourceSearchClient
from oci.resource_search.models import StructuredSearchDetails
# Additional imports
import argparse # Argument Parsing
import logging # Python Logging
from concurrent.futures import ThreadPoolExecutor, Future
from concurrent import futures
global total
total = 0
# Callback
def finish(future: Future):
logger.debug(f"Future: {future.result()}")
global total
total += future.result()
pass
# Threaded function
def work_function_(ocid: str):
# ADB Example - allow exceptions
# try:
database = database_client.get_autonomous_database(
autonomous_database_id=f"{ocid}"
).data
logger.info(f"DB Name: {database.display_name}")
logger.info(f"DB OCID: {ocid}")
# logger.debug(f"Full Details: {database}")
# except ServiceError as ex:
# logger.error(f"Failed to call OCI. Target Service/Operation: {ex.target_service}/{ex.operation_name} Code: {ex.code}")
# logger.debug(f"Full Exception Detail: {ex}")
return database.display_name
# Only if called in Main
if __name__ == "__main__":
# PHASE 1 - Parsing of Arguments, Python Logging
parser = argparse.ArgumentParser()
parser.add_argument("-v", "--verbose", help="Increased Verbosity, boolean", action="store_true")
parser.add_argument("-pr", "--profile", help="Named Config Profile, from OCI Config", default="DEFAULT")
parser.add_argument("-ip", "--instanceprincipal", help="Use Instance Principal Auth - negates --profile", action="store_true")
parser.add_argument("-r", "--region", help="Use Instance Principal with alt region")
parser.add_argument("-t", "--threads", help="Concurrent Threads (def=5)", type=int, default=5)
args = parser.parse_args()
verbose = args.verbose # Boolean
profile = args.profile # String
use_instance_principals = args.instanceprincipal # Attempt to use instance principals (OCI VM)
region = args.region # Region to use with Instance Principal, if not default
threads = args.threads
# Logging Setup
logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(threadName)s] %(levelname)s %(message)s')
logger = logging.getLogger(__name__)
if verbose:
logger.setLevel(logging.DEBUG)
logger.info(f'Using profile {profile} with Logging level {"DEBUG" if verbose else "INFO"}')
# PHASE 2 - Creation of OCI Client(s)
try:
# Client creation
if use_instance_principals:
logger.info(f"Using Instance Principal Authentication")
signer = InstancePrincipalsSecurityTokenSigner()
config_ip = {}
if region:
config_ip={"region": region}
logger.info(f"Changing region to {region}")
# Example of client
database_client = DatabaseClient(config=config_ip, signer=signer, retry_strategy=retry.DEFAULT_RETRY_STRATEGY)
search_client = ResourceSearchClient(config=config_ip, signer=signer)
# Connect to OCI with DEFAULT or defined profile
else:
# Use a profile (must be defined)
logger.info(f"Using Profile Authentication: {profile}")
config = config.from_file(profile_name=profile)
if region:
config["region"] = region
logger.info(f"Changing region to {region}")
# Create the OCI Client to use
database_client = DatabaseClient(config, retry_strategy=retry.DEFAULT_RETRY_STRATEGY)
search_client = ResourceSearchClient(config)
except ClientError as ex:
logger.critical(f"Failed to connect to OCI: {ex}")
# Create any necessary Clients
db_client = DatabaseClient(config)
# PHASE 3 - Main Script Execution (threaded)
# 2 examples for getting a list for threading
# 1) Resource Search, create list of OCIDs
# Get Resource List via Search
atp_db = search_client.search_resources(
search_details=StructuredSearchDetails(
type = "Structured",
query='query autonomousdatabase resources'
),
limit=1000
).data
# Build a list of OCIDs to operate on
db_ocids = []
for i,db_it in enumerate(atp_db.items, start=1):
db_ocids.append(db_it.identifier)
# 2) Use pagination and list_call_get_all_results, then pass actual objects as work items
# Get all compartments (we don't know the depth of any), tenancy level
# Using the paging API
# paginated_response = pagination.list_call_get_all_results(
# identity_client.list_compartments,
# tenancy_ocid,
# access_level="ACCESSIBLE",
# sort_order="ASC",
# compartment_id_in_subtree=True,
# lifecycle_state="ACTIVE",
# limit=1000)
# comp_list.extend(paginated_response.data)
# Thread Pool with execution based on incoming list of OCIDs
with ThreadPoolExecutor(max_workers = threads, thread_name_prefix="thread") as executor:
results = [executor.submit(work_function, ocid) for ocid in db_ocids]
logger.info(f"Kicked off {threads} threads for parallel execution - adjust as necessary")
# Thread Pool with execution based on incoming list of Compartments
# with ThreadPoolExecutor(max_workers = threads, thread_name_prefix="thread") as executor:
# results = [executor.submit(work_function, c) for c in comp_list]
# logger.info(f"Kicked off {threads} threads for parallel execution - adjust as necessary")
# # Add callbacks to report
# for future in results:
# # future.add_done_callback(print)
# future.add_done_callback(finish)
for future in futures.as_completed(results):
try:
logger.info(f"Result: {future.result()}")
except ServiceError as ex:
logger.error(f"ERROR: {ex.message}")
logger.info(f"Finished submitting all for parallel execution")