The following examples include a base script for creating Python-based synthetic transaction scripts on Selenium and two example scripts that manipulate webpage content.
Basic Selenium Python Structure
The following code snippet shows a basic Python script that imports the packages needed for the Selenium-Python environment, identifies where to insert your code, and has a main section.
# Follow these instructions to write a Python file that completes the required transaction and generates metrics from it
# Import the necessary packages according to your requirements
# Add options/preferences/capabilities according to your usage and your desired browser
# Write the Python script for your transaction so that it runs as a separate process
# Surround every step of action with try/except, and in the except block update the warnings dictionary so that issues encountered while performing the transaction are captured
# If an exception occurs in the middle of the transaction, clean up the Chrome and Python processes as well as any processes spawned by the transaction.
# The output JSON file must be in a specified format and an example is cited below:
# {
# "responseTime" : "5560.868979", ------- indicates the time taken in milliseconds to complete the overall transaction
# "warningInfo" : "", ----- describes any warnings or exceptions that occurred in the flow
# "errorInfo" : "", ----- describes any critical exceptions that occurred in the flow
# "screenshotOnError" : "True", ----- flag to indicate if screenshot was taken on error
# "uuidString" : "8507ba11-260b-41eb-936e-4f54676a7222", ----- unique identifier for the screenshot file
# "md5hashString" : "f5fe9a5f1f0f3ce5efd752fe4ecdc8e4", ----- MD5 hash checksum of the screenshot for integrity verification
# "screenshotTS" : "1749726470000", ----- timestamp when screenshot was captured (in milliseconds)
# "granularSteps" : [ ---- needs to be added when user needs to calculate response times for individual actions
# {
# "stepName": "", ----- to show the metric in UI, if this is blank then this metric value is not shown in the UI
# "uiLocatorString": "https://www.mysubdomain.com/",----- to know the locator of that webelement
# "timeTaken": "5143.960953", ----- time taken in milliseconds to do the action from the scratch
# "actionPerfomed": "openURL", ----- action performed in the transaction
# "actionAvailabilityStatus": "1", ----- indicates if the action was successfully available/performed (1=success, 0=failure)
# "actionThresholdBreached": "0" ----- indicates if performance threshold was breached (1=breached, 0=within limits)
# },
# {
# "stepName": "clickUser",
# "uiLocatorString": "css=#users-container > h1",
# "timeTaken": "5408.558846",
# "actionPerfomed": "click",
# "actionAvailabilityStatus": "1",
# "actionThresholdBreached": "0"
# },
# {
# "stepName": "editText",
# "uiLocatorString": " xpath=//*[@id=snow-container]/div[2]/div[1] edit content",
# "timeTaken": "5453.449011",
# "actionPerfomed": "editContent",
# "actionAvailabilityStatus": "0",
# "actionThresholdBreached": ""
# }
# ]
# }
# These are the default imports user must include
# -*- coding: utf-8 -*-
from selenium import webdriver
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.support import expected_conditions
from selenium.webdriver.common.by import By
from selenium.webdriver.support.select import Select
from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.common.keys import Keys
import sys
import json
import os
import signal
import logging as logger
import datetime
import multiprocessing
from multiprocessing import Manager
import time
import re as regex
import uuid
import hashlib
import argparse
# Module-level settings shared by the helper functions below
proxy='' # proxy server address handed to the browser (--proxy-server switch / proxy capability)
exceptionTime=0 # running total, in seconds, of time spent on failed locator attempts; increased by actionLoop
stepTimeStart=0 # module-level placeholder; actionLoop and scriptCode assign their own local stepTimeStart
maxTimeoutInSeconds = 30 # WebDriverWait timeout, and iteration count for sleepLoop and has_page_return_empty
stepExceptionTimeoutInSeconds = 25 # actionLoop skips the retry loop and calls actionChainsCheck directly when the last failed attempt took at least this long
variableDict = {} # values substituted for ${name} placeholders
gloabalDriver = None # webdriver instance assigned by scriptCode and read by computeRCA (spelling is referenced elsewhere; keep as is)
# By default, we use chrome, but now with multi-browser support (Chrome, Firefox, Edge)
def setChromeOptionsAndCaps(isProxyEnabled):
    """Build the desired-capabilities dictionary used to launch headless Chrome.

    isProxyEnabled is accepted for signature parity with the Firefox and Edge
    builders but is not consulted here: the --proxy-server switch is always
    added, using the module-level ``proxy`` value.

    Returns the dict produced by ChromeOptions.to_capabilities() with the two
    certificate-acceptance entries set to True.
    """
    options = webdriver.ChromeOptions()
    options.binary_location = "/usr/bin/google-chrome"  # path of the browser binary
    # Command-line switches, added in exactly this order.
    switches = (
        '--no-sandbox',                 # disable sandboxing for compatibility
        '--start-maximized',            # start the browser window in full size
        '--ignore-certificate-errors',  # ignore SSL certificate errors
        '--disable-extensions',         # disable browser extensions
        '--headless',                   # run the transaction without a visible window
        '--disable-gpu',                # disable GPU hardware acceleration
        'window-size=1200x600',         # window size
        '--proxy-server=%s' % proxy,    # proxy server taken from the module-level setting
    )
    for switch in switches:
        options.add_argument(switch)
    capabilities = options.to_capabilities()
    capabilities['acceptSslCerts'] = True       # accept all kinds of SSL certificates
    capabilities['acceptInsecureCerts'] = True  # accept insecure certificates
    #capabilities['proxy'] = {'proxyType': 'MANUAL','httpProxy': proxy,'ftpProxy': proxy,'sslProxy': proxy,'noProxy': '','class': "org.openqa.selenium.Proxy",'autodetect': False}
    return capabilities
def setFirefoxOptionsAndCaps(isProxyEnabled):
    """Build the desired-capabilities dictionary used to launch headless Firefox.

    When isProxyEnabled is truthy a MANUAL proxy entry pointing at the
    module-level ``proxy`` value is added to the capabilities.

    Returns the dict produced by FirefoxOptions.to_capabilities() with the
    certificate-acceptance entries (and, optionally, the proxy entry) set.
    """
    options = webdriver.FirefoxOptions()
    options.binary_location = "/usr/bin/firefox"  # path of the Firefox binary
    # Command-line arguments, added in exactly this order.
    arguments = (
        '--no-sandbox',
        '--start-maximized',
        '--ignore-certificate-errors',
        '--disable-extensions',
        '--headless',                 # run the transaction without a visible window
        '--disable-gpu',
        'window-size=1200x600',
        '--proxy-server=%s' % proxy,
    )
    for argument in arguments:
        options.add_argument(argument)
    capabilities = options.to_capabilities()
    capabilities['acceptSslCerts'] = True       # accept all kinds of SSL certificates
    capabilities['acceptInsecureCerts'] = True  # accept insecure certificates
    if isProxyEnabled:
        capabilities['proxy'] = {'proxyType': 'MANUAL','httpProxy': proxy,'ftpProxy': proxy,'sslProxy': proxy,'noProxy': '','class': "org.openqa.selenium.Proxy",'autodetect': False}
    return capabilities
def setEdgeOptionsAndCaps(isProxyEnabled):
    """Build the EdgeOptions object used to launch headless Chromium-based Edge.

    The --proxy-server switch (module-level ``proxy`` value) is added only
    when isProxyEnabled is truthy.

    Returns the configured EdgeOptions instance (not a capabilities dict).
    """
    from msedge.selenium_tools import EdgeOptions
    options = EdgeOptions()
    options.use_chromium = True                          # use Chromium-based Edge
    options.binary_location = "/usr/bin/microsoft-edge"  # path of the Edge binary
    # Command-line switches, added in exactly this order.
    switches = [
        'headless',                     # run the transaction without a visible window
        '--disable-gpu',                # disable GPU hardware acceleration
        '--no-sandbox',
        '--start-maximized',
        '--ignore-certificate-errors',
        '--disable-extensions',
        'window-size=1200x600',
    ]
    if isProxyEnabled:
        switches.append('--proxy-server=%s' % proxy)
    for switch in switches:
        options.add_argument(switch)
    options.set_capability('platform', 'LINUX')
    return options
sys.tracebacklimit=0 # suppress traceback frames: an unhandled exception prints only its type and message
# Fallback function that uses ActionChains when normal WebDriver actions fail
def actionChainsCheck(driver,element,action,value):
    """Perform ``action`` on ``element`` through ActionChains.

    Used as the fallback when the regular WebElement call fails.

    driver  -- the webdriver instance
    element -- a locator tuple, unpacked into driver.find_element(*element)
    action  -- "mouseOver", "type"/"sendKeys", or one of the click-style
               actions; any other value is ignored without locating anything
    value   -- text to send for "type"/"sendKeys"; unused otherwise

    Exceptions raised by find_element or by the action chain propagate to
    the caller.
    """
    if action == "mouseOver":
        ActionChains(driver).move_to_element(driver.find_element(*element)).perform()
    elif action == "type" or action == "sendKeys":
        target = driver.find_element(*element)
        # The chain previously ended without perform(), so no keys were ever
        # sent. send_keys_to_element focuses the element before typing so the
        # keys land in the intended field.
        ActionChains(driver).move_to_element(target).send_keys_to_element(target, value).perform()
    elif action == "click" or action =="removeSelection" or action == "addSelection" or action == "uncheck" or action == "check" or action == "clickAt":
        ActionChains(driver).move_to_element(driver.find_element(*element)).click().perform()
# Multi-locator web action executor with timing, fallback strategies, and error handling
def actionLoop(driver,wait,elementList,action,stepName,rcaHeader,actionErrorMsg="",value=""):
    """Perform ``action`` with the first locator in ``elementList`` that works.

    driver         -- the webdriver instance
    wait           -- WebDriverWait used to wait for each locator to be clickable
    elementList    -- locator tuples tried in order
    action         -- "mouseOver", "type", "sendKeys" or a click-style action
    stepName       -- step name recorded in metricObjects on final failure ("" to skip)
    rcaHeader      -- prefix of the message passed to computeRCA on final failure
    actionErrorMsg -- custom failure message; when non-empty it replaces the default text
    value          -- text for "type"/"sendKeys"

    Returns the elapsed time in milliseconds (rounded to 3 decimals) measured
    from the start of the attempt that succeeded; when postDeltaData is
    "False" the accumulated time of failed attempts is subtracted as well.

    When every locator fails, the first locator is retried through
    actionChainsCheck (directly if the last failure took at least
    stepExceptionTimeoutInSeconds, otherwise via sleepLoop). If that also
    fails, the RCA message, error screenshot and a failed metric entry are
    recorded and an Exception is raised.

    Side effect: the module-level exceptionTime grows by the duration of
    every failed attempt.
    """
    stepTimeStart=time.time() # Start timing for this step
    global exceptionTime
    actionExceptionTime = 0
    for i in range(len(elementList)):
        try:
            logger.info(str(datetime.datetime.now())+''' - webdriver performing action '''+action+''' on "'''+''.join(elementList[i]))
            if action == "mouseOver":
                # A WebElement has no perform() method, so the hover must go
                # through ActionChains once the element is clickable.
                ActionChains(driver).move_to_element(wait.until(expected_conditions.element_to_be_clickable(elementList[i]))).perform()
            elif action == "type" or action == "sendKeys":
                # Clear field before typing (unless special keys), then send text
                if not ("Keys." in value) and "type"==action:
                    wait.until(expected_conditions.element_to_be_clickable(elementList[i])).clear()
                wait.until(expected_conditions.element_to_be_clickable(elementList[i])).send_keys(value)
            elif action == "click" or action =="removeSelection" or action == "addSelection" or action == "uncheck" or action == "check" or action == "clickAt":
                # Perform click action on element when clickable
                wait.until(expected_conditions.element_to_be_clickable(elementList[i])).click()
            # Return execution time in milliseconds (with/without exception time)
            if postDeltaData == "False":
                return round((time.time()-stepTimeStart-actionExceptionTime)*1000,3)
            else:
                return round((time.time()-stepTimeStart)*1000,3)
        except Exception as exp :
            # Log failure and track exception timing for current locator
            logger.error(str(datetime.datetime.now())+''' - webdriver can not perform '''+action+''' on '''+''.join(elementList[i])+''' with action chains failed due to : '''+str(exp))
            stepTimeStop=time.time()
            actionExceptionTime = actionExceptionTime + stepTimeStop - stepTimeStart
            exceptionTime= exceptionTime + stepTimeStop - stepTimeStart
            if i < len(elementList)-1 :
                # More locators left: restart the timer and try the next one
                stepTimeStart=time.time()
                continue
            else:
                # All locators failed - attempt fallback recovery methods
                localExceptionTime = stepTimeStop-stepTimeStart
                stepTimeStart=time.time()
                try:
                    # Choose fallback strategy based on exception duration
                    if localExceptionTime >= stepExceptionTimeoutInSeconds :
                        actionChainsCheck(driver,elementList[0],action,value)
                    else:
                        sleepLoop(driver,elementList[0],action,value)
                    # Return execution time after successful fallback
                    if postDeltaData == "False":
                        return round((time.time()-stepTimeStart-actionExceptionTime)*1000,3)
                    else:
                        return round((time.time()-stepTimeStart)*1000,3)
                except Exception as ex:
                    # Final failure - log error, capture screenshot, update metrics, raise exception
                    logger.error(str(datetime.datetime.now())+''' - webdriver can not perform '''+action+''' on '''+''.join(elementList[i])+''' with action chains failed due to : '''+str(ex))
                    if not actionErrorMsg == "":
                        computeRCA(rcaHeader + actionErrorMsg,True)
                    else:
                        computeRCA(rcaHeader + action + " on "+ ''.join(elementList[i])+" cannot be performed ")
                    takeScreenshotonError(driver)
                    if not stepName == "":
                        metricObjects.update({stepName:metricsProcessed(action,elementList[0],0,stepName,"0","")})
                    raise Exception('''Exception:'''+ action +''' on '''+''.join(elementList[i])+''' cannot be performed''')
