import logging
import os
logger = logging.getLogger('emop')
[docs]class EmopScheduler(object):
#: Define attributes of supported schedulers.
#: To add a new scheduler this dict must be updated.
supported_schedulers = {
"slurm": {"module": "emop.lib.schedulers.emop_slurm", "class": "EmopSLURM"},
}
#: The name of the scheduler must be defined in a child class.
name = ""
#: The support JOB ID environment variables must be defined in a child class.
jobid_env_vars = []
def __init__(self, settings):
""" Initialize EmopScheduler object and attributes
Attributes:
settings (EmopSettings): Instance of EmopSettings
name (str): Name of the scheduler, defined in a child class.
job_id (str or int): Current job ID and the value is determined
by environment variable list defined in a child class.
Args:
settings (object): EmopSettings instance
"""
self.settings = settings
self.name = self.get_name()
self.job_id = self.get_job_id()
@classmethod
[docs] def get_scheduler_instance(cls, name, settings):
"""Get the scheduler instance
Based on the value of the name argument, an instance
of that scheduler class is returned.
The logic in the function is dynamic so that only the
supported_schedulers dict in EmopScheduler needs to be
updated to add additional scheduler support.
Args:
name (str): Name of scheduler to get instance of.
settings (EmopSettings): Instance of EmopSettings.
Returns:
object: Instance of an EmopScheduler sub-class.
"""
name = name.lower()
supported_schedulers = cls.supported_schedulers
if name in supported_schedulers:
dict_ = supported_schedulers.get(name)
module_ = __import__(dict_["module"], fromlist=[dict_["class"]])
class_ = getattr(module_, dict_["class"])
return class_(settings=settings)
else:
logger.error("Unsupported scheduler %s" % name)
raise NotImplementedError
[docs] def current_job_count(self):
raise NotImplementedError
[docs] def submit_job(self, proc_id, num_pages):
raise NotImplementedError
[docs] def is_job_environment(self):
"""Test if currently in a valid scheduler job environment.
The class attribute `jobid_env_vars` contains a list
of the valid job ID environment variables for a scheduler.
The current environment is tested to see if it contains a valid job ID
environment variable.
Returns:
bool: True if the current environment contains an environment
variable from the class attribute `jobid_env_vars` list.
False is returned if none are found.
"""
job_env = False
for jobid_env_var in self.__class__.jobid_env_vars:
if os.environ.get(jobid_env_var):
logger.debug("Found scheduler job ID environment variable %s" % jobid_env_var)
job_env = True
return job_env
[docs] def get_name(self):
"""Get the scheduler's name
The return value is pulled from the class `name` attribute.
Returns:
str: The scheduler's name
"""
name = self.__class__.name
if not name:
raise NotImplementedError
return name
[docs] def get_job_id(self):
"""Get the scheduler's job ID
Loops over the class' `jobid_env_vars` attribute to find the
current job ID.
Returns:
int: The job ID of the current scheduler job.
"""
jobid_env_vars = self.__class__.jobid_env_vars
if not jobid_env_vars:
raise NotImplementedError
for jobid_env_var in jobid_env_vars:
jobid = os.environ.get(jobid_env_var)
if jobid:
return jobid
[docs] def walltime(self, num_pages):
"""Determine walltime used for submitting job
This function determines the appropriate walltime to use
when submitting a job.
The optimal walltime is determined by using
avg_page_runtime * N * num_pages, where N is either
400%, 200% or 150%. The first optimal walltime to be less
than the max_job_runtime is used.
Args:
num_pages (int): Number of pages to be run
Returns:
int: A walltime value in seconds.
"""
avg_page_runtime = int(self.settings.avg_page_runtime)
num_pages = int(num_pages)
max_runtime = self.settings.max_job_runtime
walltimes = []
walltime = max_runtime
# 400% the average
walltimes.append((avg_page_runtime * 4 * num_pages))
# 200% the average
walltimes.append((avg_page_runtime * 2 * num_pages))
# 150% the average
walltimes.append((avg_page_runtime * 1.5 * num_pages))
for w in walltimes:
if w <= max_runtime:
walltime = w
break
return int(walltime)