from io import StringIO
import subprocess
import pandas as pd
import os

# Time columns in job records
# If we exclude PENDING jobs (that we do in slurm_raw_processing), all time columns should have a time stamp,
# except RUNNING jobs that do not have the 'End' stamp.
time_columns = ['Eligible','Submit','Start','End']

# Define what constitutes a duplicate job
duplicate_job_def = ['JobID','Submit','Start']

[docs]def sacct_jobs(account_query, d_from, d_to='', debugging=False, serialize_frame='', slurm_names=False): """Ingest job record information from slurm via sacct and return DataFrame. Parameters ------- account_query: str String query to be sent to sacct via -A flag. d_from: date str Beginning of the query period, e.g. '2019-04-01T00:00:00 debugging: boolean, optional Boolean for reporting progress to stdout. Default False. sacct_file: str, optional Loads a raw query from file. If empty, query is rerun. Defaults to the empty string. serialize_frame: str, optional Pickle the resulting DataFrame. If empty, pickling is skipped. Defaults to the empty string. slurm_names: str, optional Keep slurm's sacct column names instead of shorthands. Defaults to False. Returns ------- DataFrame Returns a standard pandas DataFrame, or an empty dataframe if no jobs are found. """ raw_frame = _get_slurm_records(pd.to_datetime(d_from)) out_frame = _slurm_raw_processing(raw_frame, slurm_names) # Legacy/consistency check: # Protect end time for jobs that are still currently running out_frame['end'] = out_frame['end'].replace({pd.NaT: pd.to_datetime(d_to)}) # return _slurm_consistency_check(out_frame) if debugging else out_frame # TODO: consisder swapping this to a better format if serialize_frame != '': out_frame.to_pickle(serialize_frame, protocol=4) return out_frame
def _get_slurm_records(arg, ssh_client=None): '''Retrieve records either via SSH or from a file.''' sacct_format = 'Account,AllocCPUS,AllocNodes,AllocTRES,AssocID,Cluster,CPUTimeRAW,'\ 'CPUTime,DerivedExitCode,ElapsedRaw,Elapsed,Eligible,End,ExitCode,Flags,GID,Group,'\ 'JobID,JobIDRaw,NCPUS,NNodes,NodeList,Priority,Partition,QOS,QOSRAW,Reason,ReqCPUS,'\ 'ReqMem,ReqNodes,ReqTRES,Reserved,ResvCPURAW,ResvCPU,Start,State,Submit,Suspended,'\ 'SystemCPU,TimelimitRaw,Timelimit,TotalCPU,UID,User,UserCPU,WorkDir' sacct_command = 'TZ=UTC sacct' sacct_options = f'--duplicates --allusers --allocations --parsable2 --delimiter=";" --format={sacct_format}' if isinstance(arg, str): # Read a SLURM dump from a file source = arg command = None if not os.path.isfile(source): print('The seed file does not exist. Quitting.') return pd.DataFrame() elif isinstance(arg, list) and arg: # Get specific jobs command = f'{sacct_command} {sacct_options} --jobs {",".join(arg)}' elif isinstance(arg, pd.Timestamp): # Get a list of jobs in a date range # Note that --start selects jobs in ANY state after the specified time. # This is not the same as filtering by 'Start' afterwards. command = f'{sacct_command} {sacct_options} --start {arg:%Y-%m-%dT%H:%M} --end Now\n' else: print('Unexpected input parameter to get_slurm_records().') return pd.DataFrame() if command: process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True) stdout, stderr = process.communicate() source = stdout.decode('UTF-8') try: records = pd.read_csv(StringIO(source), sep=';', dtype='str', on_bad_lines='skip') except e: # TODO: Fix this to be less heavy handed return pd.DataFrame() return pd.DataFrame() if records.empty else records def _slurm_raw_processing(records, slurm_names): check = records.duplicated( keep=False ) if check.any(): duplicated_records = records.loc[ check, 'JobID'].unique().tolist() len1 = len(records) records.drop_duplicates( keep='last', inplace=True, ignore_index=True ) len2 = len(records) print(f'Dropped {len1-len2} fully identical records.') # Convert date/times columns from 'str' to the 'datetime' type. # Invalid parsing will be set to NaN. records[time_columns] = records[time_columns].apply( pd.to_datetime, errors='coerce' ) # Convert integer columns from 'str' to 'int64' # Invalid parsing will be set to NaN and then to 0 columns_int = ['AllocCPUS', 'AllocNodes', 'AssocID', 'CPUTimeRAW', 'ElapsedRaw', 'GID', 'JobIDRaw', 'NCPUS', 'NNodes', 'Priority', 'QOSRAW', 'ReqCPUS', 'ReqNodes', 'ResvCPURAW', 'TimelimitRaw', 'UID'] records[columns_int] = records[columns_int].apply( pd.to_numeric, errors='coerce' ).fillna(0).astype('Int64') # Replace unnecessary columns records['Timelimit'] = records['TimelimitRaw'] records['CPUTime'] = records['CPUTimeRAW'] records['Elapsed'] = records['ElapsedRaw'] records['ResvCPU'] = records['ResvCPURAW'] records.drop( columns=['TimelimitRaw','CPUTimeRAW','ElapsedRaw','ResvCPURAW'], inplace=True ) # Allocated memory per job. Note that memory can be specified as a float in the submission script, # therefore we preserve this type for multiplication, but then cast to integer. records[['Mem','_mem_unit']] = records['AllocTRES'].str.extract('mem=([0-9.]+)(M|G|T)') records['Mem'] = pd.to_numeric( records['Mem'], errors='coerce').fillna(0).astype('float64') records['Mem'].mask( records['_mem_unit']=='G', records['Mem']*1024, inplace=True ) records['Mem'].mask( records['_mem_unit']=='T', records['Mem']*1024*1024, inplace=True ) records['Mem'] = records['Mem'].round(0).astype('Int64') records['MemTime'] = records['Mem']*records['Elapsed'] records.drop( columns=['_mem_unit'], inplace=True ) # GPUs: Get a number of allocated GPUs and GPU-seconds records['NGPUS'] = records['AllocTRES'].str.extract('gpu=(\d+)',expand=False) records['NGPUS'] = pd.to_numeric( records['NGPUS'], errors='coerce' ).fillna(0).astype('Int64') records['GPUTime'] = records['NGPUS']*records['Elapsed'] if not slurm_names: old_fields = ['jobid', 'user', 'account', 'submit', 'start', 'end', 'ncpus', 'nnodes', 'reqmem', 'timelimit', 'state', 'reqtres', 'reqtres', 'priority', 'partition', 'reqcpus', 'mem', 'ngpus', 'alloctres'] records.columns = records.columns.str.lower() records = records.drop(records.columns.difference(old_fields), 1) return records def _slurm_consistency_check( records ): ''' Perform consistency checks of the SLURM records. ''' print('Consistency check started.') # Exclude running and pending jobs from analysis states = ['RUNNING','PENDING'] check = records['State'].isin( states ) if any(check): print(f' {sum(check)} records of jobs in {states} states have been excluded from the consistency check.') records = records[ ~check ] # Runaway jobs # Some 'FAILED' records might have NaN in 'End' due to SLURM glitches. These are called runaway jobs. # They can be fixed by running 'sacctmgr show RunawayJobs' on the cluster. # We also check all other time columns just in case. check = records[ time_columns ].isna().any(1) if any(check): print(f' NaNs detected in columns {time_columns} in the following {sum(check)} records that have been excluded: {records.loc[check,"JobID"].to_list()}') records = records[ ~check ] # Data consistency checks # Verify that 'End'-'Start' is equal to 'Elapsed' check = ( (records['End']-records['Start']).dt.total_seconds().astype('int64') - records['Elapsed'] )!=0 if any(check): print(f' Failed consistency check for Elapsed on the following {sum(check)} JobIDs:', records.loc[check,'JobID'].to_list() ) # Verify that 'NCPUS'*'Elapsed' is equal to 'CPUTime' check = ( records['NCPUS']*records['Elapsed'] - records['CPUTime'] )!=0 if any(check): print(f' Failed consistency check for CPUTime on the following {sum(check)} JobIDs:', records.loc[check,'JobID'].to_list() ) # Verify that 'AllocCPUS' and 'NCPUS' are the same (per SLURM documentation). check = ( records['AllocCPUS']!=records['NCPUS'] ) if any(check): print(f' Failed consistency check for AllocCPUS and NCPUS on the following {sum(check)} JobIDs:', records.loc[check,'JobID'].to_list() ) print('Consistency check ended.') return records