# Retry mechanism: attempts action with 1-second delays up to maxTimeoutInSeconds
def sleepLoop(driver,element,action,value=''):
    """Retry actionChainsCheck once per second until it succeeds.

    Makes at most maxTimeoutInSeconds attempts, sleeping one second before
    each. Returns None as soon as an attempt succeeds. If the final attempt
    fails, its error is re-raised wrapped in a plain Exception.
    """
    for attempt in range(maxTimeoutInSeconds):
        try:
            time.sleep(1)
            actionChainsCheck(driver,element,action,value)
        except Exception as failure:
            # Only the last attempt's failure is surfaced to the caller.
            if attempt >= maxTimeoutInSeconds-1:
                raise Exception(failure)
        else:
            return
# to take individual metric as an object with certain attributes as listed
class metricsProcessed:
    """Plain record holding the metric captured for one transaction step."""

    def __init__(self,actionPerformed,uiLocatorString,metricTimeTaken,stepName=None,actionAvailabilityStatus=None,thresholdBreached=None):
        """Store the given values unchanged on the instance."""
        # What was done, and on which locator
        self.actionPerformed = actionPerformed
        self.uiLocatorString = uiLocatorString
        # How the step is labelled and how long it took
        self.stepName = stepName
        self.metricTimeTaken = metricTimeTaken
        # Availability / threshold flags as supplied by the caller
        self.actionAvailabilityStatus = actionAvailabilityStatus
        self.thresholdBreached = thresholdBreached
def regexCheckSelect(value,dropDownElement):
    """Select the first drop-down option whose text matches a regular expression.

    value           -- pattern text; every "regexp:" marker in it is removed
                       before the pattern is compiled
    dropDownElement -- object exposing ``options`` (each with ``.text``) and
                       ``select_by_visible_text``

    Options are tested in order with pattern.match (anchored at the start of
    the text). Nothing is selected when no option matches. Any error is
    logged and swallowed.
    """
    try:
        matcher = regex.compile(value.replace("regexp:",""))
        chosen = next((option for option in dropDownElement.options if matcher.match(option.text)), None)
        if chosen is not None:
            dropDownElement.select_by_visible_text(chosen.text)
    except Exception as exp:
        logger.error("Error Occured while Parsing Regex values "+str(exp))
# to check whether the specified metric dictionary is empty or not
def isJsonDictionaryEmpty(metricDictionary):
    """Return ``metricDictionary`` rendered as a quoted JSON string.

    None yields the empty JSON string ``""``. Any other value is converted
    with str() and emitted as a JSON string literal. Quotes, backslashes and
    control characters are escaped so the surrounding document written by
    formMetricJson stays valid JSON; text without such characters is
    rendered exactly as before.
    """
    if metricDictionary is None:
        logger.info("The metric Dictionary is null")
        return "\"\""
    logger.info("Metric Dictionary is not null")
    # ensure_ascii=False keeps non-ASCII text as-is instead of \uXXXX escapes.
    return json.dumps(str(metricDictionary), ensure_ascii=False)
def getValueFromVariableDict(key):
    """Look up ``key`` in variableDict.

    Returns the stored value when the key is present; otherwise returns the
    unresolved placeholder text ``${key}``.
    """
    try:
        return variableDict[key]
    except KeyError:
        return "${"+key+"}"
# to compute metric values and update them in the output JSON file
def computeRCA(rcaMsg,actionErrorMsgCheck=False):
    """Record a root-cause message under warnings["Exception"].

    rcaMsg              -- message text
    actionErrorMsgCheck -- when True the message is treated as a custom
                           error message: ${name} placeholders are replaced
                           with values from variableDict (unknown names are
                           left untouched) and the text is cut to 500
                           characters

    The current browser URL is appended for context. If the driver has not
    been created yet, or its URL cannot be read, "unavailable" is written
    instead so the message itself is never lost.
    """
    if actionErrorMsgCheck:
        import re
        # str() lets non-string variable values (e.g. numbers) be substituted;
        # re.sub requires the replacement callable to return a string.
        rcaMsg = re.sub(r'\${(\w+)}', lambda m: str(variableDict.get(m.group(1), m.group(0))), rcaMsg)
        rcaMsg = rcaMsg[:500] # Limit message length to prevent excessive logs
    # Add current URL context for better debugging
    try:
        currentUrl = str(gloabalDriver.current_url)
    except Exception:
        currentUrl = "unavailable"
    rcaMsg = rcaMsg + " ( Current URL - "+currentUrl+" )"
    warnings.update({'''Exception''':rcaMsg})
def computeMetric(driver,uiLocatorString,actionPerfomed,actionTime,stepName,stepTime,windowStartTime,exceptionTime=0,isCompleted=False):
    """Record the timing of one step, or the overall response time.

    driver          -- webdriver, used only for the per-step event screenshot
    uiLocatorString -- locator text; double quotes and backslashes are removed
    actionPerfomed  -- name of the action that was performed
    actionTime      -- time.time() value taken when the action finished
    stepName        -- key under which the step is stored in metricObjects
    stepTime        -- step duration in ms, used in delta mode
    windowStartTime -- time.time() value taken when the transaction started
    exceptionTime   -- seconds to subtract for failed attempts
    isCompleted     -- True for the final call, which stores
                       responseTimeMetric["responseTime"] instead of a step

    In delta mode (postDeltaData other than "False") a step's time is
    stepTime, added to any time already stored for the same step name.
    Otherwise it is the time since windowStartTime minus exceptionTime.
    The threshold comparison is made before any such accumulation.
    """
    deltaMode = not postDeltaData == "False"
    if isCompleted or not deltaMode:
        timeTaken = round((actionTime-windowStartTime-exceptionTime)*1000,2)
    else:
        timeTaken = round((stepTime),2)
    uiLocatorString = uiLocatorString.replace("\"","").replace("\\","")

    # Check if performance threshold is breached
    thresholdBreached = "0"
    if thresholdsDict.get(stepName) is not None and timeTaken > thresholdsDict[stepName]:
        thresholdBreached = "1"

    if isCompleted:
        responseTimeMetric.update({"responseTime":timeTaken}) # Final transaction time
        return

    # Accumulate step times for delta reporting
    if deltaMode and stepName in metricObjects:
        timeTaken += (metricObjects.get(stepName)).metricTimeTaken
    metricObjects.update({stepName:metricsProcessed(actionPerfomed,uiLocatorString,timeTaken,stepName,"1",thresholdBreached)})
    if not stepName=='':
        takeScreenshotofEvent(driver,stepName) # Capture screenshot if event tracking enabled
# to create output metric json file with specified metrics and their attributes and overall response time/ warning info/critical info in the format as shown above
def formMetricJson():
    """Write the transaction result to ``<script path>.json``.

    The document contains responseTime, warningInfo, errorInfo and
    screenshotOnError, optional error-screenshot and previous-success
    screenshot details, and the granularSteps list built from metricObjects.
    The output file name and layout are expected by the platform and must
    not be changed.

    Both the output file and the screenshot files that are hashed are closed
    even if writing fails. Error-screenshot details are written only when the
    screenshot file exists, so a failure that happened before any screenshot
    could be taken still produces a complete JSON document.
    """
    dataDir = '/opt/opsramp/webprobe/data/'

    def md5OfFile(path):
        # Hash the whole file, closing the handle afterwards.
        with open(path,'rb') as source:
            return hashlib.md5(source.read()).hexdigest()

    includeAvailability = sendAvailabilityDetails.value

    def stepRecord(obj):
        # Key order matches the documented output format.
        record = {"timeTaken":str(round(obj.metricTimeTaken,6)),"actionPerfomed":obj.actionPerformed,"uiLocatorString":obj.uiLocatorString,"stepName":obj.stepName}
        if includeAvailability:
            record["actionAvailabilityStatus"] = obj.actionAvailabilityStatus
            record["actionThresholdBreached"] = obj.thresholdBreached
        return record

    # the output metric json file name must be the id.py.json because the platform uses the same format, user should not change this
    with open(sys.argv[0]+'.json','w') as filePointer:
        filePointer.write("{\n")
        filePointer.write("\t\"responseTime\" : "+isJsonDictionaryEmpty(responseTimeMetric.get("responseTime")))
        filePointer.write(",\n\t\"warningInfo\" : "+isJsonDictionaryEmpty(warnings.get("Exception")))
        filePointer.write(",\n\t\"errorInfo\" : "+isJsonDictionaryEmpty(errors.get("Exception")))
        filePointer.write(",\n\t\"screenshotOnError\" : \""+screenshotOnErrorFlag+"\"")
        # Error screenshot details: only for a failed (not timed-out) transaction
        if screenshotOnErrorFlag=="True" and responseTimeMetric.get("responseTime") is None and not errors.get("Exception")=="connection timeout":
            errorScreenshotPath = dataDir+uuidString+'.png'
            if os.path.isfile(errorScreenshotPath):
                filePointer.write(",\n\t\"uuidString\" : \""+uuidString+"\"")
                logger.info(str(datetime.datetime.now())+''' - Generating md5hash checksum for the png file''')
                filePointer.write(",\n\t\"md5hashString\" : \""+md5OfFile(errorScreenshotPath)+"\"")
                filePointer.write(",\n\t\"screenshotTS\" : \""+str(int(time.time())*1000)+"\"")
            # Previous success screenshot handling for comparison
            if postPrevSuccessSSOnEvent=="True":
                # The step name is the text before the first "-" of the warning
                # (e.g. "Action_02 - ..."); strip() removes the space before the
                # dash so the name matches the file saved by takeScreenshotofEvent.
                failedStep = str(warnings.get("Exception")).split("-")[0].strip()
                prevSuccessScreenshotFile=prevSuccessScreenshotIndex+"_"+failedStep+".png"
                prevSuccessScreenshotFilePath=dataDir+prevSuccessScreenshotIndex+"/"+prevSuccessScreenshotFile
                if os.path.isfile(prevSuccessScreenshotFilePath):
                    filePointer.write(",\n\t\"prevSuccessScreenshotUUID\" : \""+prevSuccessScreenshotFile+"\"")
                    filePointer.write(",\n\t\"prevSuccessScreenshotHash\" : \""+md5OfFile(prevSuccessScreenshotFilePath)+"\"")
                    filePointer.write(",\n\t\"prevSuccessScreenshotTS\" : \""+str(int(os.path.getctime(prevSuccessScreenshotFilePath))*1000)+"\"")
        # Granular steps, with availability fields only when availability monitoring is on
        filePointer.write(",\n\t\"granularSteps\" : "+json.dumps([stepRecord(obj) for obj in metricObjects.values()],indent=4))
        filePointer.write("\n}")
#Captures screenshot when an error/exception occurs during transaction execution
def takeScreenshotonError(driver):
    """Save an error screenshot when the screenshot-on-error option is enabled.

    The image is written to /opt/opsramp/webprobe/data/<uuidString>.png.
    When the option is off only a log line is produced.
    """
    if screenshotOnErrorFlag != "True":
        logger.info(str(datetime.datetime.now())+''' Screenshot on Error option not opted''')
        return
    logger.info(str(datetime.datetime.now())+''' Screenshot on Error option opted''')
    driver.save_screenshot("/opt/opsramp/webprobe/data/"+uuidString+".png")
