annotate parallel.py @ 16:5376e1c9adec draft

Uploaded
author petr-novak
date Fri, 24 Apr 2020 08:54:30 -0400
parents a4cd8608ef6b
children
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
rev   line source
0
a4cd8608ef6b Uploaded
petr-novak
parents:
diff changeset
1 #!/usr/bin/env python3
a4cd8608ef6b Uploaded
petr-novak
parents:
diff changeset
2 import multiprocessing
a4cd8608ef6b Uploaded
petr-novak
parents:
diff changeset
3 import os
a4cd8608ef6b Uploaded
petr-novak
parents:
diff changeset
4 import time
a4cd8608ef6b Uploaded
petr-novak
parents:
diff changeset
5 from itertools import cycle
a4cd8608ef6b Uploaded
petr-novak
parents:
diff changeset
6 '''
a4cd8608ef6b Uploaded
petr-novak
parents:
diff changeset
7 functions for parallel processing of data chunks using worker function
a4cd8608ef6b Uploaded
petr-novak
parents:
diff changeset
8 '''
a4cd8608ef6b Uploaded
petr-novak
parents:
diff changeset
9
a4cd8608ef6b Uploaded
petr-novak
parents:
diff changeset
10
a4cd8608ef6b Uploaded
petr-novak
parents:
diff changeset
def run_multiple_pbs_jobs(cmds, status_files, qsub_params=""):
    '''
    Submit several shell commands to a PBS cluster and wait for all of them.

    Every command is started through pbs_send_job() with its own status
    file. After all watcher processes have been joined, the content of
    every status file is read back.

    Parameters:
        cmds: shell commands to run, one job per command
        status_files: status file paths, exactly one per command
        qsub_params: extra options handed to qsub, for example
            -l walltime=1000:00:00,nodes=1:ppn=8,mem=15G
            -l walltime=150:00:00,nodes=1:ppn=1

    Returns:
        dict with two lists ordered like cmds:
        'function' - exit codes of the local watcher processes
        'command'  - stripped content of each status file

    Raises:
        ValueError: if cmds and status_files differ in length (zip()
            would otherwise silently drop the surplus entries)
    '''
    cmds = list(cmds)
    status_files = list(status_files)
    if len(cmds) != len(status_files):
        raise ValueError(
            "got {} commands but {} status files".format(
                len(cmds), len(status_files)))

    # start everything first so that the jobs are queued concurrently
    jobs = [pbs_send_job(cmd, sf, qsub_params)
            for cmd, sf in zip(cmds, status_files)]

    status_function = []
    for p in jobs:
        p.join()
        status_function.append(p.exitcode)

    # collect pbs run status
    status_command = []
    for sf in status_files:
        with open(sf) as f:
            status_command.append(f.read().strip())
    return {'function': status_function, 'command': status_command}
a4cd8608ef6b Uploaded
petr-novak
parents:
diff changeset
32
a4cd8608ef6b Uploaded
petr-novak
parents:
diff changeset
33
a4cd8608ef6b Uploaded
petr-novak
parents:
diff changeset
def pbs_send_job(cmd, status_file, qsub_params):
    '''
    Launch pbs_run(cmd, status_file, qsub_params) in a separate process.

    The process is already started when it is returned; the caller is
    expected to join() it. A status file path is required.
    '''
    job_args = (cmd, status_file, qsub_params)
    watcher = multiprocessing.Process(target=pbs_run, args=job_args)
    watcher.start()
    return watcher
