#!/usr/bin/env python

# The prism-auto script automatically executes PRISM on one or more
# models/properties, for the purposes of benchmarking or testing.

# The simplest usage is just "prism-auto TARGET" where TARGET
# is a directory, model file or properties file. For a directory,
# prism-auto finds all models and all properties files in the
# directory and then executes PRISM on each combination of them.
# For a model file, it runs against all properties, and vice versa.

# Run "prism-auto -h" for details of further options.

import os,sys,re,subprocess,signal,tempfile,functools,logging,time,platform,csv
# The "pipes" module was removed in Python 3.13; shlex.quote is the same
# function. Fall back to pipes only on interpreters that lack shlex.quote.
try:
    from shlex import quote
except ImportError:
    from pipes import quote
from optparse import OptionParser
from threading import Timer

#==================================================================================================
# Global variables
#==================================================================================================

# statistics about test results
testStats = dict(SUCCESS = 0, FAILURE = 0, FAIL_NONCONVERGE = 0, SKIPPED = 0, SKIPPED_EXPORT = 0, SKIPPED_DUPLICATE = 0, SKIPPED_NOT_TESTED = 0, UNSUPPORTED = 0, WARNING = 0, DDWARNING = 0, TIMEOUT = 0)

# colour coding for test results
# for colour values, see https://en.wikipedia.org/wiki/ANSI_escape_code#Colors
testColours = dict(SUCCESS = 32, FAILURE = 31, SKIPPED = 90, UNSUPPORTED = 34, WARNING = 33)

# mapping from PRISM switch shortcuts to the full version (for canonicalisation of arguments)
# to get a list of abbreviated settings in prism, run prism --help | grep '(or '
switchShortcuts = {
    # engines:
    '-ex': '-explicit',
    '-m': '-mtbdd',
    '-s': '-sparse',
    '-h': '-hybrid',

    # iteration/solver settings:
    '-pow': '-power',
    '-pwr': '-power',
    '-jac': '-jacobi',
    '-gs' : '-gaussseidel',
    '-bgs': '-bgaussseidel',
    '-pgs': '-pgaussseidel',
    '-bpgs': '-bpgaussseidel',
    '-ii': '-intervaliter',
    '-lp': '-linprog',

    # epsilon related settings:
    '-rel': '-relative',
    '-abs': '-absolute',
    '-e': '-epsilon',

    # other:
    '-ss': '-steadystate',
    '-tr': '-transient',
    '-pctl': '-pf',
    '-csl': '-pf',
    '-prop': '-property',
}

# set of PRISM argument tuples that already ran
alreadyRan = set()

#==================================================================================================
# Utility functions
#==================================================================================================

# returns true if we are running on Windows (native or Cygwin)
def isWindows():
    s = platform.system()
    return s == 'Windows' or re.match('CYGWIN', s) is not None

# compare two files (with filenames f1,f2) for equality, line by line (text mode)
def compareFiles(f1,f2):
    with open(f1, 'r') as fp1, open(f2, 'r') as fp2:
        while True:
            s1 = fp1.readline()
            s2 = fp2.readline()
            if s1 != s2: # mismatch
                return False
            if s1 == '': # EOF (in both files)
                return True

# returns a sorted list of files / directories in dir
def sortedListDir(dir):
    return sorted(os.listdir(dir))

# The predicates below return a regex match object (truthy) or None (falsy).
# All patterns are anchored at the start of the name by re.match().

def isPrismModelFile(file):
    return re.match(r'.+(\.prism|\.nm|\.pm|\.sm)$', file)

def isPrismPropertiesFile(file):
    return re.match(r'.+(\.props|\.pctl|\.csl)$', file)

def isPrismModelListFile(file):
    return re.match(r'models$', os.path.basename(file))

def isPrismPropListFile(file):
    return re.match(r'.+(\.txt)$', file)

def isOutFile(file):
    # Note that this matches any name that contains ".out." (with the trailing dot)
    # anywhere after its first character
    return re.match(r'.+(\.out\.)', file)

def isAutoFile(file):
    return re.match(r'.+(\.auto)$', file)

# Check whether the given (args|auto) file doesn't have an associated model
def isOrphan(dir, file):
    return not getMatchingModelsInDir(dir, file)

# A line is commented out if '#' is its very first character
def lineIsCommentedOut(line):
    return line.startswith('#')

# Get a list of models in a directory, either from a "models" file if present,
# or by searching for all files in the directory with an appropriate extension.
# A "models" file is a list of (relative) path names, in which lines starting with # are ignored.
# Each item of the returned list is itself a tuple consisting of the name of the model file and
# a possibly empty argument list, e.g.
# ("model.pm", ["-const", "N=2"])
#
# The name of the "models" file is configurable, defaults to "models" and is
# stored in options.modelsFilename
#
# Returns a list (not a lazy iterator), so callers can test it for emptiness
# and iterate over it more than once.
def getModelsInDir(dir):
    modelFiles = []
    modelsListFile = os.path.join(dir, options.modelsFilename)
    # Process "models" file, if present
    if os.path.isfile(modelsListFile):
        with open(modelsListFile, 'r') as listFile:
            for line in listFile:
                line = line.strip()
                if len(line) == 0 or lineIsCommentedOut(line):
                    continue
                # First item is the model file, the rest are its arguments;
                # empty items (from repeated spaces) are dropped
                items = [item for item in line.split(' ') if len(item) > 0]
                modelFiles.append((items[0], items[1:]))
    # Otherwise look for all model files
    else:
        for file in sortedListDir(dir):
            if os.path.isfile(os.path.join(dir, file)) and isPrismModelFile(file):
                modelFiles.append((file, []))
    # If requested, filter the list of models
    if options.filterModels:
        modelFiles = filterModels(modelFiles, options.filterModels, dir)
    # Finally, append the dir name to all the model files
    return [(os.path.join(dir, pair[0]), pair[1]) for pair in modelFiles]

# Restrict a list of models to those satisfying a filter expressed as a string
# Examples: "states>100 and states<10000" or "model_type=='DTMC'"
# Model list is given as pairs of files and argument lists (see getModelsInDir() above)
# Model data is read in from a file models.csv (or something else if specified
# using options.modelsInfoFilename)
# Returns the sub-list of modelFiles that match; empty if the metadata file is missing.
def filterModels(modelFiles, filterString, dir):
    modelFilesNew = []
    infoFile = os.path.join(dir, options.modelsInfoFilename)
    # Filter is false if the model metadata cannot be found
    if not os.path.isfile(infoFile):
        return modelFilesNew
    # Build a list of models matching the filter
    modelFilesFiltered = []
    with open(infoFile, newline='') as csvfile:
        for row in csv.DictReader(csvfile):
            # Evaluate the filter using eval()
            # Only provide access to the model type and number of states for now
            # NOTE(review): eval() runs the filter string as Python code; builtins are
            # stripped, but the string should still only come from a trusted source.
            try:
                eval_model_type = row['model_type'] if row['model_type'] else None
                eval_states = int(row['states']) if row['states'] else None
                if eval(filterString, {"__builtins__": {}}, {'model_type': eval_model_type, 'states': eval_states}):
                    model = row['model_file']
                    if 'model_consts' in row and row['model_consts']:
                        model += ' -const ' + row['model_consts']
                    modelFilesFiltered.append(model)
            # If there are any errors reading the row or evaluating the expression,
            # the filter is false for this row (deliberately best-effort)
            except Exception as e:
                logging.debug("Filter not evaluated for row " + str(row) + ": " + str(e))
    # Restrict models list to ones in the list of filter matches
    for modelFile in modelFiles:
        if modelMatches(modelFile, modelFilesFiltered):
            modelFilesNew.append(modelFile)
    return modelFilesNew

