Source code for viewclust.job_use

from datetime import datetime
import numpy as np
import pandas as pd
from viewclust.target_series import target_series


def job_use(jobs, d_from, target, d_to='', use_unit='cpu', job_state='all',
            time_ref='', grouper_interval='S', usage_interval='H',
            serialize_queued='', serialize_running='', serialize_dist=''):
    """Takes a DataFrame full of job information and returns usage based on
    the specified unit.

    This function operates as a stepping stone for plotting usage figures
    and returns various series and frames for several different uses.

    Parameters
    ----------
    jobs: DataFrame
        Job DataFrame typically generated by the ccmnt package.
    d_from: date str
        Beginning of the query period, e.g. '2019-04-01T00:00:00'.
    target: int-like
        Typically a cpu allocation or core eqv value for a particular
        account. Often 50.
    d_to: date str, optional
        End of the query period, e.g. '2020-01-01T00:00:00'.
        Defaults to now if empty.
    use_unit: str, optional
        Usage unit to examine. One of:
        {'cpu', 'cpu-eqv', 'gpu', 'gpu-eqv', 'gpu-eqv-cdr', 'billing'}.
        Defaults to 'cpu'.
    job_state: str, optional
        The job state to include in measurement:
        {'all', 'complete', 'running', 'queued'}. Defaults to 'all'.
    time_ref: str, optional
        One of: {'sub', 'req', 'sub+req', 'horizon+req'}.
        sub: Jobs run as if they ran at submit time.
        req: Jobs run their full requested time from their start time.
        sub+req: Jobs run their full requested time from their submit time.
        horizon+req: Jobs run their full requested time from the horizon
        (e.g. d_to).
    grouper_interval: str, optional
        The interval by which to calculate the start and end time
        cumulative sum difference. Start and end steps within this interval
        will be ignored. Job record times occur at the second interval:
        {'S', 'min', 'H'}.
    usage_interval: str, optional
        The interval by which to store the usage series. Job record times
        occur at the second interval: {'S', 'min', 'H'}.
    serialize_running, serialize_queued, serialize_dist: str, optional
        Pickles given structure with argument as a name.
        If left empty, pickle procedure is skipped. Defaults to empty.

    Returns
    -------
    clust:
        Frame of system info at given time intervals. Typically referenced
        by other functions for plotting information.
    queued:
        Frame of queued resources.
    running:
        Frame of running resources.
    dist_from_target:
        Series for delta plots.
    """

    # d_to boilerplate
    # IF D_TO IS EMPTY IT SHOULD BE SET TO LATEST KNOWN STATE
    # CHANGE TIME (SUBMIT, START, END) IN THE JOB RECORDS.
    if d_to == '':
        t_max = jobs[['submit', 'start', 'end']].max(axis=1)
        d_to = str(t_max.max())

    # Filter on job state. Different from reason so it is safe.
    if job_state == 'complete':
        jobs_complete = jobs.copy()
        jobs_complete = jobs_complete.loc[jobs_complete['end'].notnull()]
        jobs = jobs_complete
    elif job_state == 'running':
        jobs_running = jobs.copy()
        jobs_running = jobs_running.loc[jobs_running['state'] == 'RUNNING']
        jobs = jobs_running
        jobs['end'] = jobs['start'] + jobs['timelimit']
    elif job_state == 'queued':
        jobs_queued = jobs.copy()
        jobs_queued = jobs_queued.loc[jobs_queued['state'] == 'PENDING']
        jobs = jobs_queued
        jobs['start'] = jobs['submit']
        jobs['end'] = pd.to_datetime(d_to) + jobs['timelimit']

    # Boilerplate for time transformation
    if time_ref != '':
        jobs = jobs.copy()
        if time_ref == 'sub':
            end = jobs['submit'] + (jobs['end'] - jobs['start'])
            jobs['start'] = jobs['submit']
            jobs['end'] = end
        elif time_ref == 'req':
            jobs['end'] = (jobs['start'] + jobs['timelimit'])
        elif time_ref == 'sub+req':
            end = jobs['submit'] + jobs['timelimit']
            jobs['start'] = jobs['submit']
            jobs['end'] = end
        elif time_ref == 'horizon+req':
            end = pd.to_datetime(d_to) + jobs['timelimit']
            jobs['start'] = jobs['submit']
            jobs['end'] = end

    jobs = jobs.sort_values(by=['submit'])

    if use_unit == 'cpu':
        jobs['use_unit'] = jobs['reqcpus']
    elif use_unit == 'cpu-eqv':
        # TRESBillingWeights=CPU=1.0,Mem=0.25G
        jobs['mem_scale'] = (jobs['mem'] / 1024) * .25
        jobs['use_unit'] = jobs[['mem_scale', 'reqcpus']].max(axis=1)
    elif 'gpu' in use_unit:
        jobs['ngpus'] = jobs['reqtres'].str.extract(
            r'gpu=(\d+)').fillna(0).astype('int64')
        if use_unit == 'gpu':
            jobs['use_unit'] = jobs['ngpus']
        elif use_unit == 'gpu-eqv':
            # Beluga and Graham:
            # TRESBillingWeights=CPU=0.0625,Mem=0.015625G,GRES/gpu=1.0
            jobs['cpu_scale'] = jobs['reqcpus'] * 0.0625
            jobs['mem_scale'] = (jobs['mem'] / 1024) * 0.015625
            jobs['use_unit'] = jobs[['cpu_scale', 'mem_scale', 'ngpus']].max(
                axis=1)
        elif use_unit == 'gpu-eqv-cdr':
            # Cedar: TRESBillingWeights=CPU=0.1667,Mem=0.03125G,GRES/gpu=1.0
            jobs['cpu_scale'] = jobs['reqcpus'] * 0.1667
            jobs['mem_scale'] = (jobs['mem'] / 1024) * 0.03125
            jobs['use_unit'] = jobs[['cpu_scale', 'mem_scale', 'ngpus']].max(
                axis=1)
        else:
            raise AttributeError('invalid GPU use_unit')
    elif use_unit == 'billing':
        jobs['billing'] = jobs['reqtres'].str.extract(
            r'billing=(\d+)').fillna(0).astype('int64')
        if jobs['billing'].isnull().any():
            raise AttributeError(
                'There is no "billing" string in the reqtres')
        else:
            jobs['use_unit'] = jobs['billing']
    else:
        raise AttributeError('invalid use_unit')

    # Prepare dataframes for resampling
    jobs_submit = jobs[['submit', 'use_unit']].set_index('submit')
    jobs_start = jobs[['start', 'use_unit']].set_index('start')
    jobs_end = jobs[['end', 'use_unit']].set_index('end')

    # Calculate instantaneous usage
    jobs_submit = jobs_submit.groupby(
        pd.Grouper(freq=grouper_interval))['use_unit'].sum().fillna(0)
    jobs_start = jobs_start.groupby(
        pd.Grouper(freq=grouper_interval))['use_unit'].sum().fillna(0)
    jobs_end = jobs_end.groupby(
        pd.Grouper(freq=grouper_interval))['use_unit'].sum().fillna(0)

    running = jobs_start.subtract(jobs_end, fill_value=0).cumsum()
    queued = jobs_submit.subtract(jobs_start, fill_value=0).cumsum()

    # Resample by the usage interval (hourly by default)
    running = running.resample(usage_interval).mean()
    queued = queued.resample(usage_interval).mean()

    # Fill values for sparse job records.
    # The resampling above will result in NaNs.
    running = running.fillna(method='ffill')
    queued = queued.fillna(method='ffill')

    baseline = target_series([(d_from, d_to, 0)])
    queued = queued.add(baseline, fill_value=0)
    running = running.add(baseline, fill_value=0)

    # Target: if int, calculate it, else use the variable passed
    # (should be a series)
    clust = pd.DataFrame()
    if isinstance(target, int):
        clust = target_series([(d_from, d_to, target)])
    else:
        clust = target

    sum_target = np.cumsum(clust)
    sum_running = np.cumsum(running)

    # New workaround for job record problems
    sum_target = sum_target.loc[d_from:d_to]
    sum_running.index.name = 'datetime'
    sum_running = sum_running.loc[d_from:d_to]

    dist_from_target = (sum_running - sum_target)

    if serialize_running != '':
        running.to_pickle(serialize_running)
    if serialize_queued != '':
        queued.to_pickle(serialize_queued)
    if serialize_dist != '':
        dist_from_target.to_pickle(serialize_dist)

    return clust, queued, running, dist_from_target
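
A minimal usage sketch (not part of the module source): the columns below are inferred from the fields the function indexes into ('submit', 'start', 'end', 'state', 'timelimit', 'reqcpus', 'mem', 'reqtres'), and the two hand-built job records are purely illustrative; real frames would typically come from the ccmnt package.

import pandas as pd
from viewclust.job_use import job_use

# Hypothetical job records. Memory is given in MB to match the /1024
# scaling to GB inside job_use.
jobs = pd.DataFrame({
    'submit': pd.to_datetime(['2019-04-01T01:00:00', '2019-04-01T02:00:00']),
    'start': pd.to_datetime(['2019-04-01T01:30:00', '2019-04-01T02:30:00']),
    'end': pd.to_datetime(['2019-04-01T03:30:00', '2019-04-01T05:30:00']),
    'state': ['COMPLETED', 'COMPLETED'],
    'timelimit': pd.to_timedelta(['4h', '4h']),
    'reqcpus': [8, 16],
    'mem': [32 * 1024, 64 * 1024],
    'reqtres': ['cpu=8,mem=32G,billing=8', 'cpu=16,mem=64G,billing=16'],
})

# Core-equivalent usage against an illustrative allocation of 50 cores.
clust, queued, running, dist_from_target = job_use(
    jobs,
    d_from='2019-04-01T00:00:00',
    target=50,
    d_to='2019-04-02T00:00:00',
    use_unit='cpu-eqv',
)

The four returned series are typically handed to viewclust's plotting helpers; dist_from_target shows the cumulative gap between delivered usage and the target allocation.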