a4cd8608ef6b Uploaded
petr-novak
parents:
diff changeset
40
a4cd8608ef6b Uploaded
petr-novak
parents:
diff changeset
41
a4cd8608ef6b Uploaded
petr-novak
parents:
diff changeset
def pbs_run(cmd, status_file, qsub_params):
    '''
    Run shell command cmd on a pbs cluster, wait for the job to finish
    and return its status.

    The submitted job script runs cmd and then writes "OK" or "ERROR"
    into status_file; this function polls for that file every 3 seconds.
    The stderr of the job is sent to status_file + ".e" (qsub -e).

    Parameters:
        cmd: shell command executed by the job
        status_file: path of the file the job writes its status to
        qsub_params: extra qsub options, inserted into the command line
            unchanged

    Returns:
        stripped content of status_file ("OK" or "ERROR")

    Raises:
        IOError: if status_file or the error file cannot be written
    '''
    import shlex  # local import: only this function needs shell quoting

    print(status_file)
    error_file = status_file + ".e"
    # test if writable
    try:
        for path in (status_file, error_file):
            with open(path, 'w'):
                pass
    except IOError:
        print("cannot write to status files, make sure path exists")
        raise
    # The probe above (re)created status_file; remove it so that its
    # reappearance can only mean that the job has finished.
    os.remove(status_file)

    quoted_status = shlex.quote(status_file)
    job_script = '{cmd} && echo "OK" > {sf} || echo "ERROR" > {sf}'.format(
        cmd=cmd, sf=quoted_status)
    # quote the script and paths so that quotes or blanks inside them
    # cannot break the submission command line
    cmd_full = "echo {script} | qsub -e {err} {qsub_params} ".format(
        script=shlex.quote(job_script),
        err=shlex.quote(error_file),
        qsub_params=qsub_params)
    if os.system(cmd_full) != 0:
        # nothing was queued, so nobody would ever create the status
        # file - record the failure instead of waiting forever
        print("qsub submission failed")
        with open(status_file, 'w') as f:
            f.write("ERROR\n")

    while not os.path.exists(status_file):
        time.sleep(3)
    with open(status_file) as f:
        return f.read().strip()
a4cd8608ef6b Uploaded
petr-novak
parents:
diff changeset
75
a4cd8608ef6b Uploaded
petr-novak
parents:
diff changeset
76
a4cd8608ef6b Uploaded
petr-novak
parents:
diff changeset
def spawn(f):
    '''
    Wrap f so that it can be used as a process target.

    The returned function takes (pipe, x), sends f(x) through pipe and
    closes pipe. The pipe is closed even when f(x) or the send raises;
    the exception itself is not swallowed.
    '''
    def fun(pipe, x):
        try:
            pipe.send(f(x))
        finally:
            pipe.close()
    return fun
a4cd8608ef6b Uploaded
petr-novak
parents:
diff changeset
82
a4cd8608ef6b Uploaded
petr-novak
parents:
diff changeset
83
a4cd8608ef6b Uploaded
petr-novak
parents:
diff changeset
def get_max_proc():
    '''
    Return the number of cpus to use.

    The value is taken from the first available source:
      1. PROC imported from config.py
      2. module level global PROC
      3. environment variable PROC
      4. number of cpus reported by the system

    The result is converted to int and is never lower than 1, because
    a limit of 0 would keep the schedulers from ever starting a job.
    '''
    try:
        from config import PROC as max_proc
    except ImportError:
        if "PROC" in globals():
            max_proc = globals()["PROC"]
        elif "PROC" in os.environ:
            max_proc = os.environ["PROC"]
        else:
            max_proc = multiprocessing.cpu_count()
    return max(1, int(max_proc))
