comparison: parallel.py @ 0:a4cd8608ef6b (draft)
author: petr-novak
date: Mon, 01 Apr 2019 07:56:36 -0400
#!/usr/bin/env python3
'''
Functions for parallel processing of data chunks using a worker function.
'''
import multiprocessing
import os
import time


def run_multiple_pbs_jobs(cmds, status_files, qsub_params=""):
    '''
    Submit multiple commands to a PBS cluster and wait for all of them.
    Examples of qsub_params:
        -l walltime=1000:00:00,nodes=1:ppn=8,mem=15G
        -l walltime=150:00:00,nodes=1:ppn=1
    '''
    jobs = []
    status_function = []
    status_command = []
    for cmd, sf in zip(cmds, status_files):
        jobs.append(pbs_send_job(cmd, sf, qsub_params))
    for p in jobs:
        p.join()
        status_function.append(p.exitcode)
    # collect pbs run status
    for sf in status_files:
        with open(sf) as f:
            status_command.append(f.read().strip())
    status = {'function': status_function, 'command': status_command}
    return status


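# Usage sketch (hypothetical commands and paths; assumes a PBS cluster with
# qsub on PATH and a writable directory for the status files):
#   status = run_multiple_pbs_jobs(
#       cmds=["./job_a.sh", "./job_b.sh"],
#       status_files=["/tmp/job_a.status", "/tmp/job_b.status"],
#       qsub_params="-l walltime=10:00:00,nodes=1:ppn=1")
#   # e.g. status == {'function': [0, 0], 'command': ['OK', 'OK']}

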
def pbs_send_job(cmd, status_file, qsub_params):
    '''Send a job to the PBS cluster; requires a writable status file.'''
    p = multiprocessing.Process(target=pbs_run,
                                args=(cmd, status_file, qsub_params))
    p.start()
    return p


def pbs_run(cmd, status_file, qsub_params):
    '''
    Run shell command cmd on the PBS cluster, wait for the job to finish
    and return its status string.
    '''
    error_file = status_file + ".e"
    # test that both files are writable (this also creates them)
    try:
        open(status_file, 'w').close()
        open(error_file, 'w').close()
    except IOError:
        print("cannot write to status files, make sure the path exists")
        raise

    # remove the status file created above; the wait loop below polls for
    # the file that the submitted job itself writes
    if os.path.exists(status_file):
        os.remove(status_file)
    cmd_full = ("echo '{cmd} && echo \"OK\" > {status_file} || echo \"ERROR\""
                " > {status_file}' | qsub -e {err}"
                " {qsub_params} ").format(cmd=cmd, status_file=status_file,
                                          err=error_file,
                                          qsub_params=qsub_params)
    os.system(cmd_full)

    # poll until the job writes its status file
    while True:
        if os.path.exists(status_file):
            break
        else:
            time.sleep(3)
    with open(status_file) as f:
        status = f.read().strip()
    return status


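# For illustration (hypothetical values): with cmd="./run.sh",
# status_file="/tmp/s" and empty qsub_params, the submitted line is roughly:
#   echo './run.sh && echo "OK" > /tmp/s || echo "ERROR" > /tmp/s' | qsub -e /tmp/s.e

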
def spawn(f):
    '''Wrap f so that its result is sent back through a pipe. The returned
    closure is not picklable, so this relies on the fork start method
    (the default on Linux).'''
    def fun(pipe, x):
        pipe.send(f(x))
        pipe.close()
    return fun


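# Minimal sketch of how spawn works with a Pipe - the child process sends
# the result of f(x) back to the parent:
#   parent_end, child_end = multiprocessing.Pipe()
#   p = multiprocessing.Process(target=spawn(abs), args=(child_end, -5))
#   p.start()
#   p.join()
#   parent_end.recv()  # -> 5

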
def get_max_proc():
    '''Number of CPUs to use: taken from config.py if available, else from
    the global variable PROC, else from the environment variable PROC,
    else set to the system maximum.'''
    try:
        from config import PROC as max_proc
    except ImportError:
        if "PROC" in globals():
            max_proc = PROC
        elif "PROC" in os.environ:
            max_proc = int(os.environ["PROC"])
        else:
            max_proc = multiprocessing.cpu_count()
    return max_proc


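# The CPU limit can therefore be set without touching the code, e.g. by
# exporting the environment variable before running:
#   PROC=4 python3 parallel.py

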
def parmap2(f, X, groups, ppn):
    '''Like parmap, but jobs sharing a group id are never run concurrently
    and the fractional loads (ppn) of running jobs never sum above 1.'''
    max_proc = get_max_proc()
    print("running in parallel using ", max_proc, "cpu(s)")
    process_pool = []
    output = [None] * len(X)
    # prepare process records
    # status: 0: waiting, 1: running, 2: collected
    for index in range(len(X)):
        process_pool.append({
            'status': 0,
            'proc': None,
            'pipe': None,
            'index': index,
            'group': groups[index],
            'ppn': ppn[index]
        })

    # run processes
    running = 0
    finished = 0
    sleep_time = 0.001
    while True:
        if not sleep_time:
            sleep_time = 0.001
        for i in process_pool:
            if i['status'] == 1 and i['proc'].exitcode is not None:
                sleep_time = 0.0
                # was running, now finished --> collect
                i['status'] = 2
                running -= 1
                finished += 1
                output[i['index']] = collect(i['proc'], i['pipe'])
                del i['pipe']
                del i['proc']
            if i['status'] == 0 and running < max_proc:
                # waiting and a slot is free --> try to run
                # check whether this job's group is already running
                running_groups = [pp['group']
                                  for pp in process_pool if pp['status'] == 1]
                # check the maximum load of concurrent runs
                current_load = sum(pp['ppn']
                                   for pp in process_pool if pp['status'] == 1)
                cond1 = (i['ppn'] + current_load) <= 1
                cond2 = i['group'] not in running_groups
                if cond1 and cond2:
                    sleep_time = 0.0
                    try:
                        i['pipe'] = multiprocessing.Pipe()
                    except OSError as e:
                        print('exception occurred:', e)
                        continue
                    i['proc'] = multiprocessing.Process(
                        target=spawn(f),
                        args=(i['pipe'][1], X[i['index']]),
                        name=str(i['index']))
                    i['proc'].start()
                    i['status'] = 1
                    running += 1
        if finished == len(process_pool):
            break
        if sleep_time:
            # sleep only if nothing changed in the last cycle;
            # the sleep time gradually increases up to 1 second
            time.sleep(sleep_time)
            sleep_time = min(2 * sleep_time, 1)
    return output


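# Usage sketch with a hypothetical worker: jobs 0 and 1 share group "a", so
# they never run at the same time; ppn values are fractional loads whose
# running sum must stay <= 1:
#   def square(x):
#       return x * x
#   parmap2(square, [1, 2, 3], ["a", "a", "b"], [0.5, 0.5, 0.5])  # [1, 4, 9]

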
def print_status(pp):
    '''Debugging helper - print a table of the process pool state. Only
    meaningful while 'proc' entries still exist (i.e. before collection).'''
    states = ['waiting', 'running', 'collected']
    print("___________________________________")
    print("jobid  status  group  ppn  exitcode")
    print("===================================")
    for i in pp:
        print(
            i['index'], " ",
            states[i['status']], " ",
            i['group'], " ",
            i['ppn'], " ",
            i['proc'].exitcode
        )


def collect(pf, pp):
    '''Collect the result of a finished process pf from its pipe pp.'''
    if pf.pid and not pf.exitcode and not pf.is_alive():
        returnvalue = pp[0].recv()
        pf.join()
        pp[0].close()
        pp[1].close()
        return returnvalue
    elif pf.exitcode:
        print("job finished with exit code {}".format(pf.exitcode))
        pf.join()
        pp[0].close()
        pp[1].close()
        return None
    else:
        raise Exception('not collected')


def parmap(f, X):
    '''Run f on every item of X in parallel and return the results as a
    list in the input order.'''
    max_proc = get_max_proc()

    pipe = []
    proc = []
    returnvalue = {}

    for index, x in enumerate(X):
        pipe.append(multiprocessing.Pipe())
        proc.append(multiprocessing.Process(target=spawn(f),
                                            args=(pipe[-1][1], x),
                                            name=str(index)))
        p = proc[-1]
        # wait until the number of alive processes drops below max_proc
        while True:
            running = 0
            for i in proc:
                if i.is_alive():
                    running += 1
            if running < max_proc:
                break
            else:
                time.sleep(0.1)
        p.start()

        # check for finished processes and collect them
        done = [j for j, pf in enumerate(proc)
                if pf.pid and not pf.exitcode and not pf.is_alive()
                and pf.name not in returnvalue]
        for j in done:
            proc[j].join()
            returnvalue[proc[j].name] = pipe[j][0].recv()
            pipe[j][0].close()
            pipe[j][1].close()
        # delete in reverse order so the remaining indices stay valid;
        # processes must be garbage collected to free their file descriptors
        for j in reversed(done):
            del proc[j]
            del pipe[j]

    # collect the rest
    for pf in proc:
        pf.join()
    for pf, pp in zip(proc, pipe):
        if pf.pid and not pf.exitcode and not pf.is_alive() and (
                pf.name not in returnvalue):
            returnvalue[pf.name] = pp[0].recv()
            pp[0].close()
            pp[1].close()
    # convert to a list in the correct input order
    returnvalue = [returnvalue[str(i)] for i in range(len(X))]
    return returnvalue


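# Usage sketch: results come back in input order regardless of completion
# order (relies on fork-based multiprocessing; see the note at spawn):
#   parmap(abs, [-1, -2, -3])  # -> [1, 2, 3]

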
def parallel2(command, *args, groups=None, ppn=None):
    '''Same as parallel, but groups are used to identify mutually
    exclusive jobs - jobs with the same group id are never run together.
    The ppn parameter is the 'load' of a job; the sum of the loads of
    concurrently running jobs cannot exceed 1.
    '''
    # check args, expand if necessary
    args = list(args)
    N = [len(i) for i in args]  # lengths of the argument lists
    Mx = max(N)
    if len(set(N)) == 1:
        # all argument lists have the same length
        pass
    elif set(N) == set([1, Mx]):
        # expand args of length 1
        for i in range(len(args)):
            if len(args[i]) == 1:
                args[i] = args[i] * Mx
    else:
        raise ValueError
    if not groups:
        groups = range(Mx)
    elif len(groups) != Mx:
        print("length of groups must be the same as the number of jobs, or None")
        raise ValueError

    if not ppn:
        ppn = [0] * Mx
    elif len(ppn) != Mx:
        print("length of ppn must be the same as the number of jobs, or None")
        raise ValueError
    elif max(ppn) > 1 or min(ppn) < 0:
        print("ppn values must be in the 0 - 1 range")
        raise ValueError
    # convert arguments to a suitable format - 'transpose'
    argsTuples = list(zip(*args))

    def command_star(args):
        return command(*args)

    x = parmap2(command_star, argsTuples, groups, ppn)
    return x


def parallel(command, *args):
    '''Execute command in parallel using multiprocessing.
    command is the function to be executed
    args is a list of lists of arguments
    execution is:
    command(args[0][0], args[1][0], args[2][0], args[3][0], ...)
    command(args[0][1], args[1][1], args[2][1], args[3][1], ...)
    command(args[0][2], args[1][2], args[2][2], args[3][2], ...)
    ...
    the outputs of command are returned as a list
    '''
    # check args, expand if necessary
    args = list(args)
    N = [len(i) for i in args]  # lengths of the argument lists
    Mx = max(N)
    if len(set(N)) == 1:
        # all argument lists have the same length
        pass
    elif set(N) == set([1, Mx]):
        # expand args of length 1
        for i in range(len(args)):
            if len(args[i]) == 1:
                args[i] = args[i] * Mx
    else:
        raise ValueError

    # convert arguments to a suitable format - 'transpose'
    argsTuples = list(zip(*args))

    def command_star(args):
        return command(*args)

    x = parmap(command_star, argsTuples)
    return x


def worker(*a):
    '''Toy worker used by the test below; an argument equal to 1.1
    deliberately triggers a ZeroDivisionError to exercise error handling.'''
    x = 0
    y = 0
    for i in a:
        if i == 1.1:
            print("raising exception")
            s = 1 / 0  # deliberate error
        y += i
        for j in range(10):
            x += i
    # burn some CPU time
    for j in range(100000):
        x = 1.0 / (float(j) + 1.0)
    return y


# test
if __name__ == "__main__":
    x = parallel2(
        worker, [1], [2], [3], [4], [1],
        [1, 2, 3, 7, 10, 1.2, 20, 30, 40, 10, 30, 20, 40, 50, 50],
        [3, 4, 5, 6, 1, 2, 3, 4, 5, 6, 5, 6, 4, 3, 2],
        groups=[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15],
        ppn=[0.6, 0.6, 0.2, 0.6, 0.2, 0.2, 0.4,
             0.1, 0.1, 0.3, 0.3, 0.3, 0.1, 0.1, 0.1]
    )
    print(x)