Mercurial > repos > shellac > sam_consensus_v3
comparison env/lib/python3.9/site-packages/cwltool/mutation.py @ 0:4f3585e2f14b draft default tip
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
| author | shellac |
|---|---|
| date | Mon, 22 Mar 2021 18:12:50 +0000 |
| parents | |
| children |
comparison
equal
deleted
inserted
replaced
| -1:000000000000 | 0:4f3585e2f14b |
|---|---|
| 1 from collections import namedtuple | |
| 2 from typing import Dict, cast | |
| 3 | |
| 4 from .errors import WorkflowException | |
| 5 from .utils import CWLObjectType | |
| 6 | |
# Per-location bookkeeping record used by MutationManager:
#   generation -- counter incremented each time a mutation is registered
#   readers    -- list of step names currently registered as readers
#   stepname   -- name of the step that registered the latest mutation
MutationState = namedtuple(
    "MutationState",
    ("generation", "readers", "stepname"),
)

# Key under which a file object carries the generation number it was
# stamped with (see set_generation / unset_generation).
_generation = "http://commonwl.org/cwltool#generation"
| 10 | |
| 11 | |
class MutationManager:
    """Lock manager for checking correctness of in-place update of files.

    Used to validate that in-place file updates happen sequentially, and that a
    file which is registered for in-place update cannot be read or updated by
    any other steps.

    State is kept per file location in ``self.generations`` as a
    ``MutationState(generation, readers, stepname)``. A location that has
    never been seen is treated as generation 0 with no readers.
    """

    def __init__(self) -> None:
        """Initialize with no tracked locations."""
        self.generations = {}  # type: Dict[str, MutationState]

    def register_reader(self, stepname: str, obj: CWLObjectType) -> None:
        """Record ``stepname`` as a reader of the file described by ``obj``.

        Raises WorkflowException if the generation stamped on ``obj``
        (0 when absent) differs from the current generation of its location.
        """
        loc = cast(str, obj["location"])
        current = self.generations.get(loc, MutationState(0, [], ""))
        obj_generation = obj.get(_generation, 0)

        if obj_generation != current.generation:
            raise WorkflowException(
                "[job {}] wants to read {} from generation {} but current "
                "generation is {}(last updated by {})".format(
                    stepname, loc, obj_generation, current.generation, current.stepname
                )
            )

        current.readers.append(stepname)
        # Store the state back: for a first-time location ``current`` is the
        # freshly built default and is not yet in the dict.
        self.generations[loc] = current

    def release_reader(self, stepname: str, obj: CWLObjectType) -> None:
        """Remove one reader registration of ``stepname`` for ``obj``.

        Raises WorkflowException if the generation stamped on ``obj``
        (0 when absent) differs from the current generation of its location,
        or if ``stepname`` is not currently registered as a reader of that
        location (including when the location has never been registered).
        """
        loc = cast(str, obj["location"])
        current = self.generations.get(loc, MutationState(0, [], ""))
        obj_generation = obj.get(_generation, 0)

        if obj_generation != current.generation:
            raise WorkflowException(
                "[job {}] wants to release reader on {} from generation {}"
                " but current generation is {} (last updated by {})".format(
                    stepname, loc, obj_generation, current.generation, current.stepname
                )
            )

        # Report an unmatched release as a workflow error instead of letting a
        # bare KeyError (unknown location) or ValueError (unknown reader)
        # escape from the dict lookup / list.remove.
        if stepname not in current.readers:
            raise WorkflowException(
                "[job {}] wants to release reader on {} but is not registered "
                "as a reader".format(stepname, loc)
            )

        current.readers.remove(stepname)

    def register_mutation(self, stepname: str, obj: CWLObjectType) -> None:
        """Record that ``stepname`` updates the file described by ``obj`` in place.

        On success the location's generation is incremented and ``stepname``
        is stored as the last updater.

        Raises WorkflowException if the location still has registered readers,
        or if the generation stamped on ``obj`` (0 when absent) differs from
        the current generation of its location.
        """
        loc = cast(str, obj["location"])
        current = self.generations.get(loc, MutationState(0, [], ""))
        obj_generation = obj.get(_generation, 0)

        if current.readers:
            raise WorkflowException(
                "[job {}] wants to modify {} but has readers: {}".format(
                    stepname, loc, current.readers
                )
            )

        if obj_generation != current.generation:
            raise WorkflowException(
                "[job {}] wants to modify {} from generation {} but current "
                "generation is {} (last updated by {})".format(
                    stepname, loc, obj_generation, current.generation, current.stepname
                )
            )

        self.generations[loc] = MutationState(
            current.generation + 1, current.readers, stepname
        )

    def set_generation(self, obj: CWLObjectType) -> None:
        """Stamp ``obj`` with the current generation of its location (0 if unknown)."""
        loc = cast(str, obj["location"])
        current = self.generations.get(loc, MutationState(0, [], ""))
        obj[_generation] = current.generation

    def unset_generation(self, obj: CWLObjectType) -> None:
        """Remove the generation stamp from ``obj`` if it has one."""
        obj.pop(_generation, None)