a4cd8608ef6b Uploaded
petr-novak
parents:
diff changeset
98
a4cd8608ef6b Uploaded
petr-novak
parents:
diff changeset
99
a4cd8608ef6b Uploaded
petr-novak
parents:
diff changeset
def parmap2(f, X, groups, ppn):
    '''
    Apply f to every item of X in separate processes with scheduling
    constraints and return the results in input order.

    Parameters:
        f: function of one argument
        X: indexable sequence of arguments, one job per item
        groups: group id for each job; jobs with the same group id are
            never run at the same time
        ppn: 'load' of each job; a job is started only if the load of
            the running jobs plus its own load does not exceed 1

    At most get_max_proc() jobs run concurrently. The result of a job
    that ended with a non-zero exit code is None.

    Results are read from the pipe as soon as they become available
    and not only after the process has exited: a worker that sends
    more data than the pipe buffer holds blocks in send() until the
    parent reads, so waiting for its exit first would never return.
    '''
    WAITING, RUNNING, COLLECTED = 0, 1, 2
    max_proc = get_max_proc()
    print("running in parallel using ", max_proc, "cpu(s)")
    output = [None] * len(X)
    # prepare processes
    process_pool = [{
        'status': WAITING,
        'proc': None,
        'pipe': None,
        'index': index,
        'group': groups[index],
        'ppn': ppn[index],
        'received': False,
    } for index in range(len(X))]

    # run processes
    running = 0
    finished = 0
    sleep_time = 0.001
    while True:
        if not sleep_time:
            sleep_time = 0.001
        for job in process_pool:
            if job['status'] == RUNNING:
                parent_end, child_end = job['pipe']
                if not job['received'] and parent_end.poll():
                    # drain the pipe early so the worker can finish
                    output[job['index']] = parent_end.recv()
                    job['received'] = True
                    sleep_time = 0.0
                exitcode = job['proc'].exitcode
                if exitcode is not None:
                    # was running now finished --> collect
                    sleep_time = 0.0
                    job['proc'].join()
                    if exitcode:
                        print("job finished with exit code {}".format(
                            exitcode))
                        output[job['index']] = None
                    elif not job['received'] and parent_end.poll():
                        # result was sent between the poll above and
                        # the exit of the process
                        output[job['index']] = parent_end.recv()
                    parent_end.close()
                    child_end.close()
                    job['status'] = COLLECTED
                    running -= 1
                    finished += 1
                    # drop the references to free the file descriptors
                    del job['pipe']
                    del job['proc']
            elif job['status'] == WAITING and running < max_proc:
                # waiting and free --> run
                active = [other for other in process_pool
                          if other['status'] == RUNNING]
                # check max load of concurrent runs; a job is always
                # allowed to run alone, otherwise a load above 1 would
                # block the scheduler forever
                current_load = sum(other['ppn'] for other in active)
                fits = (not active) or (job['ppn'] + current_load) <= 1
                # check if this group can be run
                group_free = job['group'] not in [other['group']
                                                  for other in active]
                if fits and group_free:
                    sleep_time = 0.0
                    try:
                        job['pipe'] = multiprocessing.Pipe()
                    except OSError as e:
                        # most likely out of file descriptors, the job
                        # stays in waiting state and is retried later
                        print('exception occured:', e)
                        continue
                    job['proc'] = multiprocessing.Process(
                        target=spawn(f),
                        args=(job['pipe'][1], X[job['index']]),
                        name=str(job['index']))
                    job['proc'].start()
                    job['status'] = RUNNING
                    running += 1
        if finished == len(process_pool):
            break
        if sleep_time:
            # sleep only if nothing changed in the last cycle
            time.sleep(sleep_time)
            # sleep time gradually increase to 1 sec
            sleep_time = min(2 * sleep_time, 1)
    return output
a4cd8608ef6b Uploaded
petr-novak
parents:
diff changeset
169
a4cd8608ef6b Uploaded
petr-novak
parents:
diff changeset
170
a4cd8608ef6b Uploaded
petr-novak
parents:
diff changeset
def print_status(pp):
    '''
    Print a table with the state of every job of a process pool.

    pp is a list of job dictionaries with the keys 'index', 'status'
    (0: waiting, 1: running, 2: collected), 'group', 'ppn' and
    optionally 'proc'. The exit code is printed as None when the job
    has no process object, i.e. 'proc' is None or the key is missing.
    '''
    states = ['waiting', 'running', 'collected']
    print("___________________________________")
    print("jobid status group ppn exitcode")
    print("===================================")
    for i in pp:
        proc = i.get('proc')
        exitcode = None if proc is None else proc.exitcode
        print(
            i['index'], " ",
            states[i['status']], " ",
            i['group'], " ",
            i['ppn'], " ",
            exitcode
        )
a4cd8608ef6b Uploaded
petr-novak
parents:
diff changeset
184
a4cd8608ef6b Uploaded
petr-novak
parents:
diff changeset
185
a4cd8608ef6b Uploaded
petr-novak
parents:
diff changeset
def collect(pf, pp):
    '''
    Fetch the result of a finished job and release its resources.

    Parameters:
        pf: the process of the job
        pp: pair of connections (parent end, child end) of its pipe

    Returns:
        the object the job sent through the pipe, or None if the job
        ended with a non-zero exit code or sent nothing at all

    Raises:
        Exception('not collected'): if the job was not started or has
            not finished yet; the pipe is left open in that case
    '''
    if pf.pid and not pf.exitcode and not pf.is_alive():
        failed = False
    elif pf.exitcode:
        failed = True
    else:
        raise Exception('not collected')

    returnvalue = None
    try:
        if failed:
            print("job finished with exit code {}".format(pf.exitcode))
        elif pp[0].poll():
            # poll first: recv() on an empty pipe would block forever
            # because the parent still holds the sending end open
            returnvalue = pp[0].recv()
        pf.join()
    finally:
        pp[0].close()
        pp[1].close()
    return returnvalue