# Check if a model is contained in a list of possible matches
# The model is given as a pair of file and argument list
# (e.g. ("model.pm", ["-const", "N=2", "-const", "K=3"]))
# The possible matches are a list of single strings
# (e.g.
# "model.pm -const K=3,N=2")
# Constant values are extracted and sorted for a more intelligent match
def modelMatches(modelFile, possibleMatches):
    # Extract constant defns from modelFile and sort them
    consts = getSortedConsts(modelFile[1])
    # If there are no constants, just check for an exact match in possibleMatches
    if len(consts) == 0:
        return modelFile[0] in possibleMatches
    # Otherwise compare to each model in possibleMatches after first sorting its constant defns
    for possibleMatch in possibleMatches:
        # Split into name and arguments; a candidate without any arguments has no
        # space, in which case the argument part is empty and it cannot match
        possibleMatchModel, _, possibleMatchArgs = possibleMatch.partition(" ")
        # Extract/sort constants from argument list (one "-const X" pair per match,
        # re-joined with spaces so that several -const switches stay separate)
        constSwitches = re.findall("-const [^ ]+", possibleMatchArgs)
        possibleMatchConsts = getSortedConsts(" ".join(constSwitches).split(" "))
        # Compare
        if modelFile[0] == possibleMatchModel and consts == possibleMatchConsts:
            return True
    return False

# Extract and sort constant values from an argument list specified as a string list,
# e.g. ["-const", "N=2", "-const", "K=3"] would return ["K=3", "N=2"]
def getSortedConsts(args):
    consts = []
    # Look at each argument together with its successor
    for switch, value in zip(args, args[1:]):
        if switch == "-const":
            consts.extend(value.split(','))
    consts.sort()
    return consts

# Get a list of all files in the directory that satisfy the given predicate
# (the predicate receives the bare file name; the result holds dir-joined paths)
def getFilesInDir(dir, pred):
    return [os.path.join(dir, file) for file in sortedListDir(dir)
            if os.path.isfile(os.path.join(dir, file)) and pred(file)]

# Return true iff the basename of file in the directory dir starts with
# the basename of prefix
def startsWith(prefix, dir, file):
    return os.path.basename(os.path.join(dir, file)).startswith(os.path.basename(prefix))

# Get a list of models in a directory matching a (property|args|auto) file name.
def getMatchingModelsInDir(dir, fileToMatch):
    return getFilesInDir(dir, lambda file: isPrismModelFile(file) and startsWith(file, dir, fileToMatch))

# Get a list of properties in a directory, by searching for all files with an appropriate extension.
def getPropertiesInDir(dir):
    return getFilesInDir(dir, isPrismPropertiesFile)

# Get a list of properties in a directory with prefix matching a model file name.
def getMatchingPropertiesInDir(dir, modelFile):
    return getFilesInDir(dir, lambda file: isPrismPropertiesFile(file) and startsWith(modelFile, dir, file))

# Get a list of auto files in a directory
def getAutoFilesInDir(dir):
    return getFilesInDir(dir, isAutoFile)

# Get a list of auto files in a directory with prefix matching a model file name.
def getMatchingAutoFilesInDir(dir, modelFile):
    return getFilesInDir(dir, lambda file: isAutoFile(file) and startsWith(modelFile, dir, file))

# Get a list of all out files in a directory
def getOutFilesInDir(dir):
    return getFilesInDir(dir, isOutFile)

# Get a list of all out files in a directory whose prefix matches a model file name
def getMatchingOutFilesInDir(dir, modelFile):
    return getFilesInDir(dir, lambda file: isOutFile(file) and startsWith(modelFile, dir, file))

# Extract all command-line switches from an "args" file into a list
# Just combine switches on all (non-commented) lines together, delimited by spaces
# Returns an empty list if the file does not exist
def getAllArgsFromFile(file):
    args = []
    # Same parsing as getArgsListsFromFile, with the per-line lists flattened
    for lineArgs in getArgsListsFromFile(file):
        args.extend(lineArgs)
    return args

# Extract command-line switches from an "args" file into a list of lists
# Switches from each (non-commented) line, delimited by spaces, are in a separate list
# Returns an empty list if the file does not exist
def getArgsListsFromFile(file):
    argsSet = []
    if not os.path.isfile(file):
        return argsSet
    with open(file, 'r') as argsFile:
        for line in argsFile:
            line = line.strip()
            if len(line) == 0 or lineIsCommentedOut(line):
                continue
            # Repeated spaces produce empty items, which are dropped
            args = [item for item in line.split(' ') if len(item) > 0]
            if len(args) > 0:
                argsSet.append(args)
    return argsSet

# Read the matching .args file for the given model/properties/auto file and return a list of lists,
# each list corresponding to one line in the .args file, one argument per list item
# If there is no .args file, the result is a list holding one empty argument list,
# so that callers looping over it still run once.
#
# * file: name of the model/properties file (as a string)
def getMatchingArgListsForFile(file):
    if os.path.isfile(file + ".args"):
        return getArgsListsFromFile(file + ".args")
    return [[]]

# Add any extra args provided to this script (options.extraArgs) to the given
# argument list. Each extra-args string is split on spaces and appended.
# The input list is never modified; it is returned unchanged if there are no extra args.
def addExtraArgs(argLists):
    result = argLists
    for extra in (options.extraArgs or []):
        result = result + extra.split(' ')
    return result

# Returns true iff there is a possible name clash for the given filename,
# i.e. the file itself exists, or a file with the same base name (optionally
# followed by "1") and one of the model export extensions exists
def possibleNameClash(fullName):
    withoutExt = fullName.rsplit('.', 1)[0]
    # Same extension list as used for ".all" exports in getExpectedOutFilesFromArgs
    exts = ['lab','tra','sta','srew','trew']
    candidates = [fullName]
    candidates += [withoutExt + '.' + ext for ext in exts]
    candidates += [withoutExt + '1.' + ext for ext in exts]
    return any(os.path.exists(candidate) for candidate in candidates)

# Join directory and filename to obtain a full path
# The argument may be "filename" or "filename:options"; only the filename
# part is joined with dir, the options part is kept as it is
def expandName(dir, option):
    fileName, colon, fileOptions = option.partition(':')
    # if the filename is 'stdout' (recognized as special by PRISM),
    # we don't expand and simply return the input
    if fileName == "stdout":
        return option
    return os.path.join(dir, fileName) + colon + fileOptions

