comparison env/lib/python3.9/site-packages/cwltool/mutation.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:4f3585e2f14b
1 from collections import namedtuple
2 from typing import Dict, cast
3
4 from .errors import WorkflowException
5 from .utils import CWLObjectType
6
7 MutationState = namedtuple("MutationState", ["generation", "readers", "stepname"])
8
9 _generation = "http://commonwl.org/cwltool#generation"
10
11
12 class MutationManager:
13 """Lock manager for checking correctness of in-place update of files.
14
15 Used to validate that in-place file updates happen sequentially, and that a
16 file which is registered for in-place update cannot be read or updated by
17 any other steps.
18
19 """
20
21 def __init__(self) -> None:
22 """Initialize."""
23 self.generations = {} # type: Dict[str, MutationState]
24
25 def register_reader(self, stepname: str, obj: CWLObjectType) -> None:
26 loc = cast(str, obj["location"])
27 current = self.generations.get(loc, MutationState(0, [], ""))
28 obj_generation = obj.get(_generation, 0)
29
30 if obj_generation != current.generation:
31 raise WorkflowException(
32 "[job {}] wants to read {} from generation {} but current "
33 "generation is {}(last updated by {})".format(
34 stepname, loc, obj_generation, current.generation, current.stepname
35 )
36 )
37
38 current.readers.append(stepname)
39 self.generations[loc] = current
40
41 def release_reader(self, stepname: str, obj: CWLObjectType) -> None:
42 loc = cast(str, obj["location"])
43 current = self.generations.get(loc, MutationState(0, [], ""))
44 obj_generation = obj.get(_generation, 0)
45
46 if obj_generation != current.generation:
47 raise WorkflowException(
48 "[job {}] wants to release reader on {} from generation {}"
49 " but current generation is {} (last updated by {})".format(
50 stepname, loc, obj_generation, current.generation, current.stepname
51 )
52 )
53
54 self.generations[loc].readers.remove(stepname)
55
56 def register_mutation(self, stepname: str, obj: CWLObjectType) -> None:
57 loc = cast(str, obj["location"])
58 current = self.generations.get(loc, MutationState(0, [], ""))
59 obj_generation = obj.get(_generation, 0)
60
61 if len(current.readers) > 0:
62 raise WorkflowException(
63 "[job {}] wants to modify {} but has readers: {}".format(
64 stepname, loc, current.readers
65 )
66 )
67
68 if obj_generation != current.generation:
69 raise WorkflowException(
70 "[job {}] wants to modify {} from generation {} but current "
71 "generation is {} (last updated by {})".format(
72 stepname, loc, obj_generation, current.generation, current.stepname
73 )
74 )
75
76 self.generations[loc] = MutationState(
77 current.generation + 1, current.readers, stepname
78 )
79
80 def set_generation(self, obj: CWLObjectType) -> None:
81 loc = cast(str, obj["location"])
82 current = self.generations.get(loc, MutationState(0, [], ""))
83 obj[_generation] = current.generation
84
85 def unset_generation(self, obj: CWLObjectType) -> None:
86 obj.pop(_generation, None)