Index: ../trunk-jpl/src/m/classes/clusters/stallo.py
===================================================================
--- ../trunk-jpl/src/m/classes/clusters/stallo.py	(nonexistent)
+++ ../trunk-jpl/src/m/classes/clusters/stallo.py	(revision 21987)
@@ -0,0 +1,171 @@
+import subprocess
+import datetime
+from fielddisplay import fielddisplay
+from pairoptions import pairoptions
+from issmssh import issmssh
+from issmscpin import issmscpin
+from issmscpout import issmscpout
+from QueueRequirements import QueueRequirements
+from IssmConfig import IssmConfig
+try:
+    from stallo_settings import stallo_settings
+except ImportError:
+    print('You need stallo_settings.py to proceed, check presence and sys.path')
+
+class stallo(object):
+    """
+    Stallo cluster class definition.
+    Stallo runs a SLURM queue; priority is given to:
+       - large jobs
+       - short jobs
+       - users with a small number of queued jobs
+
+    Nodes have 20 or 16 cpus and 32GB of memory per node (a few have 128GB).
+    You can request part of a node if you need more memory per cpu
+    (for example 1 node, 2 cpus and 10GB per cpu).
+
+    Usage:
+       cluster=stallo()
+    """
+
+    def __init__(self,*args):
+        # {{{
+        self.name          = 'stallo'
+        self.login         = ''
+        self.numnodes      = 2
+        self.cpuspernode   = 20
+        self.mem           = 1.6
+        self.queue         = 'normal'
+        self.time          = 2*60
+        self.codepath      = ''
+        self.executionpath = ''
+        self.interactive   = 0
+        self.port          = []
+        self.accountname   = ''
+        self.profiling     = 0
+        #use provided options to change fields
+        options=pairoptions(*args)
+
+        #initialize cluster using user settings if provided
+        self=stallo_settings(self)
+        #OK get other fields
+        self=options.AssignObjectFields(self)
+        self.np=self.numnodes*self.cpuspernode
+        # }}}
+
+    def __repr__(self):
+        # {{{
+        #display the object
+        s = "class stallo object:"
+        s = "%s\n%s"%(s,fielddisplay(self,'name','name of the cluster'))
+        s = "%s\n%s"%(s,fielddisplay(self,'login','login'))
+        s = "%s\n%s"%(s,fielddisplay(self,'numnodes','number of nodes'))
+        s = "%s\n%s"%(s,fielddisplay(self,'cpuspernode','number of CPUs per node'))
+        s = "%s\n%s"%(s,fielddisplay(self,'mem','memory per CPU in GB'))
+        s = "%s\n%s"%(s,fielddisplay(self,'queue','name of the queue (normal (D), short, singlenode, multinode, devel)'))
+        s = "%s\n%s"%(s,fielddisplay(self,'time','walltime requested in minutes'))
+        s = "%s\n%s"%(s,fielddisplay(self,'codepath','code path on the cluster'))
+        s = "%s\n%s"%(s,fielddisplay(self,'executionpath','execution path on the cluster'))
+        s = "%s\n%s"%(s,fielddisplay(self,'interactive',''))
+        s = "%s\n%s"%(s,fielddisplay(self,'accountname','your cluster account'))
+        s = "%s\n%s"%(s,fielddisplay(self,'profiling','enable profiling if 1 (default is 0)'))
+        return s
+        # }}}
+
+    def checkconsistency(self,md,solution,analyses):
+        # {{{
+        #queue dictionary: queue name -> [max walltime in minutes, max number of cpus]
+        queuedict = {'short':     [60,       2048],
+                     'normal':    [2*24*60,  2048],
+                     'singlenode':[28*24*60,   20],
+                     'multinode': [28*24*60, 2048],
+                     'devel':     [4*60,     2048]}
+        QueueRequirements(queuedict,self.queue,self.np,self.time)
+
+        #Miscellaneous
+        if not self.login:
+            md = md.checkmessage('login empty')
+        if not self.codepath:
+            md = md.checkmessage('codepath empty')
+        if not self.executionpath:
+            md = md.checkmessage('executionpath empty')
+        if self.interactive==1:
+            md = md.checkmessage('interactive mode not implemented')
+        return self
+        # }}}
+
+    def BuildQueueScript(self,dirname,modelname,solution,io_gather,isvalgrind,isgprof,isdakota,isoceancoupling):
+        # {{{
+        executable='issm.exe'
+        if isdakota:
+            version=IssmConfig('_DAKOTA_VERSION_')[0:2]
+            version=float(version)
+            if version>=6:
+                executable='issm_dakota.exe'
+        if isoceancoupling:
+            executable='issm_ocean.exe'
+
+        #convert walltime from minutes to hh:mm:ss
+        m,s=divmod(self.time*60,60)
+        h,m=divmod(m,60)
+        timestring='%02d:%02d:%02d' % (h,m,s)
+
+        #write queuing script
+        shortname=modelname[0:min(12,len(modelname))]
+        fid=open(modelname+'.queue','w')
+
+        fid.write('#!/bin/bash -l\n')
+        fid.write('#SBATCH --job-name=%s\n' % shortname)
+        fid.write('#SBATCH --partition %s\n' % self.queue)
+        fid.write('#SBATCH --nodes=%i\n' % self.numnodes)
+        fid.write('#SBATCH --ntasks-per-node=%i\n' % self.cpuspernode)
+        fid.write('#SBATCH --time=%s\n' % timestring) #walltime as hh:mm:ss
+        fid.write('#SBATCH --mem-per-cpu=%iGB\n' % self.mem) #mem is in GB
+        if (self.np%16 + self.np%20)==0:
+            fid.write('#SBATCH --ntasks=%i\n' % self.np)
+        fid.write('#SBATCH --account=%s\n' % self.accountname)
+        fid.write('#SBATCH --output %s/%s/%s.outlog\n' % (self.executionpath,dirname,modelname))
+        fid.write('#SBATCH --error %s/%s/%s.errlog\n\n' % (self.executionpath,dirname,modelname))
+
+        fid.write('export ISSM_DIR="%s/../"\n' % self.codepath)
+        fid.write('module load Automake/1.15-intel-2016a\n')
+        fid.write('module load libtool/2.4.6-intel-2016a\n')
+        fid.write('module load CMake/3.5.2-intel-2016a\n')
+        fid.write('module load intel/2016a\n')
+        fid.write('module load ParMETIS/4.0.3-intel-2016a\n')
+        fid.write('module load MUMPS/5.1.1-intel-2016a-parmetis\n')
+        fid.write('module load PETSc/3.7.2-intel-2016a-Python-2.7.11\n')
+        fid.write('module load FFTW/3.3.4-intel-2016a\n')
+        fid.write('module load OpenSSL/1.0.1s-intel-2016a\n')
+
+        fid.write('cd %s/%s/\n\n' % (self.executionpath,dirname))
+        if self.profiling==1:
+            fid.write('module load perf-report\n')
+            fid.write('perf-report mpirun -np %i %s/%s %s %s/%s %s\n' % (self.np,self.codepath,executable,str(solution),self.executionpath,dirname,modelname))
+        else:
+            fid.write('mpirun -np %i %s/%s %s %s/%s %s\n' % (self.np,self.codepath,executable,str(solution),self.executionpath,dirname,modelname))
+        fid.close()
+        # }}}
+
+    def UploadQueueJob(self,modelname,dirname,filelist):
+        # {{{
+        #compress the files into one tarball
+        compressstring='tar -zcf %s.tar.gz' % dirname
+        for file in filelist:
+            compressstring += ' %s' % file
+        subprocess.call(compressstring,shell=True)
+
+        print('uploading input file and queueing script')
+        issmscpout(self.name,self.executionpath,self.login,self.port,[dirname+'.tar.gz'])
+        # }}}
+
+    def LaunchQueueJob(self,modelname,dirname,filelist,restart,batch):
+        # {{{
+        print('launching solution sequence on remote cluster')
+        if restart:
+            launchcommand='cd %s && cd %s && sbatch %s.queue' % (self.executionpath,dirname,modelname)
+        else:
+            launchcommand='cd %s && rm -rf ./%s && mkdir %s && cd %s && mv ../%s.tar.gz ./ && tar -zxf %s.tar.gz && sbatch %s.queue' % (self.executionpath,dirname,dirname,dirname,dirname,dirname,modelname)
+        issmssh(self.name,self.login,self.port,launchcommand)
+        # }}}
+
+    def Download(self,dirname,filelist):
+        # {{{
+        #copy files from cluster to current directory
+        directory='%s/%s/' % (self.executionpath,dirname)
+        issmscpin(self.name,self.login,self.port,directory,filelist)
+        # }}}
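
For reference, a minimal usage sketch (not part of the patch), assuming the standard ISSM python interface where 'md' is an existing model and solve() launches a solution sequence. The login, account name and paths are placeholders, and options are passed as name/value pairs consumed by pairoptions:

    from stallo import stallo
    from solve import solve

    md.cluster = stallo('login','myusername',
                        'accountname','nnXXXXk',        # placeholder project account
                        'numnodes',2,'cpuspernode',16,
                        'time',4*60,                    # walltime in minutes
                        'codepath','/home/myusername/issm/trunk/bin',
                        'executionpath','/global/work/myusername/issm-exec')
    md = solve(md,'Stressbalance')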