#Captures a screenshot for a transaction step when the screenshot-on-event option is enabled
def takeScreenshotofEvent(driver,stepName):
    """Save a per-step screenshot when the screenshot-on-event option is enabled.

    The image is written to
    /opt/opsramp/webprobe/data/<index>/<index>_<stepName>.png, where <index>
    is prevSuccessScreenshotIndex. Does nothing when the option is off.
    """
    if takeSSofEventFlag != "True":
        return
    logger.info(str(datetime.datetime.now())+''' Screenshot on Event option opted''')
    screenshotPath = '/opt/opsramp/webprobe/data/'+prevSuccessScreenshotIndex+'/'+prevSuccessScreenshotIndex+'_'+stepName+'.png'
    # The "saved" line is logged before the file is actually written.
    logger.info(str(datetime.datetime.now())+''' Screenshot saved '''+screenshotPath)
    driver.save_screenshot(screenshotPath)
#Detects browser-level connection errors by looking for error codes in the page
def has_connection_error(driver):
    """Return the browser's error-code text, or False when there is none.

    Looks for an element with class name "error-code" on the current page.
    Returns its text when non-empty. Returns False when the text is empty,
    the element is missing, or the lookup fails for any other reason.
    """
    try:
        errorText = driver.find_element(By.CLASS_NAME,"error-code").text
    except Exception:
        # Best effort: a missing element or driver error means "no error page".
        # Exception (not a bare except) so interpreter-exit signals still propagate.
        return False
    if not errorText:
        return False
    return errorText
#Checks if the page body is empty or has no content after loading, waits up to maxTimeoutInSeconds for content to appear
def has_page_return_empty(driver):
    """Return True when the page body stays empty for the whole wait period.

    Checks the text of the //body element up to maxTimeoutInSeconds times,
    sleeping one second after each empty or failed check. Returns False as
    soon as the body has text; returns True if it never does.
    """
    for _ in range(maxTimeoutInSeconds):
        try:
            pageBody = driver.find_element(By.XPATH,"//body")
            if pageBody.text:
                return False
            time.sleep(1)
            logger.info(str(datetime.datetime.now())+" - page empty check waiting for 1 second")
        except Exception:
            # Lookup failures are treated like an empty page and retried.
            # Exception (not a bare except) so interpreter-exit signals still propagate.
            logger.info(str(datetime.datetime.now())+" - page empty check waiting for 1 second")
            time.sleep(1)
    return True
# where the whole transaction code goes in
def scriptCode():
    """Run the synthetic transaction; this is the target of the child process.

    Starts the browser selected by browserName, stores the driver in the
    module-level gloabalDriver, clears cookies, and starts the overall timer.
    The user's transaction steps go where indicated below (a commented
    sample is provided). Afterwards the window is closed, the overall
    response time is recorded through computeMetric(..., isCompleted=True),
    and the driver is quit.

    NOTE(review): if a step raises, the lines after the user code do not run,
    so the driver is not quit here; the header instructions leave that
    cleanup to the script author.
    """
    global gloabalDriver
    logger.info(str(datetime.datetime.now())+" - before webdriver invocation")
    # Multi-browser initialization with enhanced capabilities
    if browserName=="google-chrome":
        driver=webdriver.Chrome(desired_capabilities=setChromeOptionsAndCaps(proxyEnabled)) # invoke a webbrowser session with desired options and capabilities set prior
    elif browserName=="firefox":
        driver=webdriver.Firefox(desired_capabilities=setFirefoxOptionsAndCaps(proxyEnabled),log_path=os.devnull)
    else:
        from msedge.selenium_tools import Edge
        driver = Edge(options=setEdgeOptionsAndCaps(proxyEnabled), executable_path="/usr/bin/msedgedriver")
    gloabalDriver = driver # make the driver reachable from computeRCA
    logger.info(str(datetime.datetime.now())+" - webdriver invoked")
    driver.delete_all_cookies() # delete the cookies before performing the transaction
    logger.info(str(datetime.datetime.now())+" - webdriver cookies deleted")
    wait=WebDriverWait(driver,maxTimeoutInSeconds) # to have smart waits at each step of transaction
    windowStartTime=time.time() # a timer needs to be started before the start of the transaction to calculate the overall response time
    sendAvailabilityDetails.value = False # granularSteps are written without the availability fields
    stepTimeStart = time.time()
    # <YOUR CODE GOES HERE>
    # Sample transaction (open a URL, log in, log out):
    # try:
    #     logger.info(str(datetime.datetime.now())+''' - webdriver hitting url : https://www.mysubdomain.com/''')
    #     driver.get("https://www.mysubdomain.com/")
    # except Exception as exp:
    #     logger.info(str(datetime.datetime.now())+''' - webdriver url hit : https://www.mysubdomain.com/''')
    #     computeRCA('''Action_01 - error in hitting https://www.mysubdomain.com/''')
    #     takeScreenshotonError(driver)
    #     raise Exception('''Error in hitting https://www.mysubdomain.com/''')
    #     return
    # stepTime = round((time.time()-stepTimeStart)*1000,6)
    # computeMetric(driver,'''https://www.mysubdomain.com/''','openURL',time.time(),'Action_01',stepTime,windowStartTime,exceptionTime)
    # con_error_flag= has_connection_error(driver)
    # if con_error_flag:
    #     takeScreenshotonError(driver)
    #     computeRCA('''Browser thrown error while hitting https://www.mysubdomain.com/ - '''+con_error_flag)
    #     raise Exception('''Browser thrown error - '''+con_error_flag)
    # if has_page_return_empty(driver):
    #     takeScreenshotonError(driver)
    #     computeRCA('''Page empty - Nothing rendered while hitting https://www.mysubdomain.com/''')
    #     raise Exception('''Page empty - Nothing rendered ''')
    # logger.info(str(datetime.datetime.now())+''' - webdriver setting window size as 1295x695 dimensions''')
    # driver.set_window_size(1295,695)
    # stepTime = actionLoop(driver,wait,[(By.ID,"username"),(By.NAME,"username"),(By.CSS_SELECTOR,"#username"),(By.XPATH,"//input[@id='username']"),(By.XPATH,"//div[@id='form']/div/input"),(By.XPATH,"//input")],"click",'Action_02','Action_02 - ','''''')
    # computeMetric(driver,'''id=username''','click',time.time(),'Action_02',stepTime,windowStartTime,exceptionTime)
    # stepTime = actionLoop(driver,wait,[(By.ID,"username"),(By.NAME,"username"),(By.CSS_SELECTOR,"#username"),(By.XPATH,"//input[@id='username']"),(By.XPATH,"//div[@id='form']/div/input"),(By.XPATH,"//input")],"type",'Action_03','Action_03 - ','''''',"student")
    # computeMetric(driver,'''id=username''','type',time.time(),'Action_03',stepTime,windowStartTime,exceptionTime)
    # stepTime = actionLoop(driver,wait,[(By.ID,"password"),(By.NAME,"password"),(By.CSS_SELECTOR,"#password"),(By.XPATH,"//input[@id='password']"),(By.XPATH,"//div[@id='form']/div[2]/input"),(By.XPATH,"//div[2]/input")],"click",'Action_04','Action_04 - ','''''')
    # computeMetric(driver,'''id=password''','click',time.time(),'Action_04',stepTime,windowStartTime,exceptionTime)
    # stepTime = actionLoop(driver,wait,[(By.ID,"password"),(By.NAME,"password"),(By.CSS_SELECTOR,"#password"),(By.XPATH,"//input[@id='password']"),(By.XPATH,"//div[@id='form']/div[2]/input"),(By.XPATH,"//div[2]/input")],"type",'Action_05','Action_05 - ','''''',"password")
    # computeMetric(driver,'''id=password''','type',time.time(),'Action_05',stepTime,windowStartTime,exceptionTime)
    # stepTime = actionLoop(driver,wait,[(By.ID,"submit"),(By.CSS_SELECTOR,"#submit"),(By.XPATH,"//button[@id='submit']"),(By.XPATH,"//div[@id='form']/button"),(By.XPATH,"//section/div/button"),(By.XPATH,"//button[contains(.,'Submit')]")],"click",'Action_06','Action_06 - ','''''')
    # computeMetric(driver,'''id=submit''','click',time.time(),'Action_06',stepTime,windowStartTime,exceptionTime)
    # stepTime = actionLoop(driver,wait,[(By.LINK_TEXT,"Log out"),(By.CSS_SELECTOR,".wp-block-button__link"),(By.XPATH,"//a[contains(text(),'Log out')]"),(By.XPATH,"//div[@id='loop-container']/div/article/div[2]/div/div/div/a"),(By.XPATH,"//a[contains(@href, 'https://www.mysubdomain.com/')]"),(By.XPATH,"//div[2]/div/div/div/a"),(By.XPATH,"//a[contains(.,'Log out')]")],"click",'Action_07','Action_07 - ','''''')
    # computeMetric(driver,'''linkText=Log out''','click',time.time(),'Action_07',stepTime,windowStartTime,exceptionTime)
    logger.info(str(datetime.datetime.now())+" - webdriver window/tab close")
    driver.close()
    # Final call: empty step name and isCompleted=True store the overall response time
    computeMetric(driver,"responseTime","",time.time(),"",0,windowStartTime,exceptionTime,True)
    driver.quit()
### main starts from here
logEnablingFlag=False # to enable/disable logs
processManager=Manager() # to propagate changes made to attributes inside a process
sendAvailabilityDetails = processManager.Value("sendAvailabilityMetrics",False)
responseTimeMetric=processManager.dict() # to store response time
warnings=processManager.dict() # to store warnings or exceptions occurred in the transaction
errors=processManager.dict() # to store critical exception occurred in the transaction
metricObjects=processManager.dict() # to process all the metric objects for metric computation (enhanced from list to dict)
actionAvailabilityDict = processManager.dict() # to track action availability status
thresholdsDict = processManager.dict() # to store performance thresholds for each step
prevSuccessScreenshotIndex=sys.argv[0].split("/")[-1].replace(".py","") # extract script name for screenshot indexing
parser = argparse.ArgumentParser()
# Enhanced command line argument parsing for flexible configuration
parser.add_argument("-pe","--postErrScreenshot", help = "Take & Post Screenshot in case of Error", required = False)
parser.add_argument("-pp","--postPrevSuccessScreenshot",help = "Post Prev Screenshot in case of Error", required = False)
parser.add_argument("-ts","--takeScreenshotOnEvent",help = "Take Screenshot of Event", required = False)
parser.add_argument("-wb","--webBrowser",help = "choose a webBrowser (google-chrome, firefox, microsoft-edge)", required = False)
parser.add_argument("-pd","--postDeltaData",help = "Post Delta Data",required = False)
parser.add_argument("-proxy","--proxyEnabled",help = "Proxy Enabling Flag", required = False)
argument = parser.parse_args(sys.argv[2:])
# Process command line arguments with proper defaults
if argument.postErrScreenshot == 'True':
screenshotOnErrorFlag="True"
else:
screenshotOnErrorFlag="False"
if argument.postDeltaData == 'True':
postDeltaData = "True"
else:
postDeltaData = "False"
if argument.postPrevSuccessScreenshot == 'True':
postPrevSuccessSSOnEvent="True"
else:
postPrevSuccessSSOnEvent="False"
if argument.takeScreenshotOnEvent == 'True':
takeSSofEventFlag="True"
else:
takeSSofEventFlag="False"
if argument.proxyEnabled=="True":
proxyEnabled=True
else:
proxyEnabled=False
# Browser selection with fallback to Chrome
if argument.webBrowser=="microsoft-edge":
browserName="edge"
elif argument.webBrowser=="firefox":
browserName="firefox"
else:
browserName="google-chrome"
# UUID generation for unique screenshot identification
uuidString=""
if screenshotOnErrorFlag == "True":
uuidString=str(uuid.uuid4())
if logEnablingFlag: # to have logs while performing the transaction
logger.basicConfig(filename=sys.argv[0]+".log",filemode='w',level=logger.INFO)
logger.info(str(datetime.datetime.now())+" - start")
# Run the transaction in a child process, bounded by the timeout given as the
# first command line argument (milliseconds), then always write the result JSON.
try:
    try:
        scriptProcess = multiprocessing.Process(target=scriptCode) # the transaction written in the scriptCode method is taken as a process
        scriptProcess.start()
        scriptProcess.join(float(sys.argv[1])/1000) # command line argument is taken to specify the connection timeout of the transaction, which is given by the application
        if scriptProcess.is_alive():
            # Still running after the timeout: record it so formMetricJson reports it
            errors.update({'''Exception''':'''connection timeout'''})
            raise Exception("Exception: connection timeout")
    except Exception as ex:
        # NOTE(review): every exception from the inner block is swallowed here,
        # so the outer except below (the process-group kill) is not reached for them.
        pass
