Source code for emop.emop_submit

import json
import logging
from emop.lib.emop_base import EmopBase
from emop.lib.emop_payload import EmopPayload
from emop.lib.emop_scheduler import EmopScheduler

logger = logging.getLogger('emop')


class EmopSubmit(EmopBase):
    """Job submission helper built on top of EmopBase.

    Holds the scheduler instance selected by the application settings and
    provides the methods used to size, reserve and register submitted work.
    """

    def __init__(self, config_path):
        """Initialize EmopSubmit object and attributes

        Args:
            config_path (str): path to application config file
        """
        # Name the class explicitly: super(self.__class__, self) resolves to
        # the *runtime* class, so any subclass of EmopSubmit would call this
        # __init__ again and recurse until the stack overflows.
        super(EmopSubmit, self).__init__(config_path)
        # The scheduler backend is chosen by name from the loaded settings.
        self.scheduler = EmopScheduler.get_scheduler_instance(
            name=self.settings.scheduler,
            settings=self.settings,
        )
[docs] def optimize_submit(self, page_count, running_job_count, sim=False): """Determine optimal job submission This function attempts to determine the best number of jobs and how many pages per job should be submitted to the scheduler. This function does not return a value but sets the num_jobs and pages_per_job attributes. Args: page_count (int): Number of pages needing to be processed running_job_count (int): Number of active jobs Returns: list: First value is number of jobs and second value is number of pages per job. """ num_jobs = 0 pages_per_job = 1 job_slots_available = int(self.settings.max_jobs - running_job_count) run_option_a = float(page_count) / float(job_slots_available) run_option_b = float(self.settings.max_job_runtime) / float(self.settings.avg_page_runtime) run_option_c = float(self.settings.min_job_runtime) / float(self.settings.avg_page_runtime) logger.debug("JobSlotsAvailable: %s, PageCount: %s" % (job_slots_available, page_count)) logger.debug("RunOptA: %s , RunOptB: %s, RunOptC: %s" % (run_option_a, run_option_b, run_option_c)) # max pages per job > pages in max time if run_option_a > run_option_b: num_jobs = job_slots_available pages_per_job = run_option_b # Pages less than pages in min time elif page_count < run_option_c: num_jobs = page_count / run_option_c pages_per_job = page_count # max pages per job < pages in min time elif run_option_a < run_option_c: num_jobs = page_count / run_option_c pages_per_job = run_option_c # max pages per job else: # TODO: In some cases num_jobs will exceed max_jobs value num_jobs = page_count / run_option_a pages_per_job = run_option_a # Convert values to integers num_jobs = int(num_jobs) pages_per_job = int(pages_per_job) # Incase num_jobs and pages_per_job were type casted to 0 if not num_jobs: num_jobs = 1 if not pages_per_job: pages_per_job = 1 # Case where num_jobs > page_count if num_jobs > page_count or (num_jobs*pages_per_job) > page_count: if page_count > job_slots_available: num_jobs = 
job_slots_available pages_per_job = int(page_count/num_jobs) else: num_jobs = page_count pages_per_job = 1 expected_runtime = pages_per_job * self.settings.avg_page_runtime expected_runtime_msg = "Expected job runtime: %s seconds" % expected_runtime if sim: logger.info(expected_runtime_msg) else: logger.debug(expected_runtime_msg) # total_pages_to_run = num_jobs * pages_per_job optimal_submit_msg = "Optimal submission is %s jobs with %s pages per job" % (num_jobs, pages_per_job) if sim: logger.info(optimal_submit_msg) else: logger.debug(optimal_submit_msg) return num_jobs, pages_per_job
[docs] def reserve(self, num_pages, r_filter): """Reserve pages for a job Reserve page(s) for work by sending PUT request to dashboard API. Returns: str: The reserved work's proc_id. """ reserve_data = {} if r_filter and isinstance(r_filter, dict): job_queue = r_filter.copy() else: job_queue = {} job_queue["num_pages"] = num_pages if self.settings.operate_on == 'works': job_queue["works"] = '1' reserve_data["job_queue"] = job_queue reserve_request = self.emop_api.put_request("/api/job_queues/reserve", reserve_data) if not reserve_request: return "" requested = reserve_request.get('requested') reserved = reserve_request.get('reserved') proc_id = reserve_request.get('proc_id') results = reserve_request.get('results') logger.debug("Requested %s pages, and %s were reserved with proc_id: %s" % (requested, reserved, proc_id)) logger.debug("Payload: %s" % json.dumps(results, sort_keys=True, indent=4)) if reserved < 1: logger.error("No pages reserved") return "" self.payload = EmopPayload(self.settings, proc_id) self.payload.save_input(results) return proc_id
[docs] def set_job_id(self, proc_id, job_id): """Sends JobID back to dashboard """ data = { "job_queue": { "proc_id": proc_id, "job_id": job_id, } } set_job_id_request = self.emop_api.put_request('/api/job_queues/set_job_id', data) return True