  • ../trunk-jpl/src/m/classes/clusters/

     1import subprocess
     2from fielddisplay import fielddisplay
     3from pairoptions import pairoptions
     4from issmssh import issmssh
     5from issmscpin import issmscpin
     6from issmscpout import issmscpout
     7from QueueRequirements import QueueRequirements
     8import datetime
     10        from stallo_settings import stallo_settings
     11except ImportError:
     12        print 'You need to proceed, check presence and sys.path'
     14class stallo(object):
     15        """
     16        Stallo cluster class definition
     17        This is a SLURM queue
     18        The priorities are given to:
     19           - Large jobs
     20           - Short jobs
     21           - small number of job per user
     23        There are some 20cpu nodes and 16cpu nodes, with 32GB (a few with 128GB) mem per node, you can ask for part of a node if you need more memory.(1 node, 2 CPUS and 10GB per cpu for example)
     26           Usage:
     27              cluster=stallo();
     28        """
     30        def __init__(self,*args):
     31        # {{{
     32                 = 'stallo'
     33                self.login          = ''
     34                self.numnodes       = 2
     35                self.cpuspernode    = 20
     36                self.mem            = 1.6
     37                self.queue          = 'normal'
     38                self.time           = 2*60
     39                self.codepath       = ''
     40                self.executionpath  = ''
     41                self.interactive    = 0
     42                self.port           = []
     43                self.accountname    = ''
     44                self.profiling      = 0
     45                #use provided options to change fields
     46                options=pairoptions(*args)
     48                #initialize cluster using user settings if provided
     49                self=vilje_settings(self)
     50                #OK get other fields
     51                self=options.AssignObjectFields(self)
     52      *self.procspernodes               
     53        # }}}
     55        def __repr__(self):
     56        # {{{
     57                #  display the object
     58                s = "class vilje object:"
     59                s = "%s\n%s"%(s,fielddisplay(self,'name','name of the cluster'))
     60                s = "%s\n%s"%(s,fielddisplay(self,'login','login'))
     61                s = "%s\n%s"%(s,fielddisplay(self,'numnodes','number of nodes'))
     62                s = "%s\n%s"%(s,fielddisplay(self,'cpuspernode','number of nodes per CPUs'))
     63                s = "%s\n%s"%(s,fielddisplay(self,'mem','memory per CPU'))
     64                s = "%s\n%s"%(s,fielddisplay(self,'queue','name of the queue (normal (D), short,singlenode,multinode,devel)'))
     65                s = "%s\n%s"%(s,fielddisplay(self,'time','walltime requested in minutes'))
     66                s = "%s\n%s"%(s,fielddisplay(self,'codepath','code path on the cluster'))
     67                s = "%s\n%s"%(s,fielddisplay(self,'executionpath','execution path on the cluster'))
     68                s = "%s\n%s"%(s,fielddisplay(self,'interactive',''))
     69                s = "%s\n%s"%(s,fielddisplay(self,'accountname','your cluster account'))
     70                s = "%s\n%s"%(s,fielddisplay(self,'profiling','enable profiling if 1 default is 0'))
     71                return s
     72        # }}}
     73        def checkconsistency(self,md,solution,analyses):
     74        # {{{
     75                #Queue dictionarry  gives queue name as key and max walltime and cpus as var
     76                queuedict = {'short':[60, 2048],
     77                                                                 'normal':[2*24*60,2048],
     78                                                                 'singlenode':[28*24*60,20],
     79                                                                 'multinode':[28*24*60,2048],
     80                                                                 'devel':[4*60,2048]}
     81                QueueRequirements(queuedict,self.queue,,self.time)
     83                #Miscelaneous
     84                if not self.login:
     85                        md = md.checkmessage('login empty')
     86                if not self.codepath:
     87                        md = md.checkmessage('codepath empty')
     88                if not self.executionpath:
     89                        md = md.checkmessage('executionpath empty')
     90                if self.interactive==1:
     91                        md = md.checkmessage('interactive mode not implemented')
     92                return self
     93                # }}}
     94        def BuildQueueScript(self,dirname,modelname,solution,io_gather,isvalgrind,isgprof,isdakota,isoceancoupling):
     95                # {{{
     97                executable='issm.exe'
     98                if isdakota:
     99                        version=IssmConfig('_DAKOTA_VERSION_')[0:2]
     100                        version=float(version)
     101                        if version>=6:
     102                                executable='issm_dakota.exe'
     103                if isoceancoupling:
     104                        executable='issm_ocean.exe'
     105                #write queuing script
     106                shortname=modelname[0:min(12,len(modelname))]
     107                fid=open(modelname+'.queue','w')
     109                fid.write('#!/bin/bash -l\n')
     110                fid.write('#SBATCH --job-name=%s \n' % shortname)
     111                fid.write('#SBATCH --partition %s \n' % self.queue)
     112                fid.write('#SBATCH --nodes=%i' % self.numnodes)
     113                fid.write('#SBATCH --ntasks-per-nodes==%i' % self.cpuspernode)                                                                 
     114                fid.write('#SBATCH --time=%s\n' % timestring) #walltime is minutes
     115                fid.write('#SBATCH --mem-per-cpu=%iGB\n' % self.mem)# mem is in GB
     116                if (mod(,16)+mod(,20))==0:
     117                        fid.write('#SBATCH --ntask=%i\n' %
     118                fid.write('#SBATCH --account=%s\n' % self.accountname)
     119                fid.write('#SBATCH --output %s/%s/%s.outlog \n' % (self.executionpath,dirname,modelname))
     120                fid.write('#SBATCH --error %s/%s/%s.errlog \n\n' % (self.executionpath,dirname,modelname))
     122                fid.write('export ISSM_DIR="%s/../"\n' % self.codepath)
     123                fid.write('module load Automake/1.15-intel-2016a\n')
     124                fid.write('module load libtool/2.4.6-intel-2016a\n')
     125                fid.write('module load CMake/3.5.2-intel-2016a\n')
     126                fid.write('module load intel/2016a\n')
     127                fid.write('module load ParMETIS/4.0.3-intel-2016a\n')
     128                fid.write('module load MUMPS/5.1.1-intel-2016a-parmetis\n')
     129                fid.write('module load PETSc/3.7.2-intel-2016a-Python-2.7.11\n')
     130                fid.write('module load FFTW/3.3.4-intel-2016a\n')
     131                fid.write('module load OpenSSL/1.0.1s-intel-2016a\n')
     133                fid.write('cd %s/%s/\n\n' % (self.executionpath,dirname))
     134                if self.profiling==1:
     135                        fid.write('module load perf-report\n')
     136                        fid.write('perf-report mpirun -np %i %s/%s %s %s/%s %s\n' % (,self.codepath,executable,str(solution),self.executionpath,dirname,modelname))
     137                else:
     138                        fid.write('mpirun -np %i %s/%s %s %s/%s %s\n' % (,self.codepath,executable,str(solution),self.executionpath,dirname,modelname))
     139                fid.close()
     141                # }}}
     142        def UploadQueueJob(self,modelname,dirname,filelist):
     143                # {{{
     144                #compress the files into one zip.
     145                compressstring='tar -zcf %s.tar.gz ' % dirname
     146                for file in filelist:
     147                        compressstring += ' %s' % file
     148      ,shell=True)
     150                print 'uploading input file and queueing script'
     151                issmscpout(,self.executionpath,self.login,self.port,[dirname+'.tar.gz'])
     153                # }}}
     154        def LaunchQueueJob(self,modelname,dirname,filelist,restart,batch):
     155                # {{{
     157                print 'launching solution sequence on remote cluster'
     158                if restart:
     159                        launchcommand='cd %s && cd %s && sbatch %s.queue' % (self.executionpath,dirname,modelname)
     160                else:
     161                        launchcommand='cd %s && rm -rf ./%s && mkdir %s && cd %s && mv ../%s.tar.gz ./ && tar -zxf %s.tar.gz  && sbatch %s.queue' % (self.executionpath,dirname,dirname,dirname,dirname,dirname,modelname)
     162                issmssh(,self.login,self.port,launchcommand)
     164                # }}}
     165        def Download(self,dirname,filelist):
     166                # {{{
     168                #copy files from cluster to current directory
     169                directory='%s/%s/' % (self.executionpath,dirname)
     170                issmscpin(,self.login,self.port,directory,filelist)
     171                # }}}