# Prepend the given prefix to the given filename or filename:option string
# e.g.
# prependToFile('hello.', '/some/world:here') == '/some/hello.world:here'
def prependToFile(prefix, option):
    fileName, colon, fileOptions = option.partition(':')
    # if the filename is 'stdout' (recognized as special by PRISM),
    # we don't expand and simply return the input
    if fileName == "stdout":
        return option
    # The prefix goes in front of the base name, not in front of the directory
    fullName = os.path.join(os.path.dirname(fileName), prefix + os.path.basename(fileName))
    return fullName + colon + fileOptions

# Traverses an argument list, expanding all filenames that follow an import or
# export switch to full paths inside dir (see expandName). The first argument
# is always kept as it is, since it cannot be preceded by a switch.
def expandFilenames(args, dir=""):
    def isImportExportArg(arg):
        # -exportmodelprecision takes a number, not a filename
        return arg.startswith(("-export", "-import")) and arg != "-exportmodelprecision"
    if not args:
        return []
    expanded = [args[0]]
    for switch, value in zip(args, args[1:]):
        expanded.append(expandName(dir, value) if isImportExportArg(switch) else value)
    return expanded

# Rename all export files in the arguments by prepending prefix
# (used to prevent PRISM from overwriting the reference out files)
def renameExports(prefix, args):
    def isExportArg(arg):
        # -exportmodelprecision takes a number, not a filename
        return arg.startswith("-export") and arg != "-exportmodelprecision"
    if not args:
        return args
    renamed = [args[0]]
    for switch, value in zip(args, args[1:]):
        renamed.append(prependToFile(prefix, value) if isExportArg(switch) else value)
    return renamed

# Return True if there are any -export...
# switches
# (only switches that are followed by another argument are considered)
# NOTE(review): unlike the other export helpers, this does not exclude
# -exportmodelprecision -- confirm whether that is intended.
def hasExportSwitches(args):
    return any(arg.startswith("-export") for arg in args[:-1])

# Find all files that match any -export switch file argument
# This takes into account that a .all extension corresponds to five different files
# and that multiple reward structures will result in filenames extended with a number
# For every expected file that cannot be found, a warning is printed and counted.
def getExpectedOutFilesFromArgs(args):
    # Arguments following an -export switch (-exportmodelprecision takes a number, not a file)
    exportTargets = [value for switch, value in zip(args, args[1:])
                     if switch.startswith("-export") and switch != "-exportmodelprecision"]
    resultFiles = []
    for target in exportTargets:
        # Sometimes there are options appended, after a ":" - remove these
        file = target.split(':')[0]
        # Sometimes we have extra extensions appended, e.g. -exportmodel out.sta,tra
        parts = file.split(',')
        moreExts = parts[1:]
        # Get extension
        nameAndExt = parts[0].rsplit('.', 1)
        base = nameAndExt[0]
        firstExt = nameAndExt[1] if len(nameAndExt) > 1 else ""
        # Determine relevant extensions
        if firstExt == 'all':
            exts = ['lab','tra','sta','srew','trew']
        else:
            exts = [firstExt]
        exts = exts + moreExts
        # Find all files of the form base.ext or base[number].ext
        for ext in exts:
            fullName = base + '.' + ext
            foundFile = False
            if os.path.exists(fullName):
                resultFiles.append(fullName)
                foundFile = True
            else:
                # Numbered variants: base1.ext, base2.ext, ... until the first gap
                i = 1
                fullName = base + str(i) + '.' + ext
                while os.path.exists(fullName):
                    resultFiles.append(fullName)
                    i += 1
                    fullName = base + str(i) + '.' + ext
                    foundFile = True
            if not foundFile:
                printColoured('WARNING', "Warning: There is no file of the form " + base + "[number]." + ext + " to compare against -- will skip")
                incrementTestStat('WARNING')
    return resultFiles

# Create a valid name for a log file based on a list of benchmark arguments
# If dir is given, occurrences of "dir/" are removed from the name first.
def createLogFileName(args, dir=""):
    logFile = '.'.join(args)
    if len(dir) > 0:
        # Literal removal: dir must not be interpreted as a regular expression
        # (e.g. "." would otherwise match any character)
        logFile = logFile.replace(dir + '/', '')
    logFile = re.sub(r'/', '_', logFile)
    logFile = re.sub(r'[^a-zA-Z0-9=_, \.]', '', logFile)
    logFile = re.sub(r'[ ]+', '.', logFile)
    logFile = re.sub(r'[\.]+', '.', logFile)
    logFile = re.sub(r'^[\._]+', '', logFile)
    return logFile + ".log"

# Create a valid name for a log (sub) directory based on a list of benchmark arguments
def createLogDirName(args):
    logDir = '.'.join(args)
    logDir = re.sub(r'/', '_', logDir)
    logDir = re.sub(r'[^a-zA-Z0-9=_, \.]', '', logDir)
    logDir = re.sub(r'[ ]+', '.', logDir)
    logDir = re.sub(r'[\.]+', '.', logDir)
    logDir = re.sub(r'^[\._]+', '', logDir)
    return logDir

# Walk a directory and execute a callback on each file
# (the callback is given the absolute path of each entry, directories included,
# and directories are visited recursively after the callback was called on them)
def walk(dir, meth):
    dir = os.path.abspath(dir)
    for file in sortedListDir(dir):
        if file in (".", "..", ".svn"):
            continue
        nfile = os.path.join(dir, file)
        meth(nfile)
        if os.path.isdir(nfile):
            walk(nfile, meth)

#
# Print a message in colour, but only if isColourEnabled() is true.
# If isColourEnabled is false, print message without colours.
# The colour parameter is a key for the global testColours dictionary.
#
def printColoured(colour, msg):
    if isColourEnabled():
        print('\033[' + str(testColours[colour]) + 'm' + msg + '\033[0m')
    else:
        print(msg)
    sys.stdout.flush()

#
# Given a switch (-xyz or --xyz), expand to full-length form
# using the switchShortcuts mapping.
# Switches can also have the form -xyz:foo=..., i.e., carrying
# additional arguments; this is handled as well.
# Return expanded switch or the original parameter, if not expanded.
#
def expandShortcutSwitch(s):
    # Remember a leading double dash, so that lookup uses the single-dash form
    xtradash = ''
    if (s.startswith('--')):
        xtradash = '-'
        s = s[1:]
    # split off extra arguments after a :
    l = s.split(':',1)
    # continue with the actual switch name
    s = l[0]
    # get entry for s; otherwise, default value is s itself
    s = switchShortcuts.get(s,s)
    # add back extra dash, if necessary
    s = xtradash + s
    # add back :arg part, if necessary
    if len(l) == 2:
        s += ':' + l[1]
    return s

#
# Given an argument list, expand short-form switches (-xyz or --xyz)
# to their full-length form using the switchShortcuts mapping.
# Returns processed argument list.
#
def expandShortcutsInArgs(args):
    return [expandShortcutSwitch(x) for x in args]