a4cd8608ef6b Uploaded
petr-novak
parents:
diff changeset
202
a4cd8608ef6b Uploaded
petr-novak
parents:
diff changeset
203
a4cd8608ef6b Uploaded
petr-novak
parents:
diff changeset
def _parmap_reap(active, results):
    '''
    Receive results that are ready and retire jobs that have exited.

    active is a list of (index, process, (parent end, child end))
    tuples, results maps job index to return value and is updated in
    place. Returns the list of jobs that are still running.
    '''
    still_running = []
    for index, proc, (parent_end, child_end) in active:
        # read as soon as data is available: a worker sending more
        # than the pipe buffer holds cannot exit before it is read
        if index not in results and parent_end.poll():
            results[index] = parent_end.recv()
        if proc.is_alive():
            still_running.append((index, proc, (parent_end, child_end)))
            continue
        proc.join()
        if proc.exitcode:
            print("job finished with exit code {}".format(proc.exitcode))
            results[index] = None
        elif index not in results:
            # sent between the poll above and the exit, or never sent
            results[index] = parent_end.recv() if parent_end.poll() else None
        # closing both ends frees the file descriptors of the pipe
        parent_end.close()
        child_end.close()
    return still_running


def parmap(f, X):
    '''
    Apply f to every item of X in separate processes and return the
    list of results in input order.

    At most get_max_proc() processes run at the same time. The result
    of a job that ended with a non-zero exit code (or that exited
    without sending anything) is None.
    '''
    max_proc = get_max_proc()
    results = {}
    active = []
    total = 0
    for index, x in enumerate(X):
        # wait for a free slot
        while True:
            active = _parmap_reap(active, results)
            if len(active) < max_proc:
                break
            # returns early when the oldest job exits
            active[0][1].join(0.1)
        pipe = multiprocessing.Pipe()
        proc = multiprocessing.Process(target=spawn(f),
                                       args=(pipe[1], x), name=str(index))
        proc.start()
        active.append((index, proc, pipe))
        total += 1

    # collect the rest
    while active:
        active = _parmap_reap(active, results)
        if active:
            active[0][1].join(0.1)
    # convert to list in correct input order
    return [results[index] for index in range(total)]
a4cd8608ef6b Uploaded
petr-novak
parents:
diff changeset
252
a4cd8608ef6b Uploaded
petr-novak
parents:
diff changeset
253
a4cd8608ef6b Uploaded
petr-novak
parents:
diff changeset
def parallel2(command, *args, groups=None, ppn=None):
    '''
    Same as parallel() but with scheduling constraints.

    Parameters:
        command: function to be executed
        *args: lists of arguments; job k is
            command(args[0][k], args[1][k], ...). All lists must have
            the same length, lists of length 1 are repeated.
        groups: group id for each job; groups identify mutually
            exclusive jobs, jobs with the same group id are never run
            together. Default: every job gets its own group.
        ppn: 'load' of each job, a number in the 0 - 1 range; the sum
            of loads of concurrently running jobs cannot exceed 1.
            Default: 0 for every job.

    Returns:
        list with the output of command for every job

    Raises:
        ValueError: if no argument list is given, if the lengths of the
            argument lists, groups or ppn do not match, or if a ppn
            value is outside of the 0 - 1 range
    '''
    # check args, expand if necessary
    args = list(args)
    N = [len(i) for i in args]  # lengths of lists
    Mx = max(N)
    lengths = set(N)
    if len(lengths) == 1:
        # all good
        pass
    elif lengths == {1, Mx}:
        # expand args of length 1
        args = [i * Mx if len(i) == 1 else i for i in args]
    else:
        raise ValueError(
            "argument lists must have the same length or length 1")

    if not groups:
        groups = range(Mx)
    elif len(groups) != Mx:
        raise ValueError(
            "length of groups must be same as number of job or None")

    if not ppn:
        ppn = [0] * Mx
    elif len(ppn) != Mx:
        raise ValueError(
            "length of ppn must be same as number of job or None")
    elif max(ppn) > 1 or min(ppn) < 0:
        raise ValueError("ppn values must be in 0 - 1 range")

    # convert argument to suitable format - 'transpose'
    argsTuples = list(zip(*args))

    def command_star(args):
        return command(*args)

    return parmap2(command_star, argsTuples, groups, ppn)
