from datetime import datetime
import numpy as np
import pandas as pd
from viewclust.target_series import target_series
def job_use(jobs, d_from, target, d_to='', use_unit='cpu', job_state='all',
            time_ref='', grouper_interval='S', usage_interval='H',
            serialize_queued='', serialize_running='', serialize_dist=''):
    """Compute usage series from a frame of job records.

    Takes a DataFrame full of job information and returns usage based on
    the specified unit. Operates as a stepping stone for plotting usage
    figures and returns various series and frames for several uses.

    Parameters
    ----------
    jobs: DataFrame
        Job DataFrame typically generated by the ccmnt package. Must
        contain at least 'submit', 'start', 'end', 'state', 'timelimit'
        and the columns required by the chosen use_unit.
    d_from: date str
        Beginning of the query period, e.g. '2019-04-01T00:00:00'.
    target: int-like or Series
        Typically a cpu allocation or core-eqv value for an account
        (often 50). If an int, a constant target series is built over
        [d_from, d_to]; otherwise it is used as the target series as-is.
    d_to: date str, optional
        End of the query period, e.g. '2020-01-01T00:00:00'.
        Defaults to the latest submit/start/end time seen in the records.
    use_unit: str, optional
        Usage unit to examine. One of:
        {'cpu', 'cpu-eqv', 'gpu', 'gpu-eqv', 'gpu-eqv-cdr', 'billing'}.
        Defaults to 'cpu'.
    job_state: str, optional
        The job state to include in measurement:
        {'all', 'complete', 'running', 'queued'}. Defaults to 'all'.
    time_ref: str, optional, one of: {'sub', 'req', 'sub+req', 'horizon+req'}
        sub: Jobs run as if they ran at submit time.
        req: Jobs run their full requested time from their start time.
        sub+req: Jobs run their full requested time from their submit time.
        horizon+req: Jobs run their full requested time from the horizon
        (i.e. d_to). Empty string applies no transformation.
    grouper_interval: str, optional
        The interval by which to calculate the start/end cumulative-sum
        difference; steps within this interval are ignored. Job record
        times occur at second resolution: {'S', 'min', 'H'}.
    usage_interval: str, optional
        The interval by which to store the usage series: {'S', 'min', 'H'}.
    serialize_queued, serialize_running, serialize_dist: str, optional
        Pickle the corresponding structure using the argument as the file
        name. If left empty, the pickle step is skipped. Default empty.

    Returns
    -------
    clust:
        Frame of system info at the given time intervals. Typically
        referenced by other functions for plotting information.
    queued:
        Frame of queued resources.
    running:
        Frame of running resources.
    dist_from_target:
        Series of cumulative distance from target, for delta plots.

    Raises
    ------
    AttributeError
        If use_unit is unrecognized, or use_unit='billing' and some
        reqtres entry has no 'billing=' token.
    """
    # Work on a copy: the original code wrote columns (e.g. 'use_unit')
    # straight into the caller's frame in the default path.
    jobs = jobs.copy()

    # If d_to is empty, fall back to the latest known state-change time
    # (submit/start/end) found anywhere in the job records.
    if d_to == '':
        d_to = str(jobs[['submit', 'start', 'end']].max(axis=1).max())

    jobs = _filter_job_state(jobs, job_state, d_to)
    jobs = _apply_time_ref(jobs, time_ref, d_to)
    jobs = jobs.sort_values(by=['submit'])
    jobs = _compute_use_unit(jobs, use_unit)

    # Instantaneous totals of use_unit at submit/start/end times,
    # bucketed to grouper_interval.
    jobs_submit = jobs[['submit', 'use_unit']].set_index('submit').groupby(
        pd.Grouper(freq=grouper_interval))['use_unit'].sum().fillna(0)
    jobs_start = jobs[['start', 'use_unit']].set_index('start').groupby(
        pd.Grouper(freq=grouper_interval))['use_unit'].sum().fillna(0)
    jobs_end = jobs[['end', 'use_unit']].set_index('end').groupby(
        pd.Grouper(freq=grouper_interval))['use_unit'].sum().fillna(0)

    # running = cumulative (starts - ends); queued = cumulative
    # (submissions - starts).
    running = jobs_start.subtract(jobs_end, fill_value=0).cumsum()
    queued = jobs_submit.subtract(jobs_start, fill_value=0).cumsum()

    # Resample to the reporting interval; sparse job records leave NaNs
    # which must be forward-filled to represent a held level.
    running = running.resample(usage_interval).mean().ffill()
    queued = queued.resample(usage_interval).mean().ffill()

    # Adding a zero baseline forces both series to span [d_from, d_to].
    baseline = target_series([(d_from, d_to, 0)])
    queued = queued.add(baseline, fill_value=0)
    running = running.add(baseline, fill_value=0)

    # Target: if int, build a constant series over the window; otherwise
    # assume the caller passed a ready-made series.
    if isinstance(target, int):
        clust = target_series([(d_from, d_to, target)])
    else:
        clust = target

    # Cumulative sums clipped to the window (workaround for job record
    # problems outside [d_from, d_to]).
    sum_target = np.cumsum(clust).loc[d_from:d_to]
    sum_running = np.cumsum(running)
    sum_running.index.name = 'datetime'
    sum_running = sum_running.loc[d_from:d_to]
    dist_from_target = sum_running - sum_target

    # Optional serialization of the computed structures.
    if serialize_running != '':
        running.to_pickle(serialize_running)
    if serialize_queued != '':
        queued.to_pickle(serialize_queued)
    if serialize_dist != '':
        dist_from_target.to_pickle(serialize_dist)

    return clust, queued, running, dist_from_target