#
# Given an argument list, remove conflicting settings,
# e.g., if there are multiple switches for selecting
# the PRISM engine, only keep the last one, i.e.,
# the switch that PRISM will actually use.
# Returns processed argument list.
#
# * args: the argument list (not modified)
# * conflicting: collection of mutually exclusive switches
#
def removeConflictingArgs(args, conflicting):
    seen = False
    result = []
    # Scan from the back, so the first conflicting switch encountered is the last one given
    for x in reversed(args):
        if x in conflicting:
            if not seen:
                result.append(x)
                seen = True
        else:
            result.append(x)
    # Restore the original order
    result.reverse()
    return list(result)

#
# Given an argument list, returns a (more or less)
# canonical version:
#  * Performs regularisation of --switch to -switch
#  * Expands short-form switches to long-form version
#    (if we know about them, see switchShortcuts mapping)
#  * Removes conflicting switches for
#    - PRISM engine, as well as -exact and -param setting
#    - model type override
#
def canonicaliseArgs(args):
    # canonicalise --switch to -switch
    args = [ x[1:] if x.startswith('--') else x for x in args]
    # expand the short cuts via switchShortcuts mapping
    args = expandShortcutsInArgs(args)

    # remove conflicting engine setting
    engines = frozenset(['-explicit', '-hybrid', '-sparse', '-mtbdd'])
    args = removeConflictingArgs(args, engines)

    # remove conflicting model type overrides
    args = removeConflictingArgs(args, frozenset(['-mdp', '-dtmc', '-ctmc']))

    # handle exact / param switches
    if '-exact' in args or '-param' in args:
        # remove engines if we are in exact or param mode (normal engines are irrelevant)
        args = [x for x in args if x not in engines ]
        if '-param' in args:
            # remove -exact if we are in param mode (param takes precedence)
            args = [x for x in args if x != '-exact' ]

    return args

#==================================================================================================
# Benchmarking
#==================================================================================================

# Run PRISM with a given list of command-line args
#
# * args: list of command-line line arguments
#         (extended in place with bmArgs and, in test mode, -test/-testall)
# * bmArgs: further list of command-line line arguments (benchmarking ones)
# * dir: name of the directory (as a string)
#
# Depending on the options, this either just echoes the command, or runs it,
# scans the PRISM log for test results / warnings / errors and updates testStats.
# NOTE(review): the nesting of some statements in this function was reconstructed
# from flattened source; places where it matters are marked below.
def runPrism(args, bmArgs, dir=""):
    # collate all arguments
    args += bmArgs

    # check if we can skip
    if options.skipDuplicates:
        canonicalArgs = canonicaliseArgs(args)
        if (tuple(canonicalArgs) in alreadyRan):
            incrementTestStat('SKIPPED')
            incrementTestStat('SKIPPED_DUPLICATE')
            return
        alreadyRan.add(tuple(canonicalArgs))

    # keep track if we need to cleanup the log file after use
    cleanupLogFile = False

    if options.test:
        if options.testAll:
            args.append("-testall")
        else:
            args.append("-test")

    # Pick the executable: nailgun client or plain PRISM
    if options.nailgun:
        prismArgs = [options.ngprism] + args
    else:
        prismArgs = [options.prismExec] + args

    # Echo mode: print the (shell) command that would be run, then stop
    if options.echo or options.echoFull:
        if options.echoFull:
            prismArgs = ['echo', quote(' '.join(prismArgs)), ';'] + prismArgs
        # NOTE(review): assumed to apply to both echo modes, not only echoFull -- confirm
        if options.logDir:
            logDir = options.logDir
            if options.logSubdirs and bmArgs:
                logDir = os.path.join(logDir, createLogDirName(bmArgs))
            logFile = os.path.relpath(os.path.join(logDir, createLogFileName(args, dir)))
            logFile = quote(logFile)
            if options.test:
                prismArgs += ['|', 'tee', logFile]
            else:
                prismArgs += ['>', logFile]
        if options.test:
            prismArgs += ['|', 'grep "Testing result:"']
        print(' '.join(prismArgs))
        return

    print(' '.join(prismArgs))
    sys.stdout.flush()

    if options.logDir:
        # Log to a named file inside the log directory
        logDir = options.logDir
        if options.logSubdirs and bmArgs:
            logDir = os.path.join(logDir, createLogDirName(bmArgs))
        # NOTE(review): directory creation assumed to apply whether or not a
        # sub-directory is used -- confirm against the original layout
        if not os.path.exists(logDir):
            os.makedirs(logDir)
        logFile = os.path.join(logDir, createLogFileName(args, dir))
        #f = open(logFile, 'w')
        prismArgsExec = prismArgs + ['-mainlog', logFile]
        exitCode = execute(prismArgsExec)
        #exitCode = subprocess.Popen(prismArgsExec, cwd=dir, stdout=f).wait()
    elif options.test or options.ddWarnings:
        # create logfile
        if isWindows():
            # On Windows, PRISM can not open a temporary file created with NamedTemporaryFile.
            # So we use the not-as-secure mktemp...
            # We use . as directory because PRISM / Java can not handle
            # cygwin style paths (/tmp/...)
            logFile = tempfile.mktemp(dir='.')
        else:
            # Not on Windows, use NamedTemporaryFile
            f = tempfile.NamedTemporaryFile(delete=False)
            logFile = f.name
            f.close()
        # we want cleanup when we are done, as this is a real temporary file
        cleanupLogFile = True
        prismArgsExec = prismArgs + ['-mainlog', logFile]
        exitCode = execute(prismArgsExec)
    else:
        # No log file needed; PRISM output is not captured
        exitCode = execute(prismArgs)

    # Extract DD reference count warnings
    # NOTE(review): when options.test is also set, these lines are matched again
    # by the 'Warning:' branch below, so they are counted twice
    if options.ddWarnings:
        for line in open(logFile, 'r').readlines():
            if re.match('Warning: CUDD reports .* non-zero references', line):
                printTestResult(line)

    # Extract test results if needed
    if options.test:
        for line in open(logFile, 'r').readlines():
            if re.match('Testing result:', line):
                printTestResult(line)
            elif re.match('Warning:', line):
                if options.showWarnings or options.verboseTest:
                    printTestResult(line)
                else:
                    # We don't print it, but we count it
                    countTestResult(line)
            elif options.verboseTest:
                # in verbose mode, also print the non-matching lines
                # rstrip to remove newline before printing
                if re.match('Error:', line):
                    # highlight lines with error messages
                    printColoured('FAILURE', line.rstrip())
                else:
                    print(line.rstrip())
                    sys.stdout.flush()

    if options.test and exitCode != 0:
        # failure
        if not options.verboseTest:
            # we don't want to cleanup the log (if it was a temporary file)
            # so that the user can inspect it
            cleanupLogFile = False
            # printTestResult also counts each error line as a failure
            for line in open(logFile, 'r').readlines():
                if re.match('Error:', line):
                    printTestResult(line)
            if (options.printFailures):
                print("Failing test log file:")
                for line in open(logFile, 'r').readlines():
                    print(line, end="")
            else:
                print("To see log file, run:")
                print("edit " + logFile)
            # Show the plain PRISM executable in the re-run hint, even in nailgun mode
            prismArgsPrint = list(prismArgs)
            prismArgsPrint[0] = options.prismExec
            print("To re-run failing test:")
            print(' '.join(prismArgsPrint))
        else:
            # in verbose test mode, print extra info and increment failure counter
            printColoured('FAILURE', "\n[Exit code: " + str(exitCode) + "]")
            incrementTestStat('FAILURE')
        # Stop at the first failure unless all tests were requested
        if not options.testAll:
            closeDown(1)

    if cleanupLogFile:
        # logFile was a temporary file and we'd like to clean it up
        os.remove(logFile)