except Exception as ex:
    os.killpg(os.getpgid(scriptProcess.pid),signal.SIGTERM) # raising termination signal if some exception occurs while running the above process
finally:
    formMetricJson() # need to call this function to finally update the attributes in the json which is shown in UI
    scriptProcess.terminate() # Termination of the script process at the end of the transaction and json computation
Example 1
In the following example, the script edits the contents of a text field, changing the default text.
# -*- coding: utf-8 -*-
from selenium import webdriver
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.support import expected_conditions
from selenium.webdriver.common.by import By
from selenium.webdriver.support.select import Select
from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.common.keys import Keys
import sys
import json
import os
import signal
import logging as logger
import datetime
import multiprocessing
from multiprocessing import Manager
import time
import re as regex
import uuid
import hashlib
import argparse
proxy='' # proxy server address handed to the browser (--proxy-server switch / proxy capability)
exceptionTime=0 # running total, in seconds, of time spent on failed locator attempts; increased by actionLoop
stepTimeStart=0 # module-level placeholder; actionLoop assigns its own local stepTimeStart
maxTimeoutInSeconds = 30 # maximum wait time, in seconds, for element operations
stepExceptionTimeoutInSeconds = 25 # timeout threshold, in seconds, for step-level exception handling
variableDict = {} # dictionary of dynamic variables used for templating
gloabalDriver = None # holds the webdriver instance once created (spelling may be referenced elsewhere; keep as is)
def setChromeOptionsAndCaps(isProxyEnabled):
    """Build the desired-capabilities dictionary used to launch headless Chrome.

    isProxyEnabled is accepted for signature parity with the Firefox and Edge
    builders but is not consulted here: the --proxy-server switch is always
    added, using the module-level ``proxy`` value.

    Returns the dict produced by ChromeOptions.to_capabilities() with the two
    certificate-acceptance entries set to True.
    """
    options = webdriver.ChromeOptions()
    options.binary_location = "/usr/bin/google-chrome"  # path of the browser binary
    # Command-line switches, added in exactly this order.
    switches = (
        '--no-sandbox',
        '--start-maximized',
        '--ignore-certificate-errors',
        '--disable-extensions',
        '--headless',
        '--disable-gpu',
        'window-size=1200x600',
        '--proxy-server=%s' % proxy,
    )
    for switch in switches:
        options.add_argument(switch)
    capabilities = options.to_capabilities()
    capabilities['acceptSslCerts'] = True
    capabilities['acceptInsecureCerts'] = True
    #capabilities['proxy'] = {'proxyType': 'MANUAL','httpProxy': proxy,'ftpProxy': proxy,'sslProxy': proxy,'noProxy': '','class': "org.openqa.selenium.Proxy",'autodetect': False}
    return capabilities
def setFirefoxOptionsAndCaps(isProxyEnabled):
    """Build the desired-capabilities dictionary used to launch headless Firefox.

    When isProxyEnabled is truthy a MANUAL proxy entry pointing at the
    module-level ``proxy`` value is added to the capabilities.

    Returns the dict produced by FirefoxOptions.to_capabilities() with the
    certificate-acceptance entries (and, optionally, the proxy entry) set.
    """
    options = webdriver.FirefoxOptions()
    options.binary_location = "/usr/bin/firefox"  # path of the Firefox binary
    # Command-line arguments, added in exactly this order.
    arguments = (
        '--no-sandbox',
        '--start-maximized',
        '--ignore-certificate-errors',
        '--disable-extensions',
        '--headless',
        '--disable-gpu',
        'window-size=1200x600',
        '--proxy-server=%s' % proxy,
    )
    for argument in arguments:
        options.add_argument(argument)
    capabilities = options.to_capabilities()
    capabilities['acceptSslCerts'] = True
    capabilities['acceptInsecureCerts'] = True
    if isProxyEnabled:
        capabilities['proxy'] = {'proxyType': 'MANUAL','httpProxy': proxy,'ftpProxy': proxy,'sslProxy': proxy,'noProxy': '','class': "org.openqa.selenium.Proxy",'autodetect': False}
    return capabilities
def setEdgeOptionsAndCaps(isProxyEnabled):
    """Build the EdgeOptions object used to launch headless Chromium-based Edge.

    The --proxy-server switch (module-level ``proxy`` value) is added only
    when isProxyEnabled is truthy.

    Returns the configured EdgeOptions instance (not a capabilities dict).
    """
    from msedge.selenium_tools import EdgeOptions
    options = EdgeOptions()
    options.use_chromium = True                          # use Chromium-based Edge
    options.binary_location = "/usr/bin/microsoft-edge"  # path of the Edge binary
    # Command-line switches, added in exactly this order.
    switches = [
        'headless',
        '--disable-gpu',
        '--no-sandbox',
        '--start-maximized',
        '--ignore-certificate-errors',
        '--disable-extensions',
        'window-size=1200x600',
    ]
    if isProxyEnabled:
        switches.append('--proxy-server=%s' % proxy)
    for switch in switches:
        options.add_argument(switch)
    options.set_capability('platform', 'LINUX')
    return options
sys.tracebacklimit=0 # suppress traceback frames: an unhandled exception prints only its type and message
def actionChainsCheck(driver,element,action,value):
    """Perform ``action`` on ``element`` through an ActionChains sequence.

    driver  -- WebDriver instance used to locate the element and run the chain.
    element -- locator tuple, unpacked into ``driver.find_element``.
    action  -- action name; names not handled below do nothing.
    value   -- text sent for the "type" and "sendKeys" actions.

    Any exception raised while locating the element or running the chain
    propagates to the caller.
    """
    if action == "mouseOver":
        ActionChains(driver).move_to_element(driver.find_element(*element)).perform()
    elif action in ("type", "sendKeys"):
        # An ActionChains only queues its steps; without perform() the key
        # strokes were never sent to the browser.
        ActionChains(driver).move_to_element(driver.find_element(*element)).send_keys(value).perform()
    elif action in ("click", "removeSelection", "addSelection", "uncheck", "check", "clickAt"):
        ActionChains(driver).move_to_element(driver.find_element(*element)).click().perform()
def actionLoop(driver,wait,elementList,action,stepName,rcaHeader,actionErrorMsg="",value=""):
    """Perform ``action`` using the first locator in ``elementList`` that works.

    Each locator is tried in order through the explicit ``wait``. When the
    last locator also fails, one more attempt is made on the first locator
    through ActionChains (a single attempt, or sleepLoop retries) before
    the step is reported as failed.

    driver         -- WebDriver instance.
    wait           -- WebDriverWait used to wait until an element is clickable.
    elementList    -- list of locator tuples that identify the same element.
    action         -- "mouseOver", "type", "sendKeys", "click",
                      "removeSelection", "addSelection", "uncheck", "check"
                      or "clickAt".
    stepName       -- key under which a failed step is stored in
                      metricObjects; "" stores nothing.
    rcaHeader      -- prefix of the failure message handed to computeRCA.
    actionErrorMsg -- optional custom failure message.
    value          -- text for the "type" and "sendKeys" actions.

    Returns the time in milliseconds since the current attempt started; when
    postDeltaData is "False" the time accumulated in failed attempts is
    subtracted as well.
    Raises Exception when the action cannot be performed at all.
    """
    global exceptionTime
    stepTimeStart=time.time()
    actionExceptionTime = 0
    clickActions = ("click","removeSelection","addSelection","uncheck","check","clickAt")
    for i in range(len(elementList)):
        try:
            logger.info(str(datetime.datetime.now())+''' - webdriver performing action '''+action+''' on "'''+''.join(elementList[i]))
            if action == "mouseOver":
                # wait.until() returns the WebElement, which has no perform()
                # method; the hover has to be issued through ActionChains.
                hoverTarget = wait.until(expected_conditions.element_to_be_clickable(elementList[i]))
                ActionChains(driver).move_to_element(hoverTarget).perform()
            elif action == "type" or action == "sendKeys":
                # "type" clears the field first unless the value is a key name.
                if not ("Keys." in value) and "type"==action:
                    wait.until(expected_conditions.element_to_be_clickable(elementList[i])).clear()
                wait.until(expected_conditions.element_to_be_clickable(elementList[i])).send_keys(value)
            elif action in clickActions:
                wait.until(expected_conditions.element_to_be_clickable(elementList[i])).click()
            if postDeltaData == "False":
                return round((time.time()-stepTimeStart-actionExceptionTime)*1000,3)
            else:
                return round((time.time()-stepTimeStart)*1000,3)
        except Exception as exp :
            logger.error(str(datetime.datetime.now())+''' - webdriver can not perform '''+action+''' on '''+''.join(elementList[i])+''' with action chains failed due to : '''+str(exp))
            stepTimeStop=time.time()
            # Time lost on this failed locator is tracked per step and globally.
            actionExceptionTime = actionExceptionTime + stepTimeStop - stepTimeStart
            exceptionTime= exceptionTime + stepTimeStop - stepTimeStart
            if i < len(elementList)-1 :
                # More locators left: restart the clock and try the next one.
                stepTimeStart=time.time()
                continue
            else:
                localExceptionTime = stepTimeStop-stepTimeStart
                stepTimeStart=time.time()
                try:
                    # If the last attempt already took at least the step
                    # timeout, try ActionChains once; otherwise retry it
                    # once per second through sleepLoop.
                    if localExceptionTime >= stepExceptionTimeoutInSeconds :
                        actionChainsCheck(driver,elementList[0],action,value)
                    else:
                        sleepLoop(driver,elementList[0],action,value)
                    if postDeltaData == "False":
                        return round((time.time()-stepTimeStart-actionExceptionTime)*1000,3)
                    else:
                        return round((time.time()-stepTimeStart)*1000,3)
                except Exception as ex:
                    logger.error(str(datetime.datetime.now())+''' - webdriver can not perform '''+action+''' on '''+''.join(elementList[i])+''' with action chains failed due to : '''+str(ex))
                    if not actionErrorMsg == "":
                        computeRCA(rcaHeader + actionErrorMsg,True)
                    else:
                        computeRCA(rcaHeader + action + " on "+ ''.join(elementList[i])+" cannot be performed ")
                    takeScreenshotonError(driver)
                    if not stepName == "":
                        # Record the step with zero time and availability "0".
                        metricObjects.update({stepName:metricsProcessed(action,elementList[0],0,stepName,"0","")})
                    raise Exception('''Exception:'''+ action +''' on '''+''.join(elementList[i])+''' cannot be performed''')
def sleepLoop(driver,element,action,value=''):
    """Retry actionChainsCheck once per second until it succeeds.

    At most maxTimeoutInSeconds attempts are made, each preceded by a
    one-second sleep. The failure of the final attempt is re-raised wrapped
    in a plain Exception; earlier failures are ignored.
    """
    finalAttempt = maxTimeoutInSeconds - 1
    for attempt in range(maxTimeoutInSeconds):
        try:
            time.sleep(1)
            actionChainsCheck(driver,element,action,value)
        except Exception as exp:
            if attempt >= finalAttempt:
                raise Exception(exp)
        else:
            # The action went through; stop retrying.
            return
class metricsProcessed:
    """Plain record describing the outcome of a single transaction step."""

    def __init__(self,actionPerformed,uiLocatorString,metricTimeTaken,stepName=None,actionAvailabilityStatus=None,thresholdBreached=None):
        # What was done and on which locator.
        self.actionPerformed = actionPerformed
        self.uiLocatorString = uiLocatorString
        # Time recorded for the step.
        self.metricTimeTaken = metricTimeTaken
        # Optional details; each stays None when the caller omits it.
        self.stepName = stepName
        self.actionAvailabilityStatus = actionAvailabilityStatus
        self.thresholdBreached = thresholdBreached