def _filter_job_state(jobs, job_state, d_to):
    """Restrict *jobs* to one job state (helper for job_use).

    'complete' keeps jobs with a non-null end; 'running' keeps RUNNING
    jobs and extends their end to start + timelimit; 'queued' keeps
    PENDING jobs running from submit until the horizon plus their
    timelimit. Any other value (e.g. 'all') leaves the frame untouched.
    """
    if job_state == 'complete':
        jobs = jobs.loc[jobs['end'].notnull()].copy()
    elif job_state == 'running':
        jobs = jobs.loc[jobs['state'] == 'RUNNING'].copy()
        jobs['end'] = jobs['start'] + jobs['timelimit']
    elif job_state == 'queued':
        jobs = jobs.loc[jobs['state'] == 'PENDING'].copy()
        jobs['start'] = jobs['submit']
        jobs['end'] = pd.to_datetime(d_to) + jobs['timelimit']
    return jobs


def _apply_time_ref(jobs, time_ref, d_to):
    """Shift job start/end times per *time_ref* (helper for job_use)."""
    if time_ref == '':
        return jobs
    jobs = jobs.copy()
    if time_ref == 'sub':
        # Keep the actual duration but pretend the job started at submit.
        # end must be computed before start is overwritten.
        jobs['end'] = jobs['submit'] + (jobs['end'] - jobs['start'])
        jobs['start'] = jobs['submit']
    elif time_ref == 'req':
        jobs['end'] = jobs['start'] + jobs['timelimit']
    elif time_ref == 'sub+req':
        jobs['end'] = jobs['submit'] + jobs['timelimit']
        jobs['start'] = jobs['submit']
    elif time_ref == 'horizon+req':
        jobs['end'] = pd.to_datetime(d_to) + jobs['timelimit']
        jobs['start'] = jobs['submit']
    return jobs


def _compute_use_unit(jobs, use_unit):
    """Attach a 'use_unit' column expressing each job's resource demand.

    Raises AttributeError for an unrecognized use_unit, or for
    use_unit='billing' when a reqtres entry lacks a 'billing=' token.
    """
    if use_unit == 'cpu':
        jobs['use_unit'] = jobs['reqcpus']
    elif use_unit == 'cpu-eqv':
        # TRESBillingWeights=CPU=1.0,Mem=0.25G
        jobs['mem_scale'] = (jobs['mem'] / 1024) * .25
        jobs['use_unit'] = jobs[['mem_scale', 'reqcpus']].max(axis=1)
    elif 'gpu' in use_unit:
        jobs['ngpus'] = jobs['reqtres'].str.extract(
            r'gpu=(\d+)').fillna(0).astype('int64')
        if use_unit == 'gpu':
            jobs['use_unit'] = jobs['ngpus']
        elif use_unit == 'gpu-eqv':
            # Beluga and Graham:
            # TRESBillingWeights=CPU=0.0625,Mem=0.015625G,GRES/gpu=1.0
            jobs['cpu_scale'] = jobs['reqcpus'] * 0.0625
            jobs['mem_scale'] = (jobs['mem'] / 1024) * 0.015625
            jobs['use_unit'] = jobs[
                ['cpu_scale', 'mem_scale', 'ngpus']].max(axis=1)
        elif use_unit == 'gpu-eqv-cdr':
            # Cedar: TRESBillingWeights=CPU=0.1667,Mem=0.03125G,GRES/gpu=1.0
            jobs['cpu_scale'] = jobs['reqcpus'] * 0.1667
            jobs['mem_scale'] = (jobs['mem'] / 1024) * 0.03125
            jobs['use_unit'] = jobs[
                ['cpu_scale', 'mem_scale', 'ngpus']].max(axis=1)
        else:
            raise AttributeError('invalid GPU use_unit')
    elif use_unit == 'billing':
        # Check for missing billing tokens BEFORE filling: in the original
        # code fillna(0) ran first, so the null-check could never fire.
        billing = jobs['reqtres'].str.extract(r'billing=(\d+)', expand=False)
        if billing.isnull().any():
            raise AttributeError('There is no "billing" string in the reqtres')
        jobs['use_unit'] = billing.astype('int64')
    else:
        raise AttributeError('invalid use_unit')
    return jobs