# call-back function for expiration of the execute timer thread
#
# * proc: the running process, which is killed
# * flag: list shared with execute(); a True element is appended to signal the timeout
def timeout(proc, flag):
    printColoured('FAILURE', '\nTimeout (' + str(options.timeout) + 's)')
    flag.append(True)
    proc.kill()

# execute the given process, optionally setting a timeout
# Returns the exit code of the process, or 0 if it was killed due to a timeout
# (timeouts are counted separately in testStats['TIMEOUT'])
def execute(args):
    if options.timeout is not None:
        # run program
        proc = subprocess.Popen(args)

        # list to serve as indicator that a timeout has occurred
        flag = []
        # setup timer to call timeout function with proc and flag as arguments
        timer = Timer(float(options.timeout), timeout, [proc, flag])
        try:
            # start time and wait for process to finish
            timer.start()
            exitCode = proc.wait()
            # here, either the process quit by itself (flag is empty)
            # or was killed in the timeout function (flag contains one True element)
            hadTimeout = bool(flag) # timeout = flag is not empty -> True
            if hadTimeout:
                incrementTestStat('TIMEOUT')
                # in nailgun mode, we have to restart the nailgun server,
                # as the VM can be in an inconsistent state
                if options.nailgun:
                    restartNailGunServer()
                # if we had a timeout, fake exitCode of 0
                return 0
            return exitCode
        finally:
            # make sure the timer does not fire after the process has finished
            timer.cancel()
    else:
        proc = subprocess.Popen(args)
        exitCode = proc.wait()
        return exitCode

# Print a testing-related message, colour coding if needed
# The message is also counted in the test statistics (see countTestResult)
def printTestResult(msg):
    global testColours
    msg = str.rstrip(msg)
    countTestResult(msg)
    if not isColourEnabled():
        print(msg)
        sys.stdout.flush()
        return
    # Coloured-coded...
    if 'Error:' in msg or 'FAIL' in msg:
        printColoured('FAILURE', msg)
    elif 'Warning:' in msg:
        printColoured('WARNING', msg)
    elif 'PASS' in msg:
        printColoured('SUCCESS', msg)
    elif 'SKIPPED' in msg or 'NOT TESTED' in msg:
        printColoured('SKIPPED', msg)
    # NOTE(review): the 'Warning:' test here can never be true, since such
    # messages are already handled by the second branch above
    elif 'UNSUPPORTED' in msg or 'Warning:' in msg:
        printColoured('UNSUPPORTED', msg)
    else:
        print(msg)
        sys.stdout.flush()

# Increase the counter for the given key of the global testStats dictionary by one
def incrementTestStat(stat):
    global testStats
    testStats[stat]+=1

# Print a summary of the counters in testStats (only in test mode, and not in echo mode)
def printTestStatistics():
    if options.test and not options.echo:
        print('\nTest results:')
        printColoured('SUCCESS', ' Success: ' + str(testStats['SUCCESS']))
        printColoured('WARNING', ' Warnings: ' + str(testStats['WARNING'])
                      + (' (use -w to show)' if (testStats['WARNING']>0 and not options.showWarnings and not options.verboseTest) else ''))
        if (options.ddWarnings):
            printColoured('WARNING', ' DD-Warnings: ' + str(testStats['DDWARNING']))
        printColoured('FAILURE', ' Failure: ' + str(testStats['FAILURE']))
        if testStats['FAIL_NONCONVERGE']:
            printColoured('FAILURE', ' - Non-convergence: ' + str(testStats['FAIL_NONCONVERGE']))
        printColoured('UNSUPPORTED', ' Unsupported: ' + str(testStats['UNSUPPORTED']))
        printColoured('SKIPPED', ' Skipped: ' + str(testStats['SKIPPED']))
        if testStats['SKIPPED_NOT_TESTED']:
            printColoured('SKIPPED', ' - Not tested: ' + str(testStats['SKIPPED_NOT_TESTED']))
        if options.skipDuplicates:
            printColoured('SKIPPED', ' - Duplicates: ' + str(testStats['SKIPPED_DUPLICATE']) + ' (due to --skip-duplicate-runs)')
        if options.skipExportRuns:
            printColoured('SKIPPED', ' - Export: ' + str(testStats['SKIPPED_EXPORT']) + ' (due to --skip-export-runs)')
        if options.timeout is not None:
            printColoured('FAILURE', ' Timeouts: ' + str(testStats['TIMEOUT']))

# Classify a message by the keywords it contains and increment the matching
# counter(s) in testStats. The first matching category wins; messages that
# match no category are not counted.
def countTestResult(msg):
    if 'Error:' in msg or 'FAIL' in msg:
        # non-convergence is counted as a sub-category of failure
        if 'did not converge' in msg:
            incrementTestStat('FAIL_NONCONVERGE')
        incrementTestStat('FAILURE')
    elif options.ddWarnings and re.match('Warning: CUDD reports .* non-zero references', msg):
        # DD warnings are counted as a sub-category of warnings
        incrementTestStat('WARNING')
        incrementTestStat('DDWARNING')
    elif 'Warning:' in msg:
        incrementTestStat('WARNING')
    elif 'PASS' in msg:
        incrementTestStat('SUCCESS')
    elif 'SKIPPED' in msg or 'NOT TESTED' in msg:
        # "not tested" is counted as a sub-category of skipped
        if 'NOT TESTED' in msg:
            incrementTestStat('SKIPPED_NOT_TESTED')
        incrementTestStat('SKIPPED')
    elif 'UNSUPPORTED' in msg:
        incrementTestStat('UNSUPPORTED')

# Is printing of colour coded messages enabled?
# options.colourEnabled is "yes", "no" or anything else (treated as automatic)
def isColourEnabled():
    if options.colourEnabled == "yes":
        return True
    elif options.colourEnabled == "no":
        return False
    else:
        # auto: yes if in terminal mode
        return sys.stdout.isatty()

