comparison env/lib/python3.9/site-packages/boto/manage/task.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:4f3585e2f14b
1 # Copyright (c) 2006-2009 Mitch Garnaat http://garnaat.org/
2 #
3 # Permission is hereby granted, free of charge, to any person obtaining a
4 # copy of this software and associated documentation files (the
5 # "Software"), to deal in the Software without restriction, including
6 # without limitation the rights to use, copy, modify, merge, publish, dis-
7 # tribute, sublicense, and/or sell copies of the Software, and to permit
8 # persons to whom the Software is furnished to do so, subject to the fol-
9 # lowing conditions:
10 #
11 # The above copyright notice and this permission notice shall be included
12 # in all copies or substantial portions of the Software.
13 #
14 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
15 # OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
16 # ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
17 # SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
18 # WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
20 # IN THE SOFTWARE.
21 #
22
23 import boto
24 from boto.sdb.db.property import StringProperty, DateTimeProperty, IntegerProperty
25 from boto.sdb.db.model import Model
26 import datetime, subprocess, time
27 from boto.compat import StringIO
28
29 def check_hour(val):
30 if val == '*':
31 return
32 if int(val) < 0 or int(val) > 23:
33 raise ValueError
34
35 class Task(Model):
36
37 """
38 A scheduled, repeating task that can be executed by any participating servers.
39 The scheduling is similar to cron jobs. Each task has an hour attribute.
40 The allowable values for hour are [0-23|*].
41
42 To keep the operation reasonably efficient and not cause excessive polling,
43 the minimum granularity of a Task is hourly. Some examples:
44
45 hour='*' - the task would be executed each hour
46 hour='3' - the task would be executed at 3AM GMT each day.
47
48 """
49 name = StringProperty()
50 hour = StringProperty(required=True, validator=check_hour, default='*')
51 command = StringProperty(required=True)
52 last_executed = DateTimeProperty()
53 last_status = IntegerProperty()
54 last_output = StringProperty()
55 message_id = StringProperty()
56
57 @classmethod
58 def start_all(cls, queue_name):
59 for task in cls.all():
60 task.start(queue_name)
61
62 def __init__(self, id=None, **kw):
63 super(Task, self).__init__(id, **kw)
64 self.hourly = self.hour == '*'
65 self.daily = self.hour != '*'
66 self.now = datetime.datetime.utcnow()
67
68 def check(self):
69 """
70 Determine how long until the next scheduled time for a Task.
71 Returns the number of seconds until the next scheduled time or zero
72 if the task needs to be run immediately.
73 If it's an hourly task and it's never been run, run it now.
74 If it's a daily task and it's never been run and the hour is right, run it now.
75 """
76 boto.log.info('checking Task[%s]-now=%s, last=%s' % (self.name, self.now, self.last_executed))
77
78 if self.hourly and not self.last_executed:
79 return 0
80
81 if self.daily and not self.last_executed:
82 if int(self.hour) == self.now.hour:
83 return 0
84 else:
85 return max( (int(self.hour)-self.now.hour), (self.now.hour-int(self.hour)) )*60*60
86
87 delta = self.now - self.last_executed
88 if self.hourly:
89 if delta.seconds >= 60*60:
90 return 0
91 else:
92 return 60*60 - delta.seconds
93 else:
94 if int(self.hour) == self.now.hour:
95 if delta.days >= 1:
96 return 0
97 else:
98 return 82800 # 23 hours, just to be safe
99 else:
100 return max( (int(self.hour)-self.now.hour), (self.now.hour-int(self.hour)) )*60*60
101
102 def _run(self, msg, vtimeout):
103 boto.log.info('Task[%s] - running:%s' % (self.name, self.command))
104 log_fp = StringIO()
105 process = subprocess.Popen(self.command, shell=True, stdin=subprocess.PIPE,
106 stdout=subprocess.PIPE, stderr=subprocess.PIPE)
107 nsecs = 5
108 current_timeout = vtimeout
109 while process.poll() is None:
110 boto.log.info('nsecs=%s, timeout=%s' % (nsecs, current_timeout))
111 if nsecs >= current_timeout:
112 current_timeout += vtimeout
113 boto.log.info('Task[%s] - setting timeout to %d seconds' % (self.name, current_timeout))
114 if msg:
115 msg.change_visibility(current_timeout)
116 time.sleep(5)
117 nsecs += 5
118 t = process.communicate()
119 log_fp.write(t[0])
120 log_fp.write(t[1])
121 boto.log.info('Task[%s] - output: %s' % (self.name, log_fp.getvalue()))
122 self.last_executed = self.now
123 self.last_status = process.returncode
124 self.last_output = log_fp.getvalue()[0:1023]
125
126 def run(self, msg, vtimeout=60):
127 delay = self.check()
128 boto.log.info('Task[%s] - delay=%s seconds' % (self.name, delay))
129 if delay == 0:
130 self._run(msg, vtimeout)
131 queue = msg.queue
132 new_msg = queue.new_message(self.id)
133 new_msg = queue.write(new_msg)
134 self.message_id = new_msg.id
135 self.put()
136 boto.log.info('Task[%s] - new message id=%s' % (self.name, new_msg.id))
137 msg.delete()
138 boto.log.info('Task[%s] - deleted message %s' % (self.name, msg.id))
139 else:
140 boto.log.info('new_vtimeout: %d' % delay)
141 msg.change_visibility(delay)
142
143 def start(self, queue_name):
144 boto.log.info('Task[%s] - starting with queue: %s' % (self.name, queue_name))
145 queue = boto.lookup('sqs', queue_name)
146 msg = queue.new_message(self.id)
147 msg = queue.write(msg)
148 self.message_id = msg.id
149 self.put()
150 boto.log.info('Task[%s] - start successful' % self.name)
151
152 class TaskPoller(object):
153
154 def __init__(self, queue_name):
155 self.sqs = boto.connect_sqs()
156 self.queue = self.sqs.lookup(queue_name)
157
158 def poll(self, wait=60, vtimeout=60):
159 while True:
160 m = self.queue.read(vtimeout)
161 if m:
162 task = Task.get_by_id(m.get_body())
163 if task:
164 if not task.message_id or m.id == task.message_id:
165 boto.log.info('Task[%s] - read message %s' % (task.name, m.id))
166 task.run(m, vtimeout)
167 else:
168 boto.log.info('Task[%s] - found extraneous message, ignoring' % task.name)
169 else:
170 time.sleep(wait)
171
172
173
174
175
176