source: issm/trunk-jpl/src/m/classes/clusters/fram.py@24213

Last change on this file since 24213 was 24213, checked in by bdef, 5 years ago

CHG: syntax change to meet most PEP 8 requirements

File size: 7.0 KB
import subprocess
import numpy as np
from fielddisplay import fielddisplay
from pairoptions import pairoptions
from issmssh import issmssh
from issmscpin import issmscpin
from issmscpout import issmscpout
from QueueRequirements import QueueRequirements
from IssmConfig import IssmConfig
try:
    from fram_settings import fram_settings
except ImportError:
    print('You need fram_settings.py to proceed, check presence and sys.path')


class fram(object):
    """
    Fram cluster class definition
    This is a SLURM queue
    The priorities are based on a point system: a job gets a reservation when reaching 20000 points and earns 1 point per minute.
    - Devel queue starts at 19990
    - Normal starts at 19940
    - Normal unpri starts at 19400

    Jobs can be:
    - normal (4 to 30 nodes, more on request, 48 h max walltime, 60 GB per node)
    - bigmem for big memory nodes (8 x 512 GB nodes and 2 x 6 TB nodes, shared nodes, 14 days max walltime)

    Usage:
        cluster = fram()
    """

    def __init__(self, *args):  # {{{
        self.name = 'fram'
        self.login = ''
        self.numnodes = 2
        self.cpuspernode = 20
        self.mem = 1.6
        self.queue = 'normal'
        self.time = 2 * 60
        self.codepath = ''
        self.executionpath = ''
        self.interactive = 0
        self.port = []
        self.accountname = ''
        self.profiling = 0
        #use provided options to change fields
        options = pairoptions(*args)

        #initialize cluster using user settings if provided
        self = fram_settings(self)
        #OK get other fields
        self = options.AssignObjectFields(self)
        self.np = self.numnodes * self.cpuspernode
    # }}}
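
    # A minimal construction sketch (illustrative, not from the source):
    # pairoptions-style positional key/value pairs override the defaults
    # set above, e.g.
    #   cluster = fram('numnodes', 4, 'time', 12 * 60, 'accountname', 'nnXXXXk')
    # where 'nnXXXXk' stands in for a real project account name.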

    def __repr__(self):  # {{{
        # display the object
        s = "class fram object:"
        s = "%s\n%s" % (s, fielddisplay(self, 'name', 'name of the cluster'))
        s = "%s\n%s" % (s, fielddisplay(self, 'login', 'login'))
        s = "%s\n%s" % (s, fielddisplay(self, 'numnodes', 'number of nodes'))
        s = "%s\n%s" % (s, fielddisplay(self, 'cpuspernode', 'number of CPUs per node'))
        s = "%s\n%s" % (s, fielddisplay(self, 'mem', 'memory per CPU (GB)'))
        s = "%s\n%s" % (s, fielddisplay(self, 'queue', 'name of the queue (normal (default), devel)'))
        s = "%s\n%s" % (s, fielddisplay(self, 'time', 'walltime requested in minutes'))
        s = "%s\n%s" % (s, fielddisplay(self, 'codepath', 'code path on the cluster'))
        s = "%s\n%s" % (s, fielddisplay(self, 'executionpath', 'execution path on the cluster'))
        s = "%s\n%s" % (s, fielddisplay(self, 'interactive', ''))
        s = "%s\n%s" % (s, fielddisplay(self, 'accountname', 'your cluster account'))
        s = "%s\n%s" % (s, fielddisplay(self, 'profiling', 'enable profiling if 1 (default is 0)'))
        return s
    # }}}

    def checkconsistency(self, md, solution, analyses):  # {{{
        # queue dictionary maps queue name to [max walltime (min), max number of cores]
        queuedict = {'normal': [2 * 24 * 60, 2048],
                     'devel': [4 * 60, 2048]}
        QueueRequirements(queuedict, self.queue, self.np, self.time)

        #Miscellaneous
        if not self.login:
            md = md.checkmessage('login empty')
        if not self.codepath:
            md = md.checkmessage('codepath empty')
        if not self.executionpath:
            md = md.checkmessage('executionpath empty')
        if self.interactive == 1:
            md = md.checkmessage('interactive mode not implemented')
        return self
    # }}}
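
    # Illustrative check (an assumption about QueueRequirements' behavior,
    # based on the dictionary above): with the defaults numnodes=2,
    # cpuspernode=20 and time=120, np=40 <= 2048 cores and 120 <= 2880 min,
    # so the 'normal' queue passes; 'devel' would reject any walltime
    # above 4 * 60 min.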

    def BuildQueueScript(self, dirname, modelname, solution, io_gather, isvalgrind, isgprof, isdakota, isoceancoupling):  # {{{
        executable = 'issm.exe'
        if isdakota:
            version = IssmConfig('_DAKOTA_VERSION_')[0:2]
            version = float(version)
            if version >= 6:
                executable = 'issm_dakota.exe'
        if isoceancoupling:
            executable = 'issm_ocean.exe'
        #write queuing script
        shortname = modelname[0:min(12, len(modelname))]
        fid = open(modelname + '.queue', 'w')

        fid.write('#!/bin/bash -l\n')
        fid.write('#SBATCH --job-name=%s \n' % shortname)
        fid.write('#SBATCH --partition %s \n' % self.queue)
        fid.write('#SBATCH --nodes=%i \n' % self.numnodes)
        fid.write('#SBATCH --ntasks-per-node=%i \n' % self.cpuspernode)
        fid.write('#SBATCH --time=%s\n' % self.time)  #walltime is minutes
        fid.write('#SBATCH --mem-per-cpu=%iGB\n' % self.mem)  # mem is in GB (%i truncates fractional values)
        # only request an explicit total task count when np fills whole 16- and 20-core nodes
        if (np.mod(self.np, 16) + np.mod(self.np, 20)) == 0:
            fid.write('#SBATCH --ntasks=%i\n' % self.np)
        fid.write('#SBATCH --account=%s\n' % self.accountname)
        fid.write('#SBATCH --output %s/%s/%s.outlog \n' % (self.executionpath, dirname, modelname))
        fid.write('#SBATCH --error %s/%s/%s.errlog \n\n' % (self.executionpath, dirname, modelname))

        fid.write('export ISSM_DIR="%s/../"\n' % self.codepath)
        fid.write('module restore system\n')
        fid.write('module load Automake/1.15.1-GCCcore-6.3.0\n')
        fid.write('module load libtool/2.4.6-GCCcore-6.3.0\n')
        fid.write('module load CMake/3.9.1\n')
        fid.write('module load PETSc/3.8.0-intel-2017a-Python-2.7.13\n')
        fid.write('module load ParMETIS/4.0.3-intel-2017a\n')
        fid.write('cd %s/%s/ \n\n' % (self.executionpath, dirname))
        if self.profiling == 1:
            fid.write('module load perf-report\n')
            fid.write('perf-report mpirun -np %i %s/%s %s %s/%s %s\n' % (self.np, self.codepath, executable, str(solution), self.executionpath, dirname, modelname))
        else:
            fid.write('mpirun -np %i %s/%s %s %s/%s %s\n' % (self.np, self.codepath, executable, str(solution), self.executionpath, dirname, modelname))
        fid.close()

    # }}}
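
    # For illustration (derived from the writes above, with the defaults
    # numnodes=2, cpuspernode=20, queue='normal', time=120, mem=1.6), the
    # generated <modelname>.queue header would begin like:
    #   #!/bin/bash -l
    #   #SBATCH --job-name=<shortname>
    #   #SBATCH --partition normal
    #   #SBATCH --nodes=2
    #   #SBATCH --ntasks-per-node=20
    #   #SBATCH --time=120
    #   #SBATCH --mem-per-cpu=1GB
    # with no --ntasks line, since np=40 is not divisible by 16.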

    def UploadQueueJob(self, modelname, dirname, filelist):  # {{{
        #compress the files into one gzipped tarball
        compressstring = 'tar -zcf %s.tar.gz ' % dirname
        for name in filelist:
            compressstring += ' %s' % name
        subprocess.call(compressstring, shell=True)

        print('uploading input file and queueing script')
        issmscpout(self.name, self.executionpath, self.login, self.port, [dirname + '.tar.gz'])

    # }}}
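
    # e.g. with hypothetical dirname='run01' and filelist=['run01.bin', 'run01.queue'],
    # compressstring becomes 'tar -zcf run01.tar.gz  run01.bin run01.queue'
    # (note the harmless double space produced by the format above)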

    def LaunchQueueJob(self, modelname, dirname, filelist, restart, batch):  # {{{
        print('launching solution sequence on remote cluster')
        if restart:
            launchcommand = 'cd %s && cd %s && sbatch %s.queue' % (self.executionpath, dirname, modelname)
        else:
            launchcommand = 'cd %s && rm -rf ./%s && mkdir %s && cd %s && mv ../%s.tar.gz ./ && tar -zxf %s.tar.gz && sbatch %s.queue' % (self.executionpath, dirname, dirname, dirname, dirname, dirname, modelname)
        issmssh(self.name, self.login, self.port, launchcommand)

    # }}}
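
    # For a fresh (non-restart) run with hypothetical dirname='run01' and
    # modelname='run01', the command above unpacks the uploaded tarball in a
    # clean directory and submits the job:
    #   cd <executionpath> && rm -rf ./run01 && mkdir run01 && cd run01 \
    #     && mv ../run01.tar.gz ./ && tar -zxf run01.tar.gz && sbatch run01.queue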

    def Download(self, dirname, filelist):  # {{{
        #copy files from cluster to current directory
        directory = '%s/%s/' % (self.executionpath, dirname)
        issmscpin(self.name, self.login, self.port, directory, filelist)
    # }}}
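
# Typical workflow sketch (inferred from the methods above, not part of the
# original file): ISSM's solution pipeline drives these steps, roughly as
#   cluster.BuildQueueScript(...)     # write <modelname>.queue locally
#   cluster.UploadQueueJob(...)       # tar the inputs and scp them over
#   cluster.LaunchQueueJob(...)       # untar remotely and sbatch the script
#   cluster.Download(dirname, [...])  # scp result files back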