# Checks for each file from the outFiles list whether there is an identical file
# with the name exportPrefix + file. If so, said file is deleted. Otherwise, it is kept
# Returns true iff identical files were found for each out file
# Checking stops at the first out file that fails.
def verifyAndCleanupExports(outFiles, exportPrefix):
    result = True
    # Check for equality with out files
    for outFile in outFiles:
        msg = "Testing export " + os.path.basename(outFile) + ": "
        expFile = prependToFile(exportPrefix, outFile)
        if os.path.isfile(expFile):
            if options.noExportTests:
                # Not compared, but the exported file is still removed
                msg = msg + "SKIPPED"
                os.remove(expFile)
            elif compareFiles(outFile, expFile):
                # If successful, notify and delete exported file
                msg = msg + "PASS"
                os.remove(expFile)
            else:
                # The mismatching exported file is kept for inspection
                msg = msg + "FAIL (" + os.path.basename(expFile) + " does not match)"
                print("To see difference, run:")
                print("diff " + outFile + " " + expFile)
                sys.stdout.flush()
                result = False
        else:
            if options.noExportTests:
                msg = msg + "SKIPPED"
            else:
                msg = msg + "FAIL (no " + os.path.basename(expFile) + " to compare to)"
                result = False
        # Print (and count) the result for this out file
        printTestResult(msg)
        if not result:
            return result
    return result

# Run a benchmark, specified by a list of command-line args,
# possibly iterating over further lists of args from a "bm" file
#
# * file: name of the model file, or "" if there is none
# * args: list of command-line arguments
# * dir: directory used to expand import/export file names and to shorten log file names
def benchmark(file, args, dir=""):
    logging.debug("Benchmarking: " + file + ", " + str(args))
    # Add extra arguments from command line, if applicable
    args = addExtraArgs(args)
    # Expand input/output files to full paths
    args = expandFilenames(args, dir)

    if options.skipExportRuns and hasExportSwitches(args):
        incrementTestStat('SKIPPED')
        incrementTestStat('SKIPPED_EXPORT')
        return

    # Determine which out files apply to this benchmark from the -export switches (if required)
    # (outFiles is only defined under this condition; it is only read under the same condition below)
    if not options.echo and options.test:
        outFiles = getExpectedOutFilesFromArgs(args)

    # Rename export files to avoid overriding out files
    # (if in test mode, and if not disabled)
    exportPrefix = 'tmp.'
    if (options.test and not options.noRenaming):
        args = renameExports(exportPrefix, args)

    # print '\033[94m' + "EXECUTING BENCHMARK" + '\033[0m'
    # print "File: " + file
    # print "Directory: " + dir
    # print "Args: " + ' '.join(args)
    # print " "

    modelFileArg = [file] if (file != "") else []

    # Loop through benchmark options, if required
    argsLists = []
    # May be specified in a file
    if options.bmFile:
        if not os.path.isfile(os.path.join(options.bmFile)):
            print("Cannot read arguments from non-existing file: " + os.path.join(options.bmFile))
            sys.exit(1)
        argsLists.extend(getArgsListsFromFile(options.bmFile))
    # And/or may be specified on the command line
    if options.bmList:
        for bmArgsString in options.bmList.split(','):
            argsLists.append(bmArgsString.strip().split(' '))
    # Now loop through benchmark options
    # (a fresh argument list is passed each time, since runPrism extends it in place)
    if len(argsLists) > 0:
        for bmArgs in argsLists:
            runPrism(modelFileArg + args, bmArgs, dir)
    # If none, just use existing args alone
    else:
        runPrism(modelFileArg + args, [], dir)

    # Verify that exported files are correct (if required)
    if not options.echo and options.test and outFiles:
        # print "Out files to verify exports against: " + ' '.join(outFiles)
        allEqual = verifyAndCleanupExports(outFiles, exportPrefix)
        if (not allEqual) and (not options.testAll):
            closeDown(1)

# Execute benchmarking based on a directory
# Unless requested not to (via -n/--non-recursive), the directory is searched recursively.
# In each directory, all models are found - either those listed in a file called 'models',
# if present, or all files with a suitable extension within the directory.
# Each model is then treated as if it had been called with prism-auto directly
# In addition, any "orphan" auto files are run (i.e. those not matching some model file).
# This basically means calling PRISM for each line of the auto file, and passing the
# contents of this line as the arguments. Arguments found in a matching .args file
# (e.g. xxx.auto.args) are also appended, and if there are multiple lines in the .args file,
# PRISM is run for each line of the auto file and each line of the .args file.
# NOTE(review): the code below does not read a .args file for orphan auto files;
# confirm whether the description or the code is the intended behaviour.
#
# * dir: name of the directory (as a string)
def benchmarkDir(dir):
    logging.debug("Benchmarking dir " + dir)
    # Sub-directories are handled before the directory itself, unless recursion is off
    if not options.nonRec:
        for entry in sortedListDir(dir):
            if entry in [".","..",".svn"]:
                continue
            subDir = os.path.join(dir, entry)
            if os.path.isdir(subDir):
                benchmarkDir(subDir)
    # Every model in this directory, together with its attached arguments
    for modelEntry in getModelsInDir(dir):
        benchmarkModelFile(modelEntry[0], modelEntry[1], dir)
    # Auto files without a matching model are run on their own, one PRISM call per line
    for autoFile in getAutoFilesInDir(dir):
        if not isOrphan(dir, autoFile):
            continue
        logging.debug("Orphan auto file: " + autoFile)
        for autoArgs in getArgsListsFromFile(autoFile):
            benchmark("", autoArgs, dir)

# Execute benchmarking based on a single file (model, property, list, auto)
# The kind of file is decided by its name; files of no known kind are ignored.
#
# * file: name of the file (as a string)
def benchmarkFile(file):
    if isPrismModelFile(file):
        benchmarkModelFile(file)
        return
    if isPrismPropertiesFile(file):
        benchmarkPropertiesFile(file)
        return
    if isPrismPropListFile(file):
        benchmarkPropListFile(file)
        return
    if isAutoFile(file):
        benchmarkAutoFile(file)

# Execute benchmarking based on a single model file, possibly with some additional
# arguments to pass to PRISM, passed in the list modelArgs (probably from a "models" file).
# If there is a matching .args file (e.g.
# model.nm.args), arguments in this file
# are also appended when calling PRISM (and multiple lines result in multiple PRISM runs).
# In build mode (-b/--build), the model is only built; otherwise it is run against the
# properties files of the directory (all of them, or just the matching ones in matching
# mode) and against the lines of any matching auto files.
#
# * modelFile: name of the model file (as a string)
# * modelArgs: (optionally) a list of arguments attached to the model, e.g. ["-const", "N=2"]
#              (None, the default, stands for "no arguments")
# * dir: (optionally) the directory containing the model (if absent, it is deduced)