def regexCheckSelect(value,dropDownElement):
    """Select the first drop-down option whose text matches a regular expression.

    value           -- pattern text, optionally prefixed with "regexp:".
    dropDownElement -- object exposing ``options`` (items with a ``text``
                       attribute) and ``select_by_visible_text``.

    Matching is anchored at the start of the option text. Errors, including
    an invalid pattern, are logged and not raised.
    """
    try:
        # Strip only the leading marker; a later "regexp:" inside the
        # expression is part of the pattern and must be kept.
        prefix = "regexp:"
        if value.startswith(prefix):
            value = value[len(prefix):]
        pattern = regex.compile(value)
        for dropDownOption in dropDownElement.options:
            if pattern.match(dropDownOption.text):
                dropDownElement.select_by_visible_text(dropDownOption.text)
                break
    except Exception as exp:
        logger.error("Error Occured while Parsing Regex values "+str(exp))
def isJsonDictionaryEmpty(metricDictionary):
    """Return ``metricDictionary`` rendered as a quoted JSON string.

    metricDictionary -- value to render; None yields an empty JSON string.

    Returns '""' for None, otherwise the string form of the value as a
    JSON string literal.
    """
    if metricDictionary is None:
        logger.info("The metric Dictionary is null")
        return "\"\""
    logger.info("Metric Dictionary is not null")
    # json.dumps escapes quotes, backslashes and control characters; plain
    # concatenation produced invalid JSON whenever a message contained them.
    return json.dumps(str(metricDictionary), ensure_ascii=False)
def getValueFromVariableDict(key):
    """Return the value stored for ``key`` in variableDict.

    When the key is absent, the placeholder text "${key}" is returned instead.
    """
    try:
        return variableDict[key]
    except KeyError:
        return "${"+key+"}"
def computeRCA(rcaMsg,actionErrorMsgCheck=False):
    """Store a failure message, suffixed with the current URL, in ``warnings``.

    rcaMsg              -- failure description.
    actionErrorMsgCheck -- when true, ``${name}`` placeholders in rcaMsg are
                           replaced from variableDict (unknown names are left
                           as they are) and the message is cut to 500
                           characters.
    """
    global gloabalDriver
    if actionErrorMsgCheck:
        import re
        # str() keeps re.sub from failing on non-string variable values.
        rcaMsg = re.sub(r'\${(\w+)}', lambda m: str(variableDict.get(m.group(1), m.group(0))), rcaMsg)
        rcaMsg = rcaMsg[:500]
    try:
        currentUrl = str(gloabalDriver.current_url)
    except Exception as exp:
        # The driver may be unset or the browser gone at this point; the
        # failure message must still be recorded.
        logger.error(str(datetime.datetime.now())+" - unable to read current URL : "+str(exp))
        currentUrl = "unavailable"
    rcaMsg = rcaMsg + " ( Current URL - "+currentUrl+" )"
    warnings.update({'''Exception''':rcaMsg})
def computeMetric(driver,uiLocatorString,actionPerfomed,actionTime,stepName,stepTime,windowStartTime,exceptionTime=0,isCompleted=False):
    """Record the timing of one step, or the overall response time.

    With isCompleted set, the elapsed time since windowStartTime (minus
    exceptionTime) is stored as "responseTime". Otherwise a metricsProcessed
    entry is stored in metricObjects under stepName and, for a non-empty
    stepName, takeScreenshotofEvent is called.

    When postDeltaData is not "False", the step time is the supplied
    stepTime and is added to any time already stored for the same stepName.
    """
    deltaMode = postDeltaData != "False"
    if isCompleted or not deltaMode:
        timeTaken = round((actionTime-windowStartTime-exceptionTime)*1000,2)
    else:
        timeTaken = round((stepTime),2)
    uiLocatorString = uiLocatorString.replace("\"","").replace("\\","")
    # The threshold check uses the time before any accumulation below.
    stepThreshold = thresholdsDict.get(stepName)
    thresholdBreached = "1" if stepThreshold is not None and timeTaken > stepThreshold else "0"
    if isCompleted:
        responseTimeMetric.update({"responseTime":timeTaken})
        return
    if deltaMode and stepName in metricObjects:
        timeTaken += (metricObjects.get(stepName)).metricTimeTaken
    metricObjects.update({stepName:metricsProcessed(actionPerfomed,uiLocatorString,timeTaken,stepName,"1",thresholdBreached)})
    if stepName != '':
        takeScreenshotofEvent(driver,stepName)
def formMetricJson():
    """Write the transaction result to ``<script path>.json``.

    The file holds responseTime, warningInfo, errorInfo and
    screenshotOnError, optional screenshot details, and the granularSteps
    list built from metricObjects.
    """
    dataDir = '/opt/opsramp/webprobe/data/'

    def md5OfFile(path):
        # Read through a context manager so the handle is always closed.
        with open(path,'rb') as screenshotFile:
            return hashlib.md5(screenshotFile.read()).hexdigest()

    # The context manager closes the JSON file even when a later step raises.
    with open(sys.argv[0]+'.json','w') as filePointer:
        filePointer.write("{\n")
        filePointer.write("\t\"responseTime\" : "+isJsonDictionaryEmpty(responseTimeMetric.get("responseTime")))
        filePointer.write(",\n\t\"warningInfo\" : "+isJsonDictionaryEmpty(warnings.get("Exception")))
        filePointer.write(",\n\t\"errorInfo\" : "+isJsonDictionaryEmpty(errors.get("Exception")))
        filePointer.write(",\n\t\"screenshotOnError\" : \""+screenshotOnErrorFlag+"\"")
        if screenshotOnErrorFlag=="True" and responseTimeMetric.get("responseTime") is None and not errors.get("Exception")=="connection timeout":
            errorScreenshotPath = dataDir+uuidString+'.png'
            # The screenshot is absent when the failure happened before one
            # could be taken; hashing it unconditionally used to raise here
            # and leave a truncated JSON file behind.
            if os.path.isfile(errorScreenshotPath):
                filePointer.write(",\n\t\"uuidString\" : \""+uuidString+"\"")
                logger.info(str(datetime.datetime.now())+''' - Generating md5hash checksum for the png file''')
                filePointer.write(",\n\t\"md5hashString\" : \""+md5OfFile(errorScreenshotPath)+"\"")
                filePointer.write(",\n\t\"screenshotTS\" : \""+str(int(time.time())*1000)+"\"")
            else:
                logger.error(str(datetime.datetime.now())+" - error screenshot not found : "+errorScreenshotPath)
            if postPrevSuccessSSOnEvent=="True":
                # The warning starts with "<stepName> - ..."; strip() drops
                # the space before the dash so the name matches the file
                # written by takeScreenshotofEvent.
                failedStepName = str(warnings.get("Exception")).split("-")[0].strip()
                prevSuccessScreenshotFile = prevSuccessScreenshotIndex+"_"+failedStepName+".png"
                prevSuccessScreenshotFilePath = dataDir+prevSuccessScreenshotIndex+"/"+prevSuccessScreenshotFile
                if os.path.isfile(prevSuccessScreenshotFilePath):
                    filePointer.write(",\n\t\"prevSuccessScreenshotUUID\" : \""+prevSuccessScreenshotFile+"\"")
                    filePointer.write(",\n\t\"prevSuccessScreenshotHash\" : \""+md5OfFile(prevSuccessScreenshotFilePath)+"\"")
                    filePointer.write(",\n\t\"prevSuccessScreenshotTS\" : \""+str(int(os.path.getctime(prevSuccessScreenshotFilePath))*1000)+"\"")
        # Availability and threshold fields are included only on request.
        includeAvailability = sendAvailabilityDetails.value
        granularSteps = []
        for obj in metricObjects.values():
            step = {"timeTaken":str(round(obj.metricTimeTaken,6)),"actionPerfomed":obj.actionPerformed,"uiLocatorString":obj.uiLocatorString,"stepName":obj.stepName}
            if includeAvailability:
                step["actionAvailabilityStatus"] = obj.actionAvailabilityStatus
                step["actionThresholdBreached"] = obj.thresholdBreached
            granularSteps.append(step)
        filePointer.write(",\n\t\"granularSteps\" : "+json.dumps(granularSteps,indent=4))
        filePointer.write("\n}")
def takeScreenshotonError(driver):
    """Save an error screenshot named after uuidString when the option is on.

    The image goes to /opt/opsramp/webprobe/data/<uuidString>.png. When
    screenshotOnErrorFlag is not "True" only a log entry is written.
    """
    timestamp = str(datetime.datetime.now())
    if screenshotOnErrorFlag != "True":
        logger.info(timestamp+''' Screenshot on Error option not opted''')
        return
    logger.info(timestamp+''' Screenshot on Error option opted''')
    driver.save_screenshot("/opt/opsramp/webprobe/data/"+uuidString+".png")
def takeScreenshotofEvent(driver,stepName):
    """Save a screenshot for ``stepName`` when takeSSofEventFlag is "True".

    The image is written to
    /opt/opsramp/webprobe/data/<index>/<index>_<stepName>.png, where <index>
    is prevSuccessScreenshotIndex. Nothing happens when the flag is off.
    """
    if takeSSofEventFlag != "True":
        return
    logger.info(str(datetime.datetime.now())+''' Screenshot on Event option opted''')
    # Build the target path once and use it for both the log entry and the save.
    screenshotPath = '/opt/opsramp/webprobe/data/'+prevSuccessScreenshotIndex+'/'+prevSuccessScreenshotIndex+'_'+stepName+'.png'
    logger.info(str(datetime.datetime.now())+''' Screenshot saved '''+screenshotPath)
    driver.save_screenshot(screenshotPath)
def has_connection_error(driver):
    """Return the text of the page's "error-code" element, or False.

    False is returned when the element is missing, its text is empty, or
    the lookup fails for any other reason.
    """
    try:
        error_text = driver.find_element(By.CLASS_NAME,"error-code").text
    except Exception:
        # Catch Exception rather than everything, so KeyboardInterrupt and
        # SystemExit are no longer swallowed.
        return False
    return error_text if error_text else False
def has_page_return_empty(driver):
    """Report whether the page body stays without text.

    The <body> element is polled once per second, up to maxTimeoutInSeconds
    times. Returns False as soon as the body has text and True when it never
    does.
    """
    for _ in range(maxTimeoutInSeconds):
        try:
            pageHasText = bool(driver.find_element(By.XPATH,"//body").text)
        except Exception:
            # A failed lookup is treated like an empty page. Catching
            # Exception instead of everything lets KeyboardInterrupt and
            # SystemExit through.
            pageHasText = False
        if pageHasText:
            return False
        logger.info(str(datetime.datetime.now())+" - page empty check waiting for 1 second")
        time.sleep(1)
    return True
