Mercurial > repos > guerler > springsuite
view planemo/lib/python3.7/site-packages/boto/manage/task.py @ 1:56ad4e20f292 draft
"planemo upload commit 6eee67778febed82ddd413c3ca40b3183a3898f1"
author | guerler |
---|---|
date | Fri, 31 Jul 2020 00:32:28 -0400 |
parents | d30785e31577 |
children |
line wrap: on
line source
# Copyright (c) 2006-2009 Mitch Garnaat http://garnaat.org/
#
# Permission is hereby granted, free of charge, to any person obtaining a
# copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish, dis-
# tribute, sublicense, and/or sell copies of the Software, and to permit
# persons to whom the Software is furnished to do so, subject to the fol-
# lowing conditions:
#
# The above copyright notice and this permission notice shall be included
# in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
# IN THE SOFTWARE.
#

import datetime
import subprocess
import tempfile
import time

import boto
from boto.sdb.db.property import StringProperty, DateTimeProperty, IntegerProperty
from boto.sdb.db.model import Model
# StringIO is no longer used in this module; the import is kept so the
# module's namespace stays backward-compatible.
from boto.compat import StringIO

SECONDS_PER_HOUR = 60 * 60


def check_hour(val):
    """
    Validate the ``hour`` attribute of a Task.

    Accepts the wildcard ``'*'`` or anything ``int()`` can convert to a
    value in the range 0-23.  Returns None on success.

    Raises ValueError if the value is not convertible by ``int()`` or is
    outside 0-23.
    """
    if val == '*':
        return
    hour = int(val)
    if hour < 0 or hour > 23:
        raise ValueError('hour must be "*" or in the range 0-23, got %r' % (val,))


def _to_text(data):
    """
    Return ``data`` as text, decoding bytes as UTF-8.

    Undecodable bytes are replaced rather than raising, so arbitrary
    command output can always be logged and stored.
    """
    if isinstance(data, bytes):
        return data.decode('utf-8', 'replace')
    return data


