Skip to content
Open
4 changes: 2 additions & 2 deletions Drivers/Built_In_Driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,14 @@
from Framework.Built_In_Automation.Sequential_Actions import sequential_actions as sa


def sequential_actions(
async def sequential_actions(
step_data,
test_action_info,
temp_q,
debug_actions=None,
):
try:
sTestStepReturnStatus = sa.Sequential_Actions(
sTestStepReturnStatus = await sa.Sequential_Actions(
step_data,
test_action_info,
debug_actions,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ def if_else_log_for_actions(left, next_level_step_data, statement="if"):
return left + ".... condition matched\n" + "Running actions: " + log_actions


def If_else_action(step_data, data_set_no):
async def If_else_action(step_data, data_set_no):
sModuleInfo = inspect.currentframe().f_code.co_name + " : " + MODULE_NAME
try:
data_set = step_data[data_set_no]
Expand Down Expand Up @@ -526,7 +526,7 @@ def check_operators():
)
return "zeuz_failed"
if data_set_index not in inner_skip:
result, skip = Run_Sequential_Actions(
result, skip = await Run_Sequential_Actions(
[data_set_index]
) # Running
inner_skip = list(set(inner_skip+skip))
Expand Down Expand Up @@ -559,7 +559,7 @@ def sanitize_deprecated_dataset(value):
return value


def for_loop_action(step_data, data_set_no):
async def for_loop_action(step_data, data_set_no):
sModuleInfo = inspect.currentframe().f_code.co_name + " : " + MODULE_NAME
try:
data_set = step_data[data_set_no]
Expand Down Expand Up @@ -748,7 +748,7 @@ def for_loop_action(step_data, data_set_no):
sr.Set_Shared_Variables(CommonUtil.dont_prettify_on_server[0], step_data, protected=True, pretty=False)
sr.test_action_info = CommonUtil.all_action_info[step_index]
return "zeuz_failed", outer_skip
result, skip = Run_Sequential_Actions([data_set_index])
result, skip = await Run_Sequential_Actions([data_set_index])
inner_skip = list(set(inner_skip + skip))
outer_skip = list(set(outer_skip + inner_skip))

Expand Down Expand Up @@ -848,7 +848,7 @@ def for_loop_action(step_data, data_set_no):
return CommonUtil.Exception_Handler(sys.exc_info()), []


def While_Loop_Action(step_data, data_set_no):
async def While_Loop_Action(step_data, data_set_no):
sModuleInfo = inspect.currentframe().f_code.co_name + " : " + MODULE_NAME
try:
data_set = step_data[data_set_no]
Expand Down Expand Up @@ -947,7 +947,7 @@ def While_Loop_Action(step_data, data_set_no):
3
)
return "zeuz_failed", outer_skip
result, skip = Run_Sequential_Actions(
result, skip = await Run_Sequential_Actions(
[data_set_index]
) # new edit: full step data is passed. [step_data[data_set_index]])
# Recursively call this function until all called data sets are complete
Expand Down Expand Up @@ -1049,7 +1049,7 @@ def ticker_linear_shape(seconds, callable, *args, **kwargs):
seconds -= 1


def Sequential_Actions(
async def Sequential_Actions(
step_data,
test_action_info,
debug_actions=None,
Expand All @@ -1068,13 +1068,13 @@ def Sequential_Actions(
# sr.Set_Shared_Variables("test_action_info", test_action_info, protected=True, print_variable=False)
sr.test_action_info = test_action_info

result, skip_for_loop = Run_Sequential_Actions([], debug_actions)
result, skip_for_loop = await Run_Sequential_Actions([], debug_actions)
# empty list means run all, instead of step data we want to send the dataset no's of the step data to run
write_browser_logs()
return result


def Run_Sequential_Actions(
async def Run_Sequential_Actions(
data_set_list=None, debug_actions=None
): # data_set_no will used in recursive conditional action call
if data_set_list is None:
Expand All @@ -1100,7 +1100,7 @@ def Run_Sequential_Actions(
data_set_list.append(i)

if len(data_set_list) == 0 and CommonUtil.debug_status and not sr.Test_Shared_Variables("selenium_driver") and ConfigModule.get_config_value("Inspector", "ai_plugin").strip().lower() in CommonUtil.affirmative_words:
return Action_Handler([["browser", "selenium action", "browser"]], ["browser", "selenium action", "browser"]), []
return await Action_Handler([["browser", "selenium action", "browser"]], ["browser", "selenium action", "browser"]), []

for dataset_cnt in data_set_list: # For each data set within step data
data_set = step_data[dataset_cnt] # Save data set to variable
Expand Down Expand Up @@ -1196,15 +1196,15 @@ def Run_Sequential_Actions(

# If middle column = action, call action handler, but always return a pass
elif "optional action" in action_name:
result = Action_Handler(data_set, row) # Pass data set, and action_name to action handler
result = await Action_Handler(data_set, row) # Pass data set, and action_name to action handler
if result == "zeuz_failed":
CommonUtil.ExecLog(sModuleInfo, "Optional action failed. Returning pass anyway", 2)
result = "passed"

# If middle column = conditional action, evaluate data set
elif "conditional action" in action_name or "if else" in action_name:
if action_name.lower().strip() == "windows conditional action":
result, to_skip = Conditional_Action_Handler(step_data, dataset_cnt)
result, to_skip = await Conditional_Action_Handler(step_data, dataset_cnt)
skip += to_skip
skip_for_loop += to_skip
if result in failed_tag_list:
Expand All @@ -1215,7 +1215,7 @@ def Run_Sequential_Actions(

elif action_name.lower().strip() != "conditional action" and action_name.lower().strip() != "if else":
# old style conditional action
result, to_skip = Conditional_Action_Handler(step_data, dataset_cnt)
result, to_skip = await Conditional_Action_Handler(step_data, dataset_cnt)
skip += to_skip
skip_for_loop += to_skip
if result in failed_tag_list:
Expand All @@ -1224,7 +1224,7 @@ def Run_Sequential_Actions(
break

else:
result, to_skip = If_else_action(step_data, dataset_cnt)
result, to_skip = await If_else_action(step_data, dataset_cnt)
skip += to_skip
skip_for_loop += to_skip
if result in failed_tag_list:
Expand All @@ -1235,15 +1235,15 @@ def Run_Sequential_Actions(
# Simulate a while/for loop with the specified data sets
elif "loop action" in action_name:
if action_name.lower().strip() == "for loop action":
result, skip_for_loop = for_loop_action(step_data, dataset_cnt)
result, skip_for_loop = await for_loop_action(step_data, dataset_cnt)
skip = list(set(skip + skip_for_loop))
if result in failed_tag_list:
return "zeuz_failed", skip_for_loop
break
elif action_name.lower().strip() not in ("while loop action", "for loop action"):
# old style loop action
# CommonUtil.ExecLog(sModuleInfo,"Old style loop action found. This will not be supported in 2020, please replace them with new loop actions",2)
result, skip_for_loop = Loop_Action_Handler(data_set, row, dataset_cnt)
result, skip_for_loop = await Loop_Action_Handler(data_set, row, dataset_cnt)
skip = skip_for_loop

position_of_loop_action = dataset_cnt
Expand Down Expand Up @@ -1273,7 +1273,7 @@ def Run_Sequential_Actions(
return "zeuz_failed", skip_for_loop
elif "loop" in action_name:
if "while" in action_name.lower():
result, skip_for_loop = While_Loop_Action(step_data, dataset_cnt)
result, skip_for_loop = await While_Loop_Action(step_data, dataset_cnt)
skip = list(set(skip + skip_for_loop))
if result in failed_tag_list:
return "zeuz_failed", skip_for_loop
Expand Down Expand Up @@ -1343,7 +1343,7 @@ def Run_Sequential_Actions(

# If middle column = action, call action handler
elif "action" in action_name: # Must be last, since it's a single word that also exists in other action types
result = Action_Handler(data_set, row) # Pass data set, and action_name to action handler
result = await Action_Handler(data_set, row) # Pass data set, and action_name to action handler
if row[0].lower().strip() in ("step exit", "testcase exit"):
global step_exit_fail_called, step_exit_pass_called
CommonUtil.ExecLog(sModuleInfo, f"{row[0].lower().strip()} Exit called. Stopping Test Step.", 1)
Expand Down Expand Up @@ -1373,12 +1373,12 @@ def Run_Sequential_Actions(
continue

CommonUtil.ExecLog(sModuleInfo, "Action failed. Trying bypass #%d" % (i + 1), 1)
result = Action_Handler(bypass_data_set[i], bypass_row[i])
result = await Action_Handler(bypass_data_set[i], bypass_row[i])
if result in failed_tag_list: # This also failed, so chances are first failure was real
continue # Try the next bypass, if any
else: # Bypass passed, which indicates there was something blocking the element in the first place
CommonUtil.ExecLog(sModuleInfo, "Bypass passed. Retrying original action", 1)
result = Action_Handler(data_set, row) # Retry failed original data set
result = await Action_Handler(data_set, row) # Retry failed original data set
if result in failed_tag_list: # Still a failure, give up
return "zeuz_failed", skip_for_loop
break # No need to process more bypasses
Expand All @@ -1404,7 +1404,7 @@ def Run_Sequential_Actions(
return CommonUtil.Exception_Handler(sys.exc_info())


def Loop_Action_Handler(data, row, dataset_cnt):
async def Loop_Action_Handler(data, row, dataset_cnt):
""" Performs a sub-set of the data set in a loop, similar to a for or while loop """

sModuleInfo = inspect.currentframe().f_code.co_name + " : " + MODULE_NAME
Expand Down Expand Up @@ -1903,7 +1903,7 @@ def build_subset(new_step_data):
return CommonUtil.Exception_Handler(sys.exc_info())


def Conditional_Action_Handler(step_data, dataset_cnt):
async def Conditional_Action_Handler(step_data, dataset_cnt):
""" Process conditional actions, called only by Sequential_Actions() """
sModuleInfo = inspect.currentframe().f_code.co_name + " : " + MODULE_NAME

Expand Down Expand Up @@ -1984,6 +1984,37 @@ def Conditional_Action_Handler(step_data, dataset_cnt):
logic_decision = False
log_msg += "Element is not found\n"

elif module == "playwright":
try:
from Framework.Built_In_Automation.Web.Playwright import locator as PlaywrightLocator
from Framework.Built_In_Automation.Web.Playwright.BuiltInFunctions import current_page

wait = 10
for left, mid, right in data_set:
mid = mid.lower()
left = left.lower()
if "optional parameter" in mid and "wait" in left:
wait = float(right.strip())

if current_page is None:
CommonUtil.ExecLog(sModuleInfo, "No browser open for Playwright conditional action", 3)
logic_decision = False
log_msg += "Browser not open\n"
else:
Element = await PlaywrightLocator.Get_Element(data_set, current_page, element_wait=wait)
if Element == "zeuz_failed":
CommonUtil.ExecLog(sModuleInfo, "Conditional Actions could not find the element", 3)
logic_decision = False
log_msg += "Element is not found\n"
else:
logic_decision = True
log_msg += "Element is found\n"

except: # Element doesn't exist, proceed with the step data following the fail/false path
CommonUtil.ExecLog(sModuleInfo, "Conditional Actions could not find the element", 3)
logic_decision = False
log_msg += "Element is not found\n"

elif module == "windows":
try:
from Framework.Built_In_Automation.Desktop.Windows import BuiltInFunctions
Expand Down Expand Up @@ -2186,7 +2217,7 @@ def Conditional_Action_Handler(step_data, dataset_cnt):
2
)
if data_set_index not in inner_skip:
result, skip = Run_Sequential_Actions(
result, skip = await Run_Sequential_Actions(
[data_set_index]
) # Running
inner_skip = list(set(inner_skip + skip))
Expand Down Expand Up @@ -2296,7 +2327,7 @@ def compare_variable_names(set, dataset):
CommonUtil.compare_action_varnames = {"left": "Left", "right": "Right"}


def Action_Handler(_data_set, action_row, _bypass_bug=True):
async def Action_Handler(_data_set, action_row, _bypass_bug=True):
""" Finds the appropriate function for the requested action in the step data and executes it """

sModuleInfo = inspect.currentframe().f_code.co_name + " : " + MODULE_NAME
Expand Down Expand Up @@ -2417,6 +2448,8 @@ def Action_Handler(_data_set, action_row, _bypass_bug=True):
elif module in CommonUtil.global_sleep and "_all_" in CommonUtil.global_sleep[module]:
time.sleep(CommonUtil.global_sleep[module]["_all_"]["pre"])
result = run_function(data_set) # Execute function, providing all rows in the data set
if inspect.iscoroutine(result):
result = await result
if post_sleep:
time.sleep(post_sleep)
elif module in CommonUtil.global_sleep and "_all_" in CommonUtil.global_sleep[module]:
Expand Down
Loading
Loading