def benchmarkModelFile(modelFile, modelArgs=None, dir=""):
    # None replaces the former shared [] default, so no list object is reused between calls
    if modelArgs is None:
        modelArgs = []
    logging.debug("Benchmarking model file " + modelFile + " " + str(modelArgs))
    if dir == "":
        dir = os.path.dirname(modelFile)
    if dir == "":
        # Model file given without any directory part: use the current directory
        dir = "."
    # Expand model file based on any .args file
    argLists = getMatchingArgListsForFile(modelFile)
    logging.debug("Arg lists: " + str(argLists))
    for args in argLists:
        # Build mode: just build
        if options.build:
            benchmark(modelFile, modelArgs + args, dir)
            continue
        # Otherwise, find and benchmark properties
        if options.matching:
            propertiesFiles = getMatchingPropertiesInDir(dir, modelFile)
        else:
            propertiesFiles = getPropertiesInDir(dir)
        logging.debug("Properties files: " + str(propertiesFiles))
        for propertiesFile in propertiesFiles:
            logging.debug("Property file: " + propertiesFile)
            # One run per line of the properties file's own .args file
            for argsp in getMatchingArgListsForFile(propertiesFile):
                benchmark(modelFile, modelArgs + args + [propertiesFile] + argsp, dir)
        # Find and benchmark auto files
        autoFiles = getMatchingAutoFilesInDir(dir, modelFile)
        logging.debug("Auto files: " + str(autoFiles))
        for autoFile in autoFiles:
            logging.debug("Auto file: " + str(autoFile))
            # One run per line of the auto file and per line of its .args file
            for autoArgs in getArgsListsFromFile(autoFile):
                for argsa in getMatchingArgListsForFile(autoFile):
                    benchmark(modelFile, modelArgs + args + autoArgs + argsa, dir)

# Execute benchmarking on an auto file, i.e. a file containing one or more lines
# of command-line arguments specifying calls to be made to PRISM.
# If in "matching" mode, and if it is present, an associated model file (with matching name)
# is also used. But there is no corresponding property file.
#
# * autoFile: name of the auto file (as a string)

def benchmarkAutoFile(autoFile):
    logging.debug("Benchmarking auto file " + autoFile)
    dir = os.path.dirname(autoFile)
    if dir == "":
        dir = "."
    # Model files are handled as [file, args] pairs. In matching mode the pairs are built
    # here as a real list: a lazy map object would log as "<map object ...>" and would
    # always be truthy, so the orphan fallback below could never trigger.
    if options.matching:
        modelFiles = [[file, []] for file in getMatchingModelsInDir(dir, autoFile)]
    else:
        modelFiles = getModelsInDir(dir)
    logging.debug("Model files: " + str(modelFiles))
    for modelFile in modelFiles:
        # Read args for the model (element 0 of the pair is the file name)
        for modelArgs in getMatchingArgListsForFile(modelFile[0]):
            # Treat auto file like an args file
            for argsList in getArgsListsFromFile(autoFile):
                # Don't look for properties (corresponds to build mode)
                for argsa in getMatchingArgListsForFile(autoFile):
                    # Element 1 of the pair holds the arguments attached to the model
                    benchmark(modelFile[0], modelFile[1] + modelArgs + argsList + argsa, dir)
    if not modelFiles:
        # There aren't any (matching) model files, process as "orphaned" auto file
        for argsList in getArgsListsFromFile(autoFile):
            benchmark("", argsList, dir)

# Execute benchmarking based on a single properties file.
# The properties file is run against the models of its directory (all of them, or just
# the matching ones in matching mode), once per line of each applicable .args file.
#
# * propertiesFile: name of the properties file (as a string)

def benchmarkPropertiesFile(propertiesFile):
    logging.debug("Benchmarking properties file " + propertiesFile)
    dir = os.path.dirname(propertiesFile)
    if dir == "":
        dir = "."
    # Expand properties file based on any .args file
    argLists = getMatchingArgListsForFile(propertiesFile)
    for args in argLists:
        # Find models, as [file, args] pairs (a real list, so that the debug output is readable)
        if options.matching:
            modelFiles = [[file, []] for file in getMatchingModelsInDir(dir, propertiesFile)]
        else:
            modelFiles = getModelsInDir(dir)
        logging.debug("Model files: " + str(modelFiles))
        for modelFile in modelFiles:
            # Expand model based on any .args file, too
            for modelArgs in getMatchingArgListsForFile(modelFile[0]):
                benchmark(modelFile[0], modelFile[1] + modelArgs + [propertiesFile] + args, dir)

# Execute benchmarking based on a property list.
# A property list is a file containing pairs of the form <dir>, <file> (one per line), where:
# <dir> is a directory, relative to the location of the property list file, and
# <file> is the name of a properties file contained within that directory.
# Each properties file is treated as if it had been called with prism-auto directly.
# Empty lines and commented-out lines are skipped.
#
# * propListFile: name of the property list file (as a string)

def benchmarkPropListFile(propListFile):
    logging.debug("Benchmarking property list file " + propListFile)
    listDir = os.path.dirname(propListFile)
    if listDir == "":
        listDir = "."
    # Read the whole list first, so the file is closed again before any PRISM run starts
    with open(propListFile, 'r') as propList:
        lines = propList.readlines()
    for line in lines:
        line = line.strip()
        if len(line) == 0 or lineIsCommentedOut(line):
            continue
        items = line.split(',')
        dir = os.path.join(listDir, items[0].strip())
        dir = os.path.realpath(dir)
        propFile = items[1].strip()
        benchmarkPropertiesFile(os.path.join(dir, propFile))

# (Re-)start the nailgun server
# Stops any server that is already running, starts a new one in the background and then
# polls it (up to 5 times, every 0.5 seconds) until it answers. If it never answers,
# a warning is printed and execution continues.

def restartNailGunServer():
    # First try to stop an existing server (which should fail quickly if there is none running)
    print("Stopping existing nailgun server, if it's running...")
    sys.stdout.flush()
    subprocess.Popen([options.ngprism, "stop"]).wait()
    # For Java version >=19, running Nailgun needs a workaround
    javaVersion = getJavaVersion()
    if javaVersion is not None and javaVersion >= 19:
        os.environ['PRISM_JAVA_PARAMS'] = '-Djava.security.manager=allow'
    # Start the server and make sure it is running
    print("Starting nailgun server...")
    sys.stdout.flush()
    os.system(options.prismExec + " -ng &")
    print("Checking nailgun server is running...")
    exitCode = 1
    tries = 0
    maxTries = 5
    while exitCode != 0 and tries < maxTries:
        time.sleep(0.5)
        tries = tries + 1
        exitCode = subprocess.Popen([options.ngprism, "-version"]).wait()
    if exitCode != 0:
        # Don't stay silent if the check never succeeded
        print("Warning: nailgun server did not respond after " + str(maxTries) + " attempts")

# Get the (major) version of Java used to run PRISM, return as an integer (or None if failed)
# "Failed" covers: PRISM could not be executed, it returned a non-zero exit code,
# or no version number was found in its error output.

def getJavaVersion():
    try:
        # Get the output of "prism -javaversion" (the version is read from stderr)
        result = subprocess.run([options.prismExec, '-javaversion'],
                                stdout=subprocess.DEVNULL, stderr=subprocess.PIPE, text=True)
    except OSError:
        # prismExec call fails (file missing, not executable, ...)
        return None
    if result.returncode != 0:
        return None
    # Extract version as regexp: the digits that directly follow the first double quote
    javaVersionString = re.search(r'"(\d+)', result.stderr)
    if javaVersionString:
        return int(javaVersionString.group(1))
    return None