a4cd8608ef6b Uploaded
petr-novak
parents:
diff changeset
298
a4cd8608ef6b Uploaded
petr-novak
parents:
diff changeset
299
a4cd8608ef6b Uploaded
petr-novak
parents:
diff changeset
def parallel(command, *args):
    '''
    Execute command in parallel using multiprocessing.

    command is the function to be executed,
    args is list of list of arguments,
    execution is:
        command(args[0][0],args[1][0],args[2][0],args[3][0],....)
        command(args[0][1],args[1][1],args[2][1],args[3][1],....)
        command(args[0][2],args[1][2],args[2][2],args[3][2],....)
        ...
    All lists must have the same length, lists of length 1 are
    repeated to match the others.

    Returns:
        output of command as list, one item per job

    Raises:
        ValueError: if no argument list is given or if the lengths of
            the argument lists do not match
    '''
    # check args, expand if necessary
    args = list(args)
    N = [len(i) for i in args]  # lengths of lists
    Mx = max(N)
    lengths = set(N)
    if len(lengths) == 1:
        # all good
        pass
    elif lengths == {1, Mx}:
        # expand args of length 1
        args = [i * Mx if len(i) == 1 else i for i in args]
    else:
        raise ValueError(
            "argument lists must have the same length or length 1")

    # convert argument to suitable format - 'transpose'
    argsTuples = list(zip(*args))

    # NOTE: no multiprocessing.Pool() is created here; parmap() starts
    # its own processes and an unused pool only leaks idle workers.

    def command_star(args):
        return command(*args)

    return parmap(command_star, argsTuples)
a4cd8608ef6b Uploaded
petr-novak
parents:
diff changeset
337
a4cd8608ef6b Uploaded
petr-novak
parents:
diff changeset
338
a4cd8608ef6b Uploaded
petr-novak
parents:
diff changeset
def worker(*a):
    '''
    Demo workload for the test run at the end of this file.

    Returns the sum of all arguments after burning some cpu time for
    each of them. An argument equal to 1.1 triggers a
    ZeroDivisionError on purpose, to exercise the handling of failed
    jobs.
    '''
    total = 0
    scratch = 0
    for value in a:
        if value == 1.1:
            print("raising exception")
            scratch = 1 / 0
        total += value
        # busy loops, the numbers computed here are thrown away
        # NOTE(review): loop nesting reconstructed from an unindented
        # listing - confirm against the original file
        for _ in range(10):
            scratch += value
            for step in range(100000):
                scratch = 1.0 / (float(step) + 1.0)
    return total
a4cd8608ef6b Uploaded
petr-novak
parents:
diff changeset
352
a4cd8608ef6b Uploaded
petr-novak
parents:
diff changeset
353 # test
a4cd8608ef6b Uploaded
petr-novak
parents:
diff changeset
if __name__ == "__main__":
    # Demo run: 15 jobs, every job in its own group, with different
    # loads. The five one-element lists are repeated for every job.
    sixth_arg = [1, 2, 3, 7, 10, 1.2, 20, 30, 40, 10, 30, 20, 40, 50, 50]
    seventh_arg = [3, 4, 5, 6, 1, 2, 3, 4, 5, 6, 5, 6, 4, 3, 2]
    job_groups = list(range(1, 16))
    job_loads = [0.6, 0.6, 0.2, 0.6, 0.2, 0.2, 0.4,
                 0.1, 0.1, 0.3, 0.3, 0.3, 0.1, 0.1, 0.1]
    x = parallel2(
        worker, [1], [2], [3], [4], [1],
        sixth_arg,
        seventh_arg,
        groups=job_groups,
        ppn=job_loads
    )
    print(x)