def scriptCode():
    """Run the recorded transaction in the selected browser and record metrics.

    Creates the WebDriver for browserName, opens the start URL, performs the
    recorded steps through actionLoop and stores the timing of each step with
    computeMetric. On failure a root-cause message is stored through
    computeRCA, an error screenshot is requested and an Exception is raised.
    """
    global gloabalDriver
    logger.info(str(datetime.datetime.now())+" - before webdriver invocation")
    if browserName=="google-chrome":
        driver=webdriver.Chrome(desired_capabilities=setChromeOptionsAndCaps(proxyEnabled))
    elif browserName=="firefox":
        driver=webdriver.Firefox(desired_capabilities=setFirefoxOptionsAndCaps(proxyEnabled),log_path=os.devnull)
    else:
        from msedge.selenium_tools import Edge
        driver = Edge(options=setEdgeOptionsAndCaps(proxyEnabled), executable_path="/usr/bin/msedgedriver")
    gloabalDriver = driver
    try:
        logger.info(str(datetime.datetime.now())+" - webdriver invoked")
        driver.delete_all_cookies()
        logger.info(str(datetime.datetime.now())+" - webdriver cookies deleted")
        wait=WebDriverWait(driver,maxTimeoutInSeconds)
        windowStartTime=time.time()
        sendAvailabilityDetails.value = False
        stepTimeStart = time.time()
        try:
            logger.info(str(datetime.datetime.now())+''' - webdriver hitting url : https://www.mysubdomain.com/''')
            driver.get("https://www.mysubdomain.com/")
        except Exception:
            logger.info(str(datetime.datetime.now())+''' - webdriver url hit :https://www.mysubdomain.com/''')
            computeRCA('''Action_01 - error in hitting https://www.mysubdomain.com/''')
            takeScreenshotonError(driver)
            raise Exception('''Error in hitting https://www.mysubdomain.com/''')
        stepTime = round((time.time()-stepTimeStart)*1000,6)
        computeMetric(driver,'''https://www.mysubdomain.com/''','openURL',time.time(),'Action_01',stepTime,windowStartTime,exceptionTime)
        # Stop early when the browser shows an error page or renders nothing.
        con_error_flag= has_connection_error(driver)
        if con_error_flag:
            takeScreenshotonError(driver)
            computeRCA('''Browser thrown error while hitting https://www.mysubdomain.com/ - '''+con_error_flag)
            raise Exception('''Browser thrown error - '''+con_error_flag)
        if has_page_return_empty(driver):
            takeScreenshotonError(driver)
            computeRCA('''Page empty - Nothing rendered while hitting https://www.mysubdomain.com/''')
            raise Exception('''Page empty - Nothing rendered ''')
        logger.info(str(datetime.datetime.now())+''' - webdriver setting window size as 1295x695 dimensions''')
        driver.set_window_size(1295,695)
        # Recorded steps: each one lists alternative locators for its element.
        stepTime = actionLoop(driver,wait,[(By.ID,"username"),(By.NAME,"username"),(By.CSS_SELECTOR,"#username"),(By.XPATH,"//input[@id='username']"),(By.XPATH,"//div[@id='form']/div/input"),(By.XPATH,"//input")],"click",'Action_02','Action_02 - ','''''')
        computeMetric(driver,'''id=username''','click',time.time(),'Action_02',stepTime,windowStartTime,exceptionTime)
        stepTime = actionLoop(driver,wait,[(By.ID,"username"),(By.NAME,"username"),(By.CSS_SELECTOR,"#username"),(By.XPATH,"//input[@id='username']"),(By.XPATH,"//div[@id='form']/div/input"),(By.XPATH,"//input")],"type",'Action_03','Action_03 - ','''''',"username")
        computeMetric(driver,'''id=username''','type',time.time(),'Action_03',stepTime,windowStartTime,exceptionTime)
        stepTime = actionLoop(driver,wait,[(By.ID,"password"),(By.NAME,"password"),(By.CSS_SELECTOR,"#password"),(By.XPATH,"//input[@id='password']"),(By.XPATH,"//div[@id='form']/div[2]/input"),(By.XPATH,"//div[2]/input")],"click",'Action_04','Action_04 - ','''''')
        computeMetric(driver,'''id=password''','click',time.time(),'Action_04',stepTime,windowStartTime,exceptionTime)
        stepTime = actionLoop(driver,wait,[(By.ID,"password"),(By.NAME,"password"),(By.CSS_SELECTOR,"#password"),(By.XPATH,"//input[@id='password']"),(By.XPATH,"//div[@id='form']/div[2]/input"),(By.XPATH,"//div[2]/input")],"type",'Action_05','Action_05 - ','''''',"password")
        computeMetric(driver,'''id=password''','type',time.time(),'Action_05',stepTime,windowStartTime,exceptionTime)
        stepTime = actionLoop(driver,wait,[(By.ID,"submit"),(By.CSS_SELECTOR,"#submit"),(By.XPATH,"//button[@id='submit']"),(By.XPATH,"//div[@id='form']/button"),(By.XPATH,"//section/div/button"),(By.XPATH,"//button[contains(.,'Submit')]")],"click",'Action_06','Action_06 - ','''''')
        computeMetric(driver,'''id=submit''','click',time.time(),'Action_06',stepTime,windowStartTime,exceptionTime)
        stepTime = actionLoop(driver,wait,[(By.LINK_TEXT,"Log out"),(By.CSS_SELECTOR,".wp-block-button__link"),(By.XPATH,"//a[contains(text(),'Log out')]"),(By.XPATH,"//div[@id='loop-container']/div/article/div[2]/div/div/div/a"),(By.XPATH,"//a[contains(@href, 'https://www.mysubdomain.com/')]"),(By.XPATH,"//div[2]/div/div/div/a"),(By.XPATH,"//a[contains(.,'Log out')]")],"click",'Action_07','Action_07 - ','''''')
        computeMetric(driver,'''linkText=Log out''','click',time.time(),'Action_07',stepTime,windowStartTime,exceptionTime)
        logger.info(str(datetime.datetime.now())+" - webdriver window/tab close")
        driver.close()
        # Final call stores the overall response time.
        computeMetric(driver,"responseTime","",time.time(),"",0,windowStartTime,exceptionTime,True)
    finally:
        # Quit on every path: previously a failed step skipped driver.quit()
        # and left the browser and driver processes running.
        try:
            driver.quit()
        except Exception as exp:
            # A failing quit must not hide the original step failure.
            logger.error(str(datetime.datetime.now())+" - webdriver quit failed : "+str(exp))
### main starts from here
# Logging to "<script path>.log" is configured only when this flag is True.
logEnablingFlag = False

# Manager-backed objects that the scriptCode child process updates and this
# process reads when writing the result file.
processManager = Manager()
sendAvailabilityDetails = processManager.Value("sendAvailabilityMetrics", False)
responseTimeMetric = processManager.dict()
warnings = processManager.dict()
errors = processManager.dict()
metricObjects = processManager.dict()
actionAvailabilityDict = processManager.dict()
thresholdsDict = processManager.dict()

# Script file name without directory and ".py"; used in screenshot paths.
prevSuccessScreenshotIndex = sys.argv[0].split("/")[-1].replace(".py", "")

# Option flags are parsed from the third command-line argument onwards.
parser = argparse.ArgumentParser()
parser.add_argument("-pe", "--postErrScreenshot", help="Take & Post Screenshot in case of Error", required=False)
parser.add_argument("-pp", "--postPrevSuccessScreenshot", help="Post Prev Screenshot in case of Error", required=False)
parser.add_argument("-ts", "--takeScreenshotOnEvent", help="Take Screenshot of Event", required=False)
parser.add_argument("-wb", "--webBrowser", help="choose a webBrowser", required=False)
parser.add_argument("-pd", "--postDeltaData", help="Post Delta Data", required=False)
parser.add_argument("-proxy", "--proxyEnabled", help="Proxy Enabling Flag", required=False)
argument = parser.parse_args(sys.argv[2:])

# Every flag is kept as the string "True" or "False"; anything other than
# the exact text 'True' counts as "False".
screenshotOnErrorFlag = "True" if argument.postErrScreenshot == 'True' else "False"
postDeltaData = "True" if argument.postDeltaData == 'True' else "False"
postPrevSuccessSSOnEvent = "True" if argument.postPrevSuccessScreenshot == 'True' else "False"
takeSSofEventFlag = "True" if argument.takeScreenshotOnEvent == 'True' else "False"
proxyEnabled = argument.proxyEnabled == "True"

# Chrome is the fallback for any browser name other than Edge or Firefox.
browserName = {"microsoft-edge": "edge", "firefox": "firefox"}.get(argument.webBrowser, "google-chrome")

# A screenshot identifier is generated only when error screenshots are on.
uuidString = str(uuid.uuid4()) if screenshotOnErrorFlag == "True" else ""

if logEnablingFlag:
    logger.basicConfig(filename=sys.argv[0]+".log", filemode='w', level=logger.INFO)
logger.info(str(datetime.datetime.now())+" - start")
# Run the transaction in a child process so this process can stop waiting
# once the allowed time is exceeded.
try:
try:
scriptProcess = multiprocessing.Process(target=scriptCode)
scriptProcess.start()
# join() takes seconds; sys.argv[1] is divided by 1000, so it is presumably
# given in milliseconds.
scriptProcess.join(float(sys.argv[1])/1000)
# A child that is still alive after join() returned did not finish in time.
if scriptProcess.is_alive():
errors.update({'''Exception''':'''connection timeout'''})
raise Exception("Exception: connection timeout")
# NOTE(review): this handler discards every exception raised above,
# including the timeout exception.
except Exception as ex:
pass
# NOTE(review): because the handler above swallows everything, this
# process-group kill looks unreachable - confirm the intended nesting.
except Exception as ex:
os.killpg(os.getpgid(scriptProcess.pid),signal.SIGTERM)
# Always write the result JSON, then terminate the child process.
finally:
formMetricJson()
scriptProcess.terminate()
Example 2
In the following example, the script asserts an HTML alert that is displayed when the button is double-clicked.
# -*- coding: utf-8 -*-
from selenium import webdriver
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.support import expected_conditions
from selenium.webdriver.common.by import By
from selenium.webdriver.support.select import Select
from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.common.keys import Keys
import sys
import json
import os
import signal
import logging as logger
import datetime
import multiprocessing
from multiprocessing import Manager
import time
import re as regex
import uuid
import hashlib
import argparse
# Proxy address used by the browser option helpers; empty by default.
proxy=''
# Running total, in seconds, of time spent in failed locator attempts;
# updated by actionLoop.
exceptionTime=0
# Module-level default; actionLoop and scriptCode assign their own local
# stepTimeStart.
stepTimeStart=0
# Timeout for WebDriverWait and the number of one-second retries in
# sleepLoop and has_page_return_empty.
maxTimeoutInSeconds = 30
# In actionLoop, a final failed attempt lasting at least this many seconds
# is followed by a single ActionChains attempt instead of sleepLoop retries.
stepExceptionTimeoutInSeconds = 25
# Values substituted for ${name} placeholders (getValueFromVariableDict,
# computeRCA).
variableDict = {}
# Set by scriptCode to the active driver; read by computeRCA for the
# current URL.
gloabalDriver = None
def setChromeOptionsAndCaps(isProxyEnabled):
    """Build the desired-capabilities dictionary for a headless Chrome session.

    isProxyEnabled -- not read by this function; the ``--proxy-server``
                      switch is always added from the module-level ``proxy``.

    Returns the capabilities dictionary derived from the Chrome options.
    """
    options = webdriver.ChromeOptions()
    options.binary_location = "/usr/bin/google-chrome"
    # Command-line switches, added in the same order as before.
    for switch in (
        '--no-sandbox',
        '--start-maximized',
        '--ignore-certificate-errors',
        '--disable-extensions',
        '--headless',
        '--disable-gpu',
        'window-size=1200x600',
        '--proxy-server=%s' % proxy,
    ):
        options.add_argument(switch)
    capabilities = options.to_capabilities()
    capabilities.update({'acceptSslCerts': True, 'acceptInsecureCerts': True})
    #capabilities['proxy'] = {'proxyType': 'MANUAL','httpProxy': proxy,'ftpProxy': proxy,'sslProxy': proxy,'noProxy': '','class': "org.openqa.selenium.Proxy",'autodetect': False}
    return capabilities
def setFirefoxOptionsAndCaps(isProxyEnabled):
    """Build the desired-capabilities dictionary for a headless Firefox session.

    isProxyEnabled -- when true, a manual proxy entry built from the
                      module-level ``proxy`` address is added to the
                      capabilities.

    Returns the capabilities dictionary derived from the Firefox options.
    """
    options = webdriver.FirefoxOptions()
    options.binary_location = "/usr/bin/firefox"
    # Command-line switches, added in the same order as before.
    for switch in (
        '--no-sandbox',
        '--start-maximized',
        '--ignore-certificate-errors',
        '--disable-extensions',
        '--headless',
        '--disable-gpu',
        'window-size=1200x600',
        '--proxy-server=%s' % proxy,
    ):
        options.add_argument(switch)
    capabilities = options.to_capabilities()
    capabilities.update({'acceptSslCerts': True, 'acceptInsecureCerts': True})
    if isProxyEnabled:
        capabilities['proxy'] = {
            'proxyType': 'MANUAL',
            'httpProxy': proxy,
            'ftpProxy': proxy,
            'sslProxy': proxy,
            'noProxy': '',
            'class': "org.openqa.selenium.Proxy",
            'autodetect': False,
        }
    return capabilities
def setEdgeOptionsAndCaps(isProxyEnabled):
    """Build the EdgeOptions object for a headless Chromium-based Edge session.

    isProxyEnabled -- when true, a ``--proxy-server`` switch carrying the
                      module-level ``proxy`` address is added.

    Returns the configured EdgeOptions instance.
    """
    from msedge.selenium_tools import EdgeOptions

    options = EdgeOptions()
    options.use_chromium = True
    options.binary_location = "/usr/bin/microsoft-edge"
    # Switches are collected first and then added in their original order.
    switches = [
        'headless',
        '--disable-gpu',
        '--no-sandbox',
        '--start-maximized',
        '--ignore-certificate-errors',
        '--disable-extensions',
        'window-size=1200x600',
    ]
    if isProxyEnabled:
        switches.append('--proxy-server=%s' % proxy)
    for switch in switches:
        options.add_argument(switch)
    options.set_capability('platform', 'LINUX')
    return options