class Task(Model):

    """
    A scheduled, repeating task that can be executed by any participating servers.
    The scheduling is similar to cron jobs.  Each task has an hour attribute.
    The allowable values for hour are [0-23|*].

    To keep the operation reasonably efficient and not cause excessive polling,
    the minimum granularity of a Task is hourly.  Some examples:

         hour='*' - the task would be executed each hour
         hour='3' - the task would be executed at 3AM GMT each day.

    """
    name = StringProperty()
    hour = StringProperty(required=True, validator=check_hour, default='*')
    command = StringProperty(required=True)
    last_executed = DateTimeProperty()
    last_status = IntegerProperty()
    last_output = StringProperty()
    message_id = StringProperty()

    @classmethod
    def start_all(cls, queue_name):
        """Call ``start(queue_name)`` on every Task returned by ``cls.all()``."""
        for task in cls.all():
            task.start(queue_name)

    def __init__(self, id=None, **kw):
        super(Task, self).__init__(id, **kw)
        self.hourly = self.hour == '*'
        self.daily = self.hour != '*'
        # Captured once per instance; check() and _run() both use this
        # timestamp rather than re-reading the clock.
        self.now = datetime.datetime.utcnow()

    def _seconds_from_scheduled_hour(self):
        """
        Return the absolute distance, in seconds, between the configured
        hour and the hour of ``self.now``.
        """
        # NOTE(review): this is the absolute hour difference, not the time
        # until the next occurrence (hour=3 at 05:00 yields 2h, not 22h).
        # The message simply becomes visible again early and is re-checked.
        # Behaviour kept as in the original - confirm before changing.
        return abs(int(self.hour) - self.now.hour) * SECONDS_PER_HOUR

    def check(self):
        """
        Determine how long until the next scheduled time for a Task.
        Returns the number of seconds until the next scheduled time or zero
        if the task needs to be run immediately.
        If it's an hourly task and it's never been run, run it now.
        If it's a daily task and it's never been run and the hour is right, run it now.
        """
        boto.log.info('checking Task[%s]-now=%s, last=%s' % (self.name, self.now, self.last_executed))

        if not self.last_executed:
            if self.hourly:
                return 0
            # Daily and never run: zero when the hour matches, otherwise
            # the hour distance.
            return self._seconds_from_scheduled_hour()

        delta = self.now - self.last_executed
        if self.hourly:
            # total_seconds() includes whole days; timedelta.seconds does
            # not, so a task idle for more than a day used to look recent.
            elapsed = int(delta.total_seconds())
            if elapsed >= SECONDS_PER_HOUR:
                return 0
            return SECONDS_PER_HOUR - elapsed

        if int(self.hour) == self.now.hour:
            if delta.days >= 1:
                return 0
            return 82800  # 23 hours, just to be safe
        return self._seconds_from_scheduled_hour()

    def _run(self, msg, vtimeout):
        """
        Execute ``self.command`` in a shell and record the outcome.

        While the command runs, the visibility timeout of ``msg`` (if
        given) is extended in steps of ``vtimeout`` seconds.  Afterwards
        ``last_executed``, ``last_status`` and ``last_output`` are updated
        on this instance; nothing is persisted here.
        """
        boto.log.info('Task[%s] - running:%s' % (self.name, self.command))
        # stdout/stderr go to temporary files instead of pipes: polling a
        # child whose pipes are never read deadlocks once the OS pipe
        # buffer fills up.
        with tempfile.TemporaryFile() as out_fp:
            with tempfile.TemporaryFile() as err_fp:
                # NOTE: the command is run through the shell by design; it
                # comes from the stored Task record.
                process = subprocess.Popen(self.command, shell=True,
                                           stdin=subprocess.PIPE,
                                           stdout=out_fp,
                                           stderr=err_fp)
                nsecs = 5
                current_timeout = vtimeout
                while process.poll() is None:
                    boto.log.info('nsecs=%s, timeout=%s' % (nsecs, current_timeout))
                    if nsecs >= current_timeout:
                        current_timeout += vtimeout
                        boto.log.info('Task[%s] - setting timeout to %d seconds' % (self.name, current_timeout))
                        if msg:
                            msg.change_visibility(current_timeout)
                    time.sleep(5)
                    nsecs += 5
                # Closes the child's stdin pipe and reaps the process.
                process.communicate()
                out_fp.seek(0)
                err_fp.seek(0)
                # The files hold bytes; decode explicitly so this works on
                # Python 3 as well as Python 2.
                output = _to_text(out_fp.read()) + _to_text(err_fp.read())
        boto.log.info('Task[%s] - output: %s' % (self.name, output))
        self.last_executed = self.now
        self.last_status = process.returncode
        self.last_output = output[0:1023]

    def run(self, msg, vtimeout=60):
        """
        Run the task if it is due, otherwise postpone the message.

        When check() returns zero the command is executed, a fresh message
        carrying this task's id is written to ``msg.queue``, the task is
        saved, and ``msg`` is deleted.  Otherwise the visibility timeout of
        ``msg`` is set to the delay returned by check().
        """
        delay = self.check()
        boto.log.info('Task[%s] - delay=%s seconds' % (self.name, delay))
        if delay == 0:
            self._run(msg, vtimeout)
            queue = msg.queue
            new_msg = queue.new_message(self.id)
            new_msg = queue.write(new_msg)
            self.message_id = new_msg.id
            self.put()
            boto.log.info('Task[%s] - new message id=%s' % (self.name, new_msg.id))
            msg.delete()
            boto.log.info('Task[%s] - deleted message %s' % (self.name, msg.id))
        else:
            boto.log.info('new_vtimeout: %d' % delay)
            msg.change_visibility(delay)

    def start(self, queue_name):
        """
        Write a message containing this task's id to the named SQS queue,
        remember the message id on the task, and save the task.
        """
        boto.log.info('Task[%s] - starting with queue: %s' % (self.name, queue_name))
        queue = boto.lookup('sqs', queue_name)
        msg = queue.new_message(self.id)
        msg = queue.write(msg)
        self.message_id = msg.id
        self.put()
        boto.log.info('Task[%s] - start successful' % self.name)


class TaskPoller(object):
    """Reads task messages from an SQS queue and runs the matching Tasks."""

    def __init__(self, queue_name):
        self.sqs = boto.connect_sqs()
        self.queue = self.sqs.lookup(queue_name)

    def poll(self, wait=60, vtimeout=60):
        """
        Loop forever reading messages from the queue.

        Each message body is treated as a Task id.  The task is run when it
        has no recorded ``message_id`` or when that id matches the message;
        other messages are logged and ignored.  When the queue returns no
        message the loop sleeps for ``wait`` seconds.
        """
        while True:
            m = self.queue.read(vtimeout)
            if not m:
                time.sleep(wait)
                continue
            task = Task.get_by_id(m.get_body())
            if not task:
                continue
            if not task.message_id or m.id == task.message_id:
                boto.log.info('Task[%s] - read message %s' % (task.name, m.id))
                task.run(m, vtimeout)
            else:
                boto.log.info('Task[%s] - found extraneous message, ignoring' % task.name)