source: issm/trunk-jpl/src/py3/classes/clusters/pfe.py@ 19895

Last change on this file since 19895 was 19895, checked in by bdef, 9 years ago

NEW: Adding a directory to hold the Python 3 implementation of the interface

File size: 6.8 KB
Line 
1# import socket
2# import os
3# import math
4import subprocess
5from fielddisplay import fielddisplay
6from EnumToString import EnumToString
7from pairoptions import pairoptions
8from issmssh import issmssh
9from issmscpin import issmscpin
10from issmscpout import issmscpout
11from QueueRequirements import QueueRequirements
12try:
13 from pfe_settings import pfe_settings
14except ImportError:
15 print('You need pfe_settings.py to proceed, check presence and sys.path')
16
class pfe(object):
    """
    PFE cluster class definition.

    Holds the configuration needed to run ISSM solution sequences on the
    NASA Pleiades (PFE) cluster: node/CPU layout, PBS queue selection, and
    helpers to build the PBS queue script, upload the inputs, launch the
    job over ssh and download the results.

    Usage:
       cluster=pfe();
       cluster=pfe('np',3);
       cluster=pfe('np',3,'login','username');
    """

    def __init__(self, **kwargs):
        # {{{
        # default cluster settings
        self.name = 'pfe'
        self.login = ''
        self.numnodes = 20
        self.cpuspernode = 8
        self.port = 1025
        self.queue = 'long'
        self.time = 12 * 60          # walltime requested, in minutes
        self.processor = 'wes'
        self.codepath = ''
        self.executionpath = ''
        self.grouplist = 's1010'
        self.interactive = 0
        self.bbftp = 0
        self.numstreams = 8
        self.hyperthreading = 0

        # use provided options to change fields
        options = pairoptions(**kwargs)

        # initialize cluster using user settings if provided
        # (pfe_settings mutates and returns the instance)
        self = pfe_settings(self)
        self.np = self.nprocs()
        # OK get other fields
        self = options.AssignObjectFields(self)
        # }}}

    def __repr__(self):
        # {{{
        # display the object, one fielddisplay line per attribute
        s = "class pfe object:"
        s = "%s\n%s" % (s, fielddisplay(self, 'name', 'name of the cluster'))
        s = "%s\n%s" % (s, fielddisplay(self, 'login', 'login'))
        s = "%s\n%s" % (s, fielddisplay(self, 'numnodes', 'number of nodes'))
        s = "%s\n%s" % (s, fielddisplay(self, 'cpuspernode', 'number of nodes per CPUs'))
        s = "%s\n%s" % (s, fielddisplay(self, 'np', 'number of CPUs'))
        s = "%s\n%s" % (s, fielddisplay(self, 'port', 'machine access port'))
        s = "%s\n%s" % (s, fielddisplay(self, 'codepath', 'code path on the cluster'))
        s = "%s\n%s" % (s, fielddisplay(self, 'executionpath', 'execution path on the cluster'))
        s = "%s\n%s" % (s, fielddisplay(self, 'queue', 'name of the queue'))
        s = "%s\n%s" % (s, fielddisplay(self, 'time', 'walltime requested'))
        s = "%s\n%s" % (s, fielddisplay(self, 'processor', 'type of processor'))
        s = "%s\n%s" % (s, fielddisplay(self, 'grouplist', 'name of the group'))
        s = "%s\n%s" % (s, fielddisplay(self, 'interactive', ''))
        s = "%s\n%s" % (s, fielddisplay(self, 'bbftp', ''))
        s = "%s\n%s" % (s, fielddisplay(self, 'numstreams', ''))
        s = "%s\n%s" % (s, fielddisplay(self, 'hyperthreading', ''))
        return s
        # }}}

    def nprocs(self):
        # {{{
        """Return (and cache in self.np) the total CPU count: nodes * cpus/node."""
        self.np = self.numnodes * self.cpuspernode
        return self.np
        # }}}

    def checkconsistency(self, md, solution, analyses):
        # {{{
        """
        Check cluster settings against PFE queue limits and processor types.

        Appends a message to md for every inconsistency found and returns self.
        """
        # per-queue [max walltime (minutes), max cpus] limits
        queuedict = {'long': [5 * 24 * 60, 2048],
                     'normal': [8 * 60, 2048],
                     'debug': [2 * 60, 150],
                     'devel': [2 * 60, 150]}
        QueueRequirements(queuedict, self.queue, self.nprocs(), self.time)

        # now, check cluster.cpuspernode according to processor type
        # NOTE(review): original messages used MATLAB-style doubled quotes
        # (''neh''), which Python concatenates into unquoted text; the quotes
        # are restored here.
        if self.processor == 'har' or self.processor == 'neh':
            if self.hyperthreading:
                if not 0 < self.cpuspernode < 17:
                    md = md.checkmessage("cpuspernode should be between 1 and 16 for 'neh' and 'har' processors in hyperthreading mode")
            else:
                if not 0 < self.cpuspernode < 9:
                    md = md.checkmessage("cpuspernode should be between 1 and 8 for 'neh' and 'har' processors")

        elif self.processor == 'wes':
            if self.hyperthreading:
                if not 0 < self.cpuspernode < 25:
                    md = md.checkmessage("cpuspernode should be between 1 and 24 for 'wes' processors in hyperthreading mode")
            else:
                if not 0 < self.cpuspernode < 13:
                    md = md.checkmessage("cpuspernode should be between 1 and 12 for 'wes' processors")

        elif self.processor == 'ivy':
            if self.hyperthreading:
                if not 0 < self.cpuspernode < 41:
                    md = md.checkmessage("cpuspernode should be between 1 and 40 for 'ivy' processors in hyperthreading mode")
            else:
                if not 0 < self.cpuspernode < 21:
                    md = md.checkmessage("cpuspernode should be between 1 and 20 for 'ivy' processors")
        else:
            md = md.checkmessage("unknown processor type, should be 'neh','wes' or 'har' or 'ivy'")

        # Miscelaneous
        if not self.login:
            md = md.checkmessage('login empty')
        if not self.codepath:
            md = md.checkmessage('codepath empty')
        if not self.executionpath:
            md = md.checkmessage('executionpath empty')
        if not self.grouplist:
            md = md.checkmessage('grouplist empty')
        if self.interactive == 1:
            md = md.checkmessage('interactive mode not implemented')

        return self
        # }}}

    def BuildQueueScript(self, dirname, modelname, solution, io_gather, isvalgrind, isgprof, isdakota):
        # {{{
        """Write the PBS queue script '<modelname>.queue' in the current directory."""
        executable = 'issm.exe'
        if isdakota:
            # NOTE(review): IssmConfig is not imported in this module, so this
            # branch raises NameError as written — confirm the intended import.
            # [0:2] presumably grabs the leading 'N.' of the version string.
            version = IssmConfig('_DAKOTA_VERSION_')[0:2]
            version = float(version)
            if version >= 6:
                executable = 'issm_dakota.exe'

        # write queuing script
        fid = open(modelname + '.queue', 'w')
        fid.write('#PBS -S /bin/bash\n')
        fid.write('#PBS -l select=%i:ncpus=%i:model=%s\n' % (self.numnodes, self.cpuspernode, self.processor))
        fid.write('#PBS -l walltime=%i\n' % (self.time * 60))  # walltime in seconds
        fid.write('#PBS -q %s \n' % self.queue)
        fid.write('#PBS -W group_list=%s\n' % self.grouplist)
        fid.write('#PBS -m e\n')
        fid.write('#PBS -o %s/%s/%s.outlog \n' % (self.executionpath, dirname, modelname))
        fid.write('#PBS -e %s/%s/%s.errlog \n\n' % (self.executionpath, dirname, modelname))
        fid.write('. /usr/share/modules/init/bash\n\n')
        fid.write('module load comp-intel/2015.0.090\n')
        fid.write('module load mpi-sgi/mpt.2.11r13\n')
        fid.write('export PATH="$PATH:."\n\n')
        fid.write('export MPI_GROUP_MAX=64\n\n')
        fid.write('export ISSM_DIR="%s/../"\n' % self.codepath)
        fid.write('source $ISSM_DIR/etc/environment.sh\n')
        fid.write('cd %s/%s/\n\n' % (self.executionpath, dirname))
        fid.write('mpiexec -np %i %s/%s %s %s/%s %s\n' % (self.nprocs(), self.codepath, executable, str(EnumToString(solution)[0]), self.executionpath, dirname, modelname))

        fid.close()
        # }}}

    def UploadQueueJob(self, modelname, dirname, filelist):
        # {{{
        """Tar the input files into '<dirname>.tar.gz' and scp it to the cluster."""
        # compress the files into one zip.
        compressstring = 'tar -zcf %s.tar.gz ' % dirname
        for fname in filelist:
            compressstring += ' %s' % fname
        # NOTE(review): shell=True with interpolated filenames; inputs are
        # locally generated model files, not untrusted data.
        subprocess.call(compressstring, shell=True)

        print('uploading input file and queueing script')
        issmscpout(self.name, self.executionpath, self.login, self.port, [dirname + '.tar.gz'])
        # }}}

    def LaunchQueueJob(self, modelname, dirname, filelist, restart):
        # {{{
        """qsub the queue script on the cluster, unpacking the tarball first unless restarting."""
        print('launching solution sequence on remote cluster')
        if restart:
            launchcommand = 'cd %s && cd %s && qsub %s.queue' % (self.executionpath, dirname, modelname)
        else:
            launchcommand = 'cd %s && rm -rf ./%s && mkdir %s && cd %s && mv ../%s.tar.gz ./ && tar -zxf %s.tar.gz && qsub %s.queue' % (self.executionpath, dirname, dirname, dirname, dirname, dirname, modelname)
        issmssh(self.name, self.login, self.port, launchcommand)
        # }}}

    def Download(self, dirname, filelist):
        # {{{
        """Copy the requested result files from the cluster to the current directory."""
        directory = '%s/%s/' % (self.executionpath, dirname)
        issmscpin(self.name, self.login, self.port, directory, filelist)
        # }}}
Note: See TracBrowser for help on using the repository browser.