Mercurial > repos > guerler > springsuite
comparison planemo/lib/python3.7/site-packages/cwltool/mutation.py @ 0:d30785e31577 draft
"planemo upload commit 6eee67778febed82ddd413c3ca40b3183a3898f1"
| author | guerler |
|---|---|
| date | Fri, 31 Jul 2020 00:18:57 -0400 |
| parents | |
| children |
comparison
equal
deleted
inserted
replaced
| -1:000000000000 | 0:d30785e31577 |
|---|---|
| 1 from __future__ import absolute_import | |
| 2 | |
| 3 from collections import namedtuple | |
| 4 from typing import Any, Dict | |
| 5 | |
| 6 from typing_extensions import Text # pylint: disable=unused-import | |
| 7 # move to a regular typing import when Python 3.3-3.6 is no longer supported | |
| 8 | |
| 9 from .errors import WorkflowException | |
| 10 | |
| 11 | |
# Per-location lock record kept by MutationManager:
#   generation - number of in-place updates registered for the location so far
#   readers    - names of the steps currently registered as readers
#   stepname   - name of the step that registered the latest update ("" if none)
# The typename matches the bound name so that repr() is accurate and instances
# can be pickled (pickle resolves the class by its typename in this module).
MutationState = namedtuple("MutationState", ["generation", "readers", "stepname"])

# Key under which a file object carries the generation it was produced at.
_generation = "http://commonwl.org/cwltool#generation"
| 15 | |
class MutationManager(object):
    """Lock manager for checking correctness of in-place update of files.

    Used to validate that in-place file updates happen sequentially, and that a
    file which is registered for in-place update cannot be read or updated by
    any other steps.

    """

    def __init__(self):  # type: () -> None
        """Initialize with no tracked locations."""
        # Maps a file location to its MutationState (generation, readers,
        # name of the step that last registered an update).
        self.generations = {}  # type: Dict[Text, MutationState]

    def register_reader(self, stepname, obj):
        # type: (Text, Dict[Text, Any]) -> None
        """Record that step ``stepname`` is reading the file object ``obj``.

        ``obj`` must have a "location" entry; its generation entry defaults
        to 0 when absent.

        Raises WorkflowException if the generation carried by ``obj`` differs
        from the current generation recorded for its location.
        """
        loc = obj["location"]
        current = self.generations.get(loc, MutationState(0, [], ""))
        obj_generation = obj.get(_generation, 0)

        if obj_generation != current.generation:
            raise WorkflowException(
                "[job {}] wants to read {} from generation {} but current "
                "generation is {} (last updated by {})".format(
                    stepname, loc, obj_generation, current.generation,
                    current.stepname))

        current.readers.append(stepname)
        # Store the state back: for a location seen for the first time,
        # ``current`` is a fresh default that is not yet in the mapping.
        self.generations[loc] = current

    def release_reader(self, stepname, obj):
        # type: (Text, Dict[Text, Any]) -> None
        """Remove step ``stepname`` from the readers of the file object ``obj``.

        Raises WorkflowException if the generation carried by ``obj`` differs
        from the current generation recorded for its location, or if
        ``stepname`` is not currently registered as a reader of that location.
        """
        loc = obj["location"]
        current = self.generations.get(loc, MutationState(0, [], ""))
        obj_generation = obj.get(_generation, 0)

        if obj_generation != current.generation:
            raise WorkflowException(
                "[job {}] wants to release reader on {} from generation {}"
                " but current generation is {} (last updated by {})".format(
                    stepname, loc, obj_generation, current.generation,
                    current.stepname))

        # Report an unknown location or unregistered reader the same way as
        # the other consistency failures instead of leaking a bare
        # KeyError/ValueError from the dict lookup or list.remove().
        if stepname not in current.readers:
            raise WorkflowException(
                "[job {}] wants to release reader on {} but is not "
                "registered as a reader".format(stepname, loc))

        current.readers.remove(stepname)

    def register_mutation(self, stepname, obj):
        # type: (Text, Dict[Text, Any]) -> None
        """Record that step ``stepname`` updates the file object ``obj`` in place.

        On success the generation recorded for the location is incremented
        and ``stepname`` is stored as the last updater.

        Raises WorkflowException if the location still has registered
        readers, or if the generation carried by ``obj`` differs from the
        current generation recorded for its location.
        """
        loc = obj["location"]
        current = self.generations.get(loc, MutationState(0, [], ""))
        obj_generation = obj.get(_generation, 0)

        if current.readers:
            raise WorkflowException(
                "[job {}] wants to modify {} but has readers: {}".format(
                    stepname, loc, current.readers))

        if obj_generation != current.generation:
            raise WorkflowException(
                "[job {}] wants to modify {} from generation {} but current "
                "generation is {} (last updated by {})".format(
                    stepname, loc, obj_generation, current.generation,
                    current.stepname))

        self.generations[loc] = MutationState(
            current.generation + 1, current.readers, stepname)

    def set_generation(self, obj):  # type: (Dict[Text, Any]) -> None
        """Stamp ``obj`` with the current generation of its location (0 if untracked)."""
        loc = obj["location"]
        current = self.generations.get(loc, MutationState(0, [], ""))
        obj[_generation] = current.generation

    def unset_generation(self, obj):  # type: (Dict[Text, Any]) -> None
        """Remove the generation stamp from ``obj``; do nothing if it has none."""
        obj.pop(_generation, None)