#==================================================================================================
# Main program
#==================================================================================================

# Print a one-line usage message

def printUsage():
    print("Usage: prism-auto ...")

# Handler for SIGINT: close down with exit code 1
#
# * signal, frame: arguments passed in by the signal module (unused)

def signal_handler(signal, frame):
    closeDown(1)

# Close down neatly, stopping nailgun if needed
# Prints the test statistics and then exits the script.
#
# * exitCode: the exit code to exit with

def closeDown(exitCode):
    if options.nailgun:
        if options.echo or options.echoFull:
            # Echo mode: only show the command that would be run
            print(options.ngprism + " stop")
        else:
            subprocess.Popen([options.ngprism, "stop"]).wait()
    printTestStatistics()
    sys.exit(exitCode)

# Main program

signal.signal(signal.SIGINT, signal_handler)
parser = OptionParser(usage="usage: %prog [options] args")
parser.add_option("-l", "--log", dest="logDir", metavar="DIR", default="", help="Store PRISM output in logs in DIR")
parser.add_option("-a", "--args", dest="bmFile", metavar="FILE", default="", help="Read argument lists for benchmarking from FILE")
parser.add_option("--args-list", dest="bmList", metavar="X", default="", help="Use X as argument lists for benchmarking (comma-separated)")
parser.add_option("-e", "--echo", action="store_true", dest="echo", default=False, help="Just print out tasks, don't execute")
parser.add_option("-m", "--matching", action="store_true", dest="matching", default=False, help="Only use matching models/properties, not all files")
parser.add_option("-b", "--build", action="store_true", dest="build", default=False, help="Just build models, don't model check properties")
parser.add_option("-p", "--prog", dest="prismExec", metavar="FILE", default="prism", help="Program to execute [default=prism]")
parser.add_option("-n", "--non-recursive", action="store_true", dest="nonRec", default=False, help="Don't recurse into directories")
parser.add_option("-x", "--extra", action="append", dest="extraArgs", metavar="XXX", default=[], help="Pass extra switches to PRISM")
parser.add_option("-t", "--test", action="store_true", dest="test", default=False, help="Run in test mode")
parser.add_option("--verbose-test", action="store_true", dest="verboseTest", default=False, help="Run in verbose test mode (implies --test)")
parser.add_option("-w", "--show-warnings", action="store_true", dest="showWarnings", default=False, help="Show warnings (as well as errors) when in test mode")
parser.add_option("--nailgun", action="store_true", dest="nailgun", default=False, help="Run PRISM in Nailgun mode")
parser.add_option("--ngprism", dest="ngprism", metavar="FILE", default=None, help="Specify the location of ngprism (for Nailgun mode) [default: derive from --prog setting]")
parser.add_option("--test-all", action="store_true", dest="testAll", default=False, help="In test mode, don't stop after an error")
parser.add_option("--no-renaming", action="store_true", dest="noRenaming", default=False, help="Don't rename files to be exported")
parser.add_option("--debug", action="store_true", dest="debug", default=False, help="Enable debug mode: display debugging info")
parser.add_option("--echo-full", action="store_true", dest="echoFull", default=False, help="An expanded version of -e/--echo")
parser.add_option("--models-filename", dest="modelsFilename", metavar="X", default="models", help="Read in list of models/parameters for a directory from file X, if present [default=models]")
parser.add_option("--models-info", dest="modelsInfoFilename", metavar="X", default="models.csv", help="Read model details from CSV file X, if present [default=models.csv]")
parser.add_option("--filter-models", dest="filterModels", metavar="N", default="", help="Filter benchmark models...")
parser.add_option("--log-subdirs", action="store_true", dest="logSubdirs", default=False, help="Organise PRISM output logs in subdirectories per benchmark argument")
parser.add_option("--no-export-tests", action="store_true", dest="noExportTests", default=False, help="Don't check exported files when in test mode")
parser.add_option("--skip-export-runs", action="store_true", dest="skipExportRuns", default=False, help="Skip all runs having exports")
parser.add_option("--skip-duplicate-runs", action="store_true", dest="skipDuplicates", default=False, help="Skip PRISM runs which have the same arguments as an earlier run (with some heuristics to detect equivalent arguments)")
parser.add_option("--timeout", dest="timeout", metavar="N", default=None, help='Timeout for each PRISM run (examples values for N: 5 / 5s for seconds, 5m / 5h for minutes / hours)')
parser.add_option("--dd-warnings", action="store_true", dest="ddWarnings", default=False, help="Print the DD reference count warnings")
parser.add_option("--colour", dest="colourEnabled", metavar="X", type="choice", choices=["yes","no","auto"], default="auto", help="Whether to colour test results: yes, no, auto (yes iff in terminal mode) [default=auto]")
parser.add_option("--print-failures", action="store_true", dest="printFailures", default=False, help="Display logs for failing tests")
(options, args) = parser.parse_args()
# At least one file/directory to benchmark is required
if len(args) < 1:
    parser.print_help()
    sys.exit(1)
if options.debug:
    logging.basicConfig(level=logging.DEBUG)
if options.verboseTest:
    # --verbose-test implies --test
    options.test = True
if options.timeout:
    # handle time units: an optional s/m/h suffix selects the multiplier (default: seconds)
    mult = 1.0
    if options.timeout.endswith('s'):
        options.timeout = options.timeout[0:-1]  # strip 's'
    elif options.timeout.endswith('m'):
        options.timeout = options.timeout[0:-1]  # strip 'm'
        mult = 60.0
    elif options.timeout.endswith('h'):
        options.timeout = options.timeout[0:-1]  # strip 'h'
        mult = 3600.0
    # From here on, options.timeout is a number of seconds (as a float)
    try:
        options.timeout = float(options.timeout) * mult
    except ValueError:
        print('Illegal parameter value for timeout parameter')
        sys.exit(1)
# if options.logDir and not os.path.isdir(options.logDir):
#     print("Log directory \"" + options.logDir + "\" does not exist")
#     sys.exit(1)
if options.nailgun:
    if options.ngprism is None:
        # derive ngprism location from the --prog setting:
        # we just replace the base part of prismExec by ngprism and hope for the best...
        options.ngprism = os.path.join(os.path.dirname(options.prismExec), 'ngprism')
        print("Derived executable for nailgun server: " + options.ngprism)
    if options.echo or options.echoFull:
        print(options.prismExec + " -ng &")
    else:
        restartNailGunServer()
# process benchmarks
for arg in args:
    if os.path.isdir(arg):
        benchmarkDir(arg)
    elif os.path.isfile(arg):
        benchmarkFile(arg)
    else:
        print("Error: File/directory " + arg + " does not exist")
# shutdown
if options.test and options.testAll and not options.echo:
    if testStats['FAILURE'] > 0:
        # report via the exit code that there were actual failures in test + test-all mode
        closeDown(1)
closeDown(0)