# Suppress traceback frames for uncaught exceptions: with a limit of 0 only
# the exception type and message are printed.
sys.tracebacklimit=0
def actionChainsCheck(driver,element,action,value):
    """Perform ``action`` on ``element`` through an ActionChains sequence.

    driver  -- WebDriver instance used to locate the element and run the chain.
    element -- locator tuple, unpacked into ``driver.find_element``.
    action  -- action name; names not handled below do nothing.
    value   -- text sent for the "type" and "sendKeys" actions.

    Any exception raised while locating the element or running the chain
    propagates to the caller.
    """
    if action == "mouseOver":
        ActionChains(driver).move_to_element(driver.find_element(*element)).perform()
    elif action in ("type", "sendKeys"):
        # An ActionChains only queues its steps; without perform() the key
        # strokes were never sent to the browser.
        ActionChains(driver).move_to_element(driver.find_element(*element)).send_keys(value).perform()
    elif action in ("click", "removeSelection", "addSelection", "uncheck", "check", "clickAt"):
        ActionChains(driver).move_to_element(driver.find_element(*element)).click().perform()
def actionLoop(driver,wait,elementList,action,stepName,rcaHeader,actionErrorMsg="",value=""):
    """Perform ``action`` using the first locator in ``elementList`` that works.

    Each locator is tried in order through the explicit ``wait``. When the
    last locator also fails, one more attempt is made on the first locator
    through ActionChains (a single attempt, or sleepLoop retries) before
    the step is reported as failed.

    driver         -- WebDriver instance.
    wait           -- WebDriverWait used to wait until an element is clickable.
    elementList    -- list of locator tuples that identify the same element.
    action         -- "mouseOver", "type", "sendKeys", "click",
                      "removeSelection", "addSelection", "uncheck", "check"
                      or "clickAt".
    stepName       -- key under which a failed step is stored in
                      metricObjects; "" stores nothing.
    rcaHeader      -- prefix of the failure message handed to computeRCA.
    actionErrorMsg -- optional custom failure message.
    value          -- text for the "type" and "sendKeys" actions.

    Returns the time in milliseconds since the current attempt started; when
    postDeltaData is "False" the time accumulated in failed attempts is
    subtracted as well.
    Raises Exception when the action cannot be performed at all.
    """
    global exceptionTime
    stepTimeStart=time.time()
    actionExceptionTime = 0
    clickActions = ("click","removeSelection","addSelection","uncheck","check","clickAt")
    for i in range(len(elementList)):
        try:
            logger.info(str(datetime.datetime.now())+''' - webdriver performing action '''+action+''' on "'''+''.join(elementList[i]))
            if action == "mouseOver":
                # wait.until() returns the WebElement, which has no perform()
                # method; the hover has to be issued through ActionChains.
                hoverTarget = wait.until(expected_conditions.element_to_be_clickable(elementList[i]))
                ActionChains(driver).move_to_element(hoverTarget).perform()
            elif action == "type" or action == "sendKeys":
                # "type" clears the field first unless the value is a key name.
                if not ("Keys." in value) and "type"==action:
                    wait.until(expected_conditions.element_to_be_clickable(elementList[i])).clear()
                wait.until(expected_conditions.element_to_be_clickable(elementList[i])).send_keys(value)
            elif action in clickActions:
                wait.until(expected_conditions.element_to_be_clickable(elementList[i])).click()
            if postDeltaData == "False":
                return round((time.time()-stepTimeStart-actionExceptionTime)*1000,3)
            else:
                return round((time.time()-stepTimeStart)*1000,3)
        except Exception as exp :
            logger.error(str(datetime.datetime.now())+''' - webdriver can not perform '''+action+''' on '''+''.join(elementList[i])+''' with action chains failed due to : '''+str(exp))
            stepTimeStop=time.time()
            # Time lost on this failed locator is tracked per step and globally.
            actionExceptionTime = actionExceptionTime + stepTimeStop - stepTimeStart
            exceptionTime= exceptionTime + stepTimeStop - stepTimeStart
            if i < len(elementList)-1 :
                # More locators left: restart the clock and try the next one.
                stepTimeStart=time.time()
                continue
            else:
                localExceptionTime = stepTimeStop-stepTimeStart
                stepTimeStart=time.time()
                try:
                    # If the last attempt already took at least the step
                    # timeout, try ActionChains once; otherwise retry it
                    # once per second through sleepLoop.
                    if localExceptionTime >= stepExceptionTimeoutInSeconds :
                        actionChainsCheck(driver,elementList[0],action,value)
                    else:
                        sleepLoop(driver,elementList[0],action,value)
                    if postDeltaData == "False":
                        return round((time.time()-stepTimeStart-actionExceptionTime)*1000,3)
                    else:
                        return round((time.time()-stepTimeStart)*1000,3)
                except Exception as ex:
                    logger.error(str(datetime.datetime.now())+''' - webdriver can not perform '''+action+''' on '''+''.join(elementList[i])+''' with action chains failed due to : '''+str(ex))
                    if not actionErrorMsg == "":
                        computeRCA(rcaHeader + actionErrorMsg,True)
                    else:
                        computeRCA(rcaHeader + action + " on "+ ''.join(elementList[i])+" cannot be performed ")
                    takeScreenshotonError(driver)
                    if not stepName == "":
                        # Record the step with zero time and availability "0".
                        metricObjects.update({stepName:metricsProcessed(action,elementList[0],0,stepName,"0","")})
                    raise Exception('''Exception:'''+ action +''' on '''+''.join(elementList[i])+''' cannot be performed''')
def sleepLoop(driver,element,action,value=''):
    """Retry actionChainsCheck once per second until it succeeds.

    At most maxTimeoutInSeconds attempts are made, each preceded by a
    one-second sleep. The failure of the final attempt is re-raised wrapped
    in a plain Exception; earlier failures are ignored.
    """
    finalAttempt = maxTimeoutInSeconds - 1
    for attempt in range(maxTimeoutInSeconds):
        try:
            time.sleep(1)
            actionChainsCheck(driver,element,action,value)
        except Exception as exp:
            if attempt >= finalAttempt:
                raise Exception(exp)
        else:
            # The action went through; stop retrying.
            return
class metricsProcessed:
    """Plain record describing the outcome of a single transaction step."""

    def __init__(self,actionPerformed,uiLocatorString,metricTimeTaken,stepName=None,actionAvailabilityStatus=None,thresholdBreached=None):
        # What was done and on which locator.
        self.actionPerformed = actionPerformed
        self.uiLocatorString = uiLocatorString
        # Time recorded for the step.
        self.metricTimeTaken = metricTimeTaken
        # Optional details; each stays None when the caller omits it.
        self.stepName = stepName
        self.actionAvailabilityStatus = actionAvailabilityStatus
        self.thresholdBreached = thresholdBreached
def regexCheckSelect(value,dropDownElement):
    """Select the first drop-down option whose text matches a regular expression.

    value           -- pattern text, optionally prefixed with "regexp:".
    dropDownElement -- object exposing ``options`` (items with a ``text``
                       attribute) and ``select_by_visible_text``.

    Matching is anchored at the start of the option text. Errors, including
    an invalid pattern, are logged and not raised.
    """
    try:
        # Strip only the leading marker; a later "regexp:" inside the
        # expression is part of the pattern and must be kept.
        prefix = "regexp:"
        if value.startswith(prefix):
            value = value[len(prefix):]
        pattern = regex.compile(value)
        for dropDownOption in dropDownElement.options:
            if pattern.match(dropDownOption.text):
                dropDownElement.select_by_visible_text(dropDownOption.text)
                break
    except Exception as exp:
        logger.error("Error Occured while Parsing Regex values "+str(exp))
def isJsonDictionaryEmpty(metricDictionary):
    """Return ``metricDictionary`` rendered as a quoted JSON string.

    metricDictionary -- value to render; None yields an empty JSON string.

    Returns '""' for None, otherwise the string form of the value as a
    JSON string literal.
    """
    if metricDictionary is None:
        logger.info("The metric Dictionary is null")
        return "\"\""
    logger.info("Metric Dictionary is not null")
    # json.dumps escapes quotes, backslashes and control characters; plain
    # concatenation produced invalid JSON whenever a message contained them.
    return json.dumps(str(metricDictionary), ensure_ascii=False)
def getValueFromVariableDict(key):
    """Return the value stored for ``key`` in variableDict.

    When the key is absent, the placeholder text "${key}" is returned instead.
    """
    try:
        return variableDict[key]
    except KeyError:
        return "${"+key+"}"
def computeRCA(rcaMsg,actionErrorMsgCheck=False):
    """Store a failure message, suffixed with the current URL, in ``warnings``.

    rcaMsg              -- failure description.
    actionErrorMsgCheck -- when true, ``${name}`` placeholders in rcaMsg are
                           replaced from variableDict (unknown names are left
                           as they are) and the message is cut to 500
                           characters.
    """
    global gloabalDriver
    if actionErrorMsgCheck:
        import re
        # str() keeps re.sub from failing on non-string variable values.
        rcaMsg = re.sub(r'\${(\w+)}', lambda m: str(variableDict.get(m.group(1), m.group(0))), rcaMsg)
        rcaMsg = rcaMsg[:500]
    try:
        currentUrl = str(gloabalDriver.current_url)
    except Exception as exp:
        # The driver may be unset or the browser gone at this point; the
        # failure message must still be recorded.
        logger.error(str(datetime.datetime.now())+" - unable to read current URL : "+str(exp))
        currentUrl = "unavailable"
    rcaMsg = rcaMsg + " ( Current URL - "+currentUrl+" )"
    warnings.update({'''Exception''':rcaMsg})
def computeMetric(driver,uiLocatorString,actionPerfomed,actionTime,stepName,stepTime,windowStartTime,exceptionTime=0,isCompleted=False):
    """Record the timing of one step, or the overall response time.

    With isCompleted set, the elapsed time since windowStartTime (minus
    exceptionTime) is stored as "responseTime". Otherwise a metricsProcessed
    entry is stored in metricObjects under stepName and, for a non-empty
    stepName, takeScreenshotofEvent is called.

    When postDeltaData is not "False", the step time is the supplied
    stepTime and is added to any time already stored for the same stepName.
    """
    deltaMode = postDeltaData != "False"
    if isCompleted or not deltaMode:
        timeTaken = round((actionTime-windowStartTime-exceptionTime)*1000,2)
    else:
        timeTaken = round((stepTime),2)
    uiLocatorString = uiLocatorString.replace("\"","").replace("\\","")
    # The threshold check uses the time before any accumulation below.
    stepThreshold = thresholdsDict.get(stepName)
    thresholdBreached = "1" if stepThreshold is not None and timeTaken > stepThreshold else "0"
    if isCompleted:
        responseTimeMetric.update({"responseTime":timeTaken})
        return
    if deltaMode and stepName in metricObjects:
        timeTaken += (metricObjects.get(stepName)).metricTimeTaken
    metricObjects.update({stepName:metricsProcessed(actionPerfomed,uiLocatorString,timeTaken,stepName,"1",thresholdBreached)})
    if stepName != '':
        takeScreenshotofEvent(driver,stepName)
def formMetricJson():
    """Write the transaction result to ``<script path>.json``.

    The file holds responseTime, warningInfo, errorInfo and
    screenshotOnError, optional screenshot details, and the granularSteps
    list built from metricObjects.
    """
    dataDir = '/opt/opsramp/webprobe/data/'

    def md5OfFile(path):
        # Read through a context manager so the handle is always closed.
        with open(path,'rb') as screenshotFile:
            return hashlib.md5(screenshotFile.read()).hexdigest()

    # The context manager closes the JSON file even when a later step raises.
    with open(sys.argv[0]+'.json','w') as filePointer:
        filePointer.write("{\n")
        filePointer.write("\t\"responseTime\" : "+isJsonDictionaryEmpty(responseTimeMetric.get("responseTime")))
        filePointer.write(",\n\t\"warningInfo\" : "+isJsonDictionaryEmpty(warnings.get("Exception")))
        filePointer.write(",\n\t\"errorInfo\" : "+isJsonDictionaryEmpty(errors.get("Exception")))
        filePointer.write(",\n\t\"screenshotOnError\" : \""+screenshotOnErrorFlag+"\"")
        if screenshotOnErrorFlag=="True" and responseTimeMetric.get("responseTime") is None and not errors.get("Exception")=="connection timeout":
            errorScreenshotPath = dataDir+uuidString+'.png'
            # The screenshot is absent when the failure happened before one
            # could be taken; hashing it unconditionally used to raise here
            # and leave a truncated JSON file behind.
            if os.path.isfile(errorScreenshotPath):
                filePointer.write(",\n\t\"uuidString\" : \""+uuidString+"\"")
                logger.info(str(datetime.datetime.now())+''' - Generating md5hash checksum for the png file''')
                filePointer.write(",\n\t\"md5hashString\" : \""+md5OfFile(errorScreenshotPath)+"\"")
                filePointer.write(",\n\t\"screenshotTS\" : \""+str(int(time.time())*1000)+"\"")
            else:
                logger.error(str(datetime.datetime.now())+" - error screenshot not found : "+errorScreenshotPath)
            if postPrevSuccessSSOnEvent=="True":
                # The warning starts with "<stepName> - ..."; strip() drops
                # the space before the dash so the name matches the file
                # written by takeScreenshotofEvent.
                failedStepName = str(warnings.get("Exception")).split("-")[0].strip()
                prevSuccessScreenshotFile = prevSuccessScreenshotIndex+"_"+failedStepName+".png"
                prevSuccessScreenshotFilePath = dataDir+prevSuccessScreenshotIndex+"/"+prevSuccessScreenshotFile
                if os.path.isfile(prevSuccessScreenshotFilePath):
                    filePointer.write(",\n\t\"prevSuccessScreenshotUUID\" : \""+prevSuccessScreenshotFile+"\"")
                    filePointer.write(",\n\t\"prevSuccessScreenshotHash\" : \""+md5OfFile(prevSuccessScreenshotFilePath)+"\"")
                    filePointer.write(",\n\t\"prevSuccessScreenshotTS\" : \""+str(int(os.path.getctime(prevSuccessScreenshotFilePath))*1000)+"\"")
        # Availability and threshold fields are included only on request.
        includeAvailability = sendAvailabilityDetails.value
        granularSteps = []
        for obj in metricObjects.values():
            step = {"timeTaken":str(round(obj.metricTimeTaken,6)),"actionPerfomed":obj.actionPerformed,"uiLocatorString":obj.uiLocatorString,"stepName":obj.stepName}
            if includeAvailability:
                step["actionAvailabilityStatus"] = obj.actionAvailabilityStatus
                step["actionThresholdBreached"] = obj.thresholdBreached
            granularSteps.append(step)
        filePointer.write(",\n\t\"granularSteps\" : "+json.dumps(granularSteps,indent=4))
        filePointer.write("\n}")
def takeScreenshotonError(driver):
    """Save an error screenshot named after uuidString when the option is on.

    The image goes to /opt/opsramp/webprobe/data/<uuidString>.png. When
    screenshotOnErrorFlag is not "True" only a log entry is written.
    """
    timestamp = str(datetime.datetime.now())
    if screenshotOnErrorFlag != "True":
        logger.info(timestamp+''' Screenshot on Error option not opted''')
        return
    logger.info(timestamp+''' Screenshot on Error option opted''')
    driver.save_screenshot("/opt/opsramp/webprobe/data/"+uuidString+".png")
def takeScreenshotofEvent(driver,stepName):
    """Save a screenshot for ``stepName`` when takeSSofEventFlag is "True".

    The image is written to
    /opt/opsramp/webprobe/data/<index>/<index>_<stepName>.png, where <index>
    is prevSuccessScreenshotIndex. Nothing happens when the flag is off.
    """
    if takeSSofEventFlag != "True":
        return
    logger.info(str(datetime.datetime.now())+''' Screenshot on Event option opted''')
    # Build the target path once and use it for both the log entry and the save.
    screenshotPath = '/opt/opsramp/webprobe/data/'+prevSuccessScreenshotIndex+'/'+prevSuccessScreenshotIndex+'_'+stepName+'.png'
    logger.info(str(datetime.datetime.now())+''' Screenshot saved '''+screenshotPath)
    driver.save_screenshot(screenshotPath)
def has_connection_error(driver):
    """Return the text of the page's "error-code" element, or False.

    False is returned when the element is missing, its text is empty, or
    the lookup fails for any other reason.
    """
    try:
        error_text = driver.find_element(By.CLASS_NAME,"error-code").text
    except Exception:
        # Catch Exception rather than everything, so KeyboardInterrupt and
        # SystemExit are no longer swallowed.
        return False
    return error_text if error_text else False
def has_page_return_empty(driver):
    """Report whether the page body stays without text.

    The <body> element is polled once per second, up to maxTimeoutInSeconds
    times. Returns False as soon as the body has text and True when it never
    does.
    """
    for _ in range(maxTimeoutInSeconds):
        try:
            pageHasText = bool(driver.find_element(By.XPATH,"//body").text)
        except Exception:
            # A failed lookup is treated like an empty page. Catching
            # Exception instead of everything lets KeyboardInterrupt and
            # SystemExit through.
            pageHasText = False
        if pageHasText:
            return False
        logger.info(str(datetime.datetime.now())+" - page empty check waiting for 1 second")
        time.sleep(1)
    return True
def scriptCode():
    """Run the recorded browser transaction and record a metric per step.

    Steps, in order: start the browser selected by ``browserName``, clear
    cookies, open the target URL, verify that the browser shows no error
    code and that the page body is not empty, resize the window, perform
    the add/remove/add selection actions on the ``multiselect1`` element,
    then close the window, record the overall response time and quit the
    driver.

    Raises:
        Exception: if the URL cannot be opened, if the browser displays an
            error code, or if the page body stays empty.
    """
    # NOTE: the global name is spelled "gloabalDriver" in this file; it is
    # kept as-is because code outside this function may reference it.
    global gloabalDriver
    targetUrl = "https://www.mysubdomain.com/"

    logger.info(str(datetime.datetime.now())+" - before webdriver invocation")
    if browserName=="google-chrome":
        driver=webdriver.Chrome(desired_capabilities=setChromeOptionsAndCaps(proxyEnabled))
    elif browserName=="firefox":
        driver=webdriver.Firefox(desired_capabilities=setFirefoxOptionsAndCaps(proxyEnabled),log_path=os.devnull)
    else:
        # Imported lazily so the Edge tooling is only needed when Edge is
        # the selected browser.
        from msedge.selenium_tools import Edge
        driver = Edge(options=setEdgeOptionsAndCaps(proxyEnabled), executable_path="/usr/bin/msedgedriver")
    gloabalDriver = driver
    logger.info(str(datetime.datetime.now())+" - webdriver invoked")

    driver.delete_all_cookies()
    logger.info(str(datetime.datetime.now())+" - webdriver cookies deleted")

    wait=WebDriverWait(driver,maxTimeoutInSeconds)
    windowStartTime=time.time()
    sendAvailabilityDetails.value = False

    # --- Step: open the target URL -------------------------------------
    stepTimeStart = time.time()
    try:
        logger.info(str(datetime.datetime.now())+" - webdriver hitting url : "+targetUrl)
        driver.get(targetUrl)
    except Exception as exp:
        logger.info(str(datetime.datetime.now())+" - webdriver url hit : "+targetUrl)
        computeRCA("error in hitting "+targetUrl)
        takeScreenshotonError(driver)
        # Chain the original failure so the root cause stays in the traceback.
        raise Exception("Error in hitting "+targetUrl) from exp
    stepTime = round((time.time()-stepTimeStart)*1000,6)
    computeMetric(driver,targetUrl,'openURL',time.time(),'',stepTime,windowStartTime,exceptionTime)

    # --- Sanity checks on the loaded page ------------------------------
    con_error_flag= has_connection_error(driver)
    if con_error_flag:
        takeScreenshotonError(driver)
        computeRCA("Browser thrown error while hitting "+targetUrl+" - "+con_error_flag)
        raise Exception('''Browser thrown error - '''+con_error_flag)
    if has_page_return_empty(driver):
        takeScreenshotonError(driver)
        computeRCA("Page empty - Nothing rendered while hitting "+targetUrl)
        raise Exception('''Page empty - Nothing rendered ''')

    logger.info(str(datetime.datetime.now())+''' - webdriver setting window size as 1280x672 dimensions''')
    driver.set_window_size(1280,672)

    # --- Steps: selection actions on the multi-select element ----------
    # Alternative locators for the same element, passed to actionLoop in
    # this order. A fresh list is handed over on every call so each step
    # receives its own copy.
    multiselectLocators = (
        (By.ID,"multiselect1"),
        (By.CSS_SELECTOR,"#multiselect1"),
        (By.XPATH,"//select[@id='multiselect1']"),
        (By.XPATH,"//div[@id='HTML14']/div/select"),
        (By.XPATH,"//select"),
    )
    for actionName in ("addSelection","removeSelection","addSelection"):
        stepTime = actionLoop(driver,wait,list(multiselectLocators),actionName,'','','')
        computeMetric(driver,'''id=multiselect1''',actionName,time.time(),'',stepTime,windowStartTime,exceptionTime)

    # --- Wrap up --------------------------------------------------------
    logger.info(str(datetime.datetime.now())+" - webdriver window/tab close")
    driver.close()
    computeMetric(driver,"responseTime","",time.time(),"",0,windowStartTime,exceptionTime,True)
    driver.quit()
### main starts from here
# File logging is only configured when this flag is switched on.
logEnablingFlag = False

# Manager-backed containers shared between this process and the child
# process that runs scriptCode().
processManager = Manager()
sendAvailabilityDetails = processManager.Value("sendAvailabilityMetrics", False)
responseTimeMetric = processManager.dict()
warnings = processManager.dict()
errors = processManager.dict()
metricObjects = processManager.dict()
actionAvailabilityDict = processManager.dict()
thresholdsDict = processManager.dict()

# Script file name without directory and ".py"; used as the per-event
# screenshot directory and file prefix.
prevSuccessScreenshotIndex = sys.argv[0].rsplit("/", 1)[-1].replace(".py", "")

# Command line layout: argv[1] is the overall timeout (divided by 1000
# before being used as the join timeout, so presumably milliseconds);
# the optional switches start at argv[2].
parser = argparse.ArgumentParser()
parser.add_argument("-pe", "--postErrScreenshot", help="Take & Post Screenshot in case of Error", required=False)
parser.add_argument("-pp", "--postPrevSuccessScreenshot", help="Post Prev Screenshot in case of Error", required=False)
parser.add_argument("-ts", "--takeScreenshotOnEvent", help="Take Screenshot of Event", required=False)
parser.add_argument("-wb", "--webBrowser", help="choose a webBrowser", required=False)
parser.add_argument("-pd", "--postDeltaData", help="Post Delta Data", required=False)
parser.add_argument("-proxy", "--proxyEnabled", help="Proxy Enabling Flag", required=False)
argument = parser.parse_args(sys.argv[2:])

# The string flags stay "True"/"False" strings because the rest of the
# script compares them against "True"; only proxyEnabled is a real bool.
screenshotOnErrorFlag = "True" if argument.postErrScreenshot == 'True' else "False"
postDeltaData = "True" if argument.postDeltaData == 'True' else "False"
postPrevSuccessSSOnEvent = "True" if argument.postPrevSuccessScreenshot == 'True' else "False"
takeSSofEventFlag = "True" if argument.takeScreenshotOnEvent == 'True' else "False"
proxyEnabled = argument.proxyEnabled == "True"

# Any value other than the two listed here selects Chrome.
browserName = {"microsoft-edge": "edge", "firefox": "firefox"}.get(argument.webBrowser, "google-chrome")

# A screenshot identifier is only generated when error screenshots are on.
uuidString = str(uuid.uuid4()) if screenshotOnErrorFlag == "True" else ""

if logEnablingFlag:
    logger.basicConfig(filename=sys.argv[0] + ".log", filemode='w', level=logger.INFO)
logger.info(str(datetime.datetime.now()) + " - start")

try:
    try:
        # Run the transaction in a child process so it can be abandoned
        # once the timeout has elapsed.
        scriptProcess = multiprocessing.Process(target=scriptCode)
        scriptProcess.start()
        scriptProcess.join(float(sys.argv[1]) / 1000)
        if scriptProcess.is_alive():
            # Still running after the timeout: record it as an error.
            errors.update({'''Exception''': '''connection timeout'''})
            raise Exception("Exception: connection timeout")
    except Exception as ex:
        pass
except Exception as ex:
    # NOTE(review): the inner handler above swallows every Exception, so
    # this branch looks unreachable - confirm whether the process-group
    # kill was meant to run on timeout.
    os.killpg(os.getpgid(scriptProcess.pid), signal.SIGTERM)
finally:
    # Always write the metrics output, then stop the child process.
    formMetricJson()
    scriptProcess.terminate()