comparison package_output.py @ 0:5c72425b7f1b draft

planemo upload for repository https://github.com/richard-burhans/galaxytools/tree/main/tools/segalign commit 98a4dd44360447aa96d92143384d78e116d7581b
author richard-burhans
date Wed, 17 Apr 2024 18:06:54 +0000
parents
children 36cafb694dd2
comparison
equal deleted inserted replaced
-1:000000000000 0:5c72425b7f1b
1 #!/usr/bin/env python
2
3 import argparse
4 import configparser
5 import json
6 import os
7 import sys
8 import tarfile
9 import typing
10
11 import bashlex
12
13
class PackageFile:
    """Gzip-compressed tar archive holding a command config and its data files.

    Data files are stored under ``<top_dir>/<data_dir>/`` and the command
    configuration under ``<top_dir>/<config_file>``.  The archive is opened
    lazily by the first ``add_*`` call and is finalized by :meth:`close`
    (or by leaving a ``with`` block).
    """

    def __init__(
        self,
        pathname: str = "data_package.tgz",
        top_dir: str = "galaxy",
        data_dir: str = "files",
        config_file: str = "commands.json",
    ) -> None:
        """Record archive locations; nothing is written until a file is added.

        Args:
            pathname: Where the archive is written (resolved with realpath).
            top_dir: Top-level directory inside the archive.
            data_dir: Directory below ``top_dir`` that receives data files.
            config_file: Archive name, below ``top_dir``, of the config file.
        """
        self.pathname: str = os.path.realpath(pathname)
        self.data_root: str = os.path.join(top_dir, data_dir)
        self.config_path: str = os.path.join(top_dir, config_file)
        self.config_file: str = config_file
        self.tarfile: typing.Optional[tarfile.TarFile] = None
        # Archive names already written; used to skip duplicates.
        self.name_cache: typing.Dict[typing.Any, typing.Any] = {}
        # Captured once so relative archive names stay stable even if the
        # process changes directory later.
        self.working_dir: str = os.path.realpath(os.getcwd())

    def __enter__(self) -> "PackageFile":
        return self

    def __exit__(self, *exc_info: typing.Any) -> None:
        self.close()

    def _open_archive(self) -> tarfile.TarFile:
        """Return the open archive, creating it on first use."""
        if self.tarfile is None:
            self.tarfile = tarfile.open(
                name=self.pathname,
                mode="w:gz",
                format=tarfile.GNU_FORMAT,
                compresslevel=1,
            )
        return self.tarfile

    def _initialize(self) -> None:
        """Open the archive for writing if it is not open yet."""
        self._open_archive()

    def add_config(self, pathname: str) -> None:
        """Store the file at ``pathname`` as the archive's config member."""
        archive = self._open_archive()
        source_path = os.path.realpath(pathname)
        archive.add(source_path, arcname=self.config_path, recursive=False)

    def add_file(self, pathname: str, arcname: typing.Optional[str] = None) -> None:
        """Add a data file to the archive.

        Args:
            pathname: File to add; resolved with realpath before reading.
            arcname: Optional location for the member.  It is resolved and
                expressed relative to the working directory captured at
                construction, then placed below the data root.  When omitted
                the member is named after the basename of the source file.

        Raises:
            SystemExit: if ``arcname`` resolves outside the working
                directory, or if the source file does not exist.

        A member whose archive name was already added is skipped silently.
        """
        archive = self._open_archive()
        source_path = os.path.realpath(pathname)

        if arcname is None:
            dest_path = os.path.join(self.data_root, os.path.basename(source_path))
        else:
            rel_path = os.path.relpath(os.path.realpath(arcname), self.working_dir)
            # relpath yields exactly ".." for the parent directory itself, so
            # checking only for a "../" prefix would let that case through.
            escapes = rel_path == os.pardir or rel_path.startswith(
                os.pardir + os.sep
            )
            if os.path.isabs(rel_path) or escapes:
                sys.exit("path fail")
            dest_path = os.path.join(self.data_root, rel_path)

        if dest_path in self.name_cache:
            return

        try:
            archive.add(source_path, arcname=dest_path, recursive=False)
        except FileNotFoundError:
            sys.exit(f"missing source file {source_path}")

        self.name_cache[dest_path] = 1

    def close(self) -> None:
        """Finalize and close the archive; safe to call more than once."""
        if self.tarfile is not None:
            self.tarfile.close()
            self.tarfile = None
83
84
class bashCommandLineFile:
    """Convert a file of bash command lines into JSON records and package inputs.

    Each non-blank line of the command file is parsed, its stdin/stdout/stderr
    redirections are split off, and the result is written as one JSON object
    per line to ``commands.json``.  Files referenced by ``--target=``,
    ``--query=``, ``--segments=`` and ``--scores=`` are added to the supplied
    package, and ``commands.json`` itself is added as the package config.
    All of this happens during construction.
    """

    def __init__(
        self,
        pathname: str,
        config: configparser.ConfigParser,
        package_file: PackageFile,
    ) -> None:
        """Parse ``pathname`` immediately.

        Args:
            pathname: File containing one command line per line.
            config: Parser whose optional ``arguments`` section lists the
                known option names (``flag_args``, ``str_args``,
                ``bool_str_args``, ``int_args``, ``bool_int_args``).
            package_file: Archive that receives referenced files.
        """
        self.pathname: str = pathname
        self.config = config
        self.package_file = package_file
        self.executable: typing.Optional[str] = None
        # The option set depends only on the config, so build the parser once
        # instead of once per command line.
        self._parser: argparse.ArgumentParser = self._build_parser()
        self._parse_lines()

    def _build_parser(self) -> argparse.ArgumentParser:
        """Create the option parser described by the ``arguments`` section."""
        parser = argparse.ArgumentParser(add_help=False)
        if "arguments" not in self.config:
            return parser

        section = self.config["arguments"]
        # Registration order determines the order of the emitted arguments.
        option_kinds: typing.Tuple[
            typing.Tuple[str, typing.Dict[str, typing.Any]], ...
        ] = (
            ("flag_args", {"action": "store_true"}),
            ("str_args", {"type": str}),
            ("bool_str_args", {"nargs": "?", "const": True, "default": False}),
            ("int_args", {"type": int}),
            ("bool_int_args", {"nargs": "?", "const": True, "default": False}),
        )
        for key, kwargs in option_kinds:
            if key in section:
                for name in section[key].split():
                    parser.add_argument(f"--{name}", **kwargs)
        return parser

    def _parse_lines(self) -> None:
        """Write one JSON record per command line and package its inputs."""
        with open("commands.json", "w") as ofh, open(self.pathname) as f:
            for line in f:
                line = line.rstrip("\n")
                if not line.strip():
                    # A blank line holds no command to parse.
                    continue
                command_dict = self._parse_line(line)
                command_dict["args"] = [
                    self._package_arg(arg) for arg in command_dict.get("args", [])
                ]
                print(json.dumps(command_dict), file=ofh)

        # Added only after the file above has been closed and flushed.
        self.package_file.add_config("commands.json")

    def _package_arg(self, arg: str) -> str:
        """Package any file named by ``arg`` and return the argument to emit."""
        name, sep, value = arg.partition("=")
        if not sep:
            return arg

        if name in ("--target", "--query"):
            self._package_sequence_spec(value)
        elif name == "--segments":
            self.package_file.add_file(value)
        elif name == "--scores":
            # Every scores file is stored under the same archive name, and
            # PackageFile.add_file skips names it has already added.
            self.package_file.add_file(value, "data/scores.txt")
            return "--scores=data/scores.txt"
        return arg

    def _package_sequence_spec(self, spec: str) -> None:
        """Package the files named by a ``file[action]...`` sequence spec."""
        # NOTE(review): a spec without any "[...]" suffix is not packaged;
        # this keeps the existing behavior -- confirm it is intended.
        if "[" not in spec:
            return

        sequence_file, *actions = spec.split("[")
        self.package_file.add_file(sequence_file, sequence_file)
        for action in actions:
            if action.endswith("]"):
                action = action[:-1]
            if action.startswith("subset="):
                self.package_file.add_file(action[7:])

    def _parse_line(self, line: str) -> typing.Dict[str, typing.Any]:
        """Split redirections off ``line`` and parse what remains.

        Returns:
            The dict from :meth:`_parse_processed_line` extended with
            ``stdin``, ``stdout`` and ``stderr`` (``None`` when absent).
        """
        # resolve shell redirects
        trees: typing.List[typing.Any] = bashlex.parse(line, strictmode=False)  # type: ignore[attr-defined]
        positions: typing.List[typing.Tuple[int, int]] = []

        # One visitor for all trees, so redirects found in any of them are
        # kept and the name is bound even when there are no trees.
        visitor = nodevisitor(positions)
        for tree in trees:
            visitor.visit(tree)

        # Remove spans from the end so the earlier indices stay correct.
        processed = list(line)
        for start, end in sorted(positions, reverse=True):
            del processed[start:end]

        processed_line: str = "".join(processed)

        command_dict = self._parse_processed_line(processed_line)
        command_dict["stdin"] = visitor.stdin
        command_dict["stdout"] = visitor.stdout
        command_dict["stderr"] = visitor.stderr

        return command_dict

    def _parse_processed_line(self, line: str) -> typing.Dict[str, typing.Any]:
        """Turn a redirect-free command line into an executable/args dict.

        Known options are emitted as ``--name`` or ``--name=value``; the
        first two leftover words become ``--target=`` and ``--query=``.
        Any further leftover words are dropped.

        Raises:
            SystemExit: if the line contains no command word.
        """
        argv: typing.List[str] = list(bashlex.split(line))  # type: ignore[attr-defined]
        if not argv:
            sys.exit(f"no command found in line: {line!r}")
        self.executable = argv.pop(0)

        namespace, rest = self._parser.parse_known_intermixed_args(argv)

        args: typing.List[str] = []
        for var, value in vars(namespace).items():
            if value is None:
                continue
            if isinstance(value, bool):
                if value:
                    args.append(f"--{var}")
            else:
                args.append(f"--{var}={value}")

        # zip stops at the shorter input, so zero or one leftover word no
        # longer raises IndexError.
        for option, value in zip(("target", "query"), rest):
            args.append(f"--{option}={value}")

        return {
            "executable": self.executable,
            "args": args,
        }
237
238
class nodevisitor(bashlex.ast.nodevisitor):  # type: ignore[name-defined,misc]
    """AST visitor that records stdin/stdout/stderr file redirections.

    The source span of every accepted redirect is appended to ``positions``
    so the caller can cut it out of the command line; the redirect targets
    are exposed as ``stdin``, ``stdout`` and ``stderr``.
    """

    # Descriptor a redirect operator applies to when none is written out.
    _IMPLICIT_FDS: typing.Dict[str, int] = {"<": 0, ">": 1, ">>": 1, ">|": 1}

    def __init__(self, positions: typing.List[typing.Tuple[int, int]]) -> None:
        self.positions = positions
        self.stdin: typing.Optional[str] = None
        self.stdout: typing.Optional[str] = None
        self.stderr: typing.Optional[str] = None

    def visitredirect(
        self,
        n: bashlex.ast.node,  # type: ignore[name-defined]
        n_input: typing.Optional[int],
        n_type: str,
        output: typing.Any,
        heredoc: typing.Any,
    ) -> None:
        """Record a redirect of descriptor 0, 1 or 2 to a plain word.

        Raises:
            SystemExit: for any other descriptor or a non-word target.
        """
        # NOTE(review): bashlex appears to report the descriptor as None for
        # redirects such as "> out.txt" that do not spell one out -- confirm
        # against the bashlex version in use.  Fall back to the shell default.
        if n_input is None:
            n_input = self._IMPLICIT_FDS.get(n_type)

        if not (isinstance(n_input, int) and 0 <= n_input <= 2):
            sys.exit(f"oops 2: {type(n_input)}")

        if not (isinstance(output, bashlex.ast.node) and output.kind == "word"):  # type: ignore[attr-defined]
            # Report the offending target; the descriptor is valid here.
            sys.exit(f"oops 1: {type(output)}")

        self.positions.append(n.pos)
        if n_input == 0:
            self.stdin = output.word
        elif n_input == 1:
            self.stdout = output.word
        else:
            self.stderr = output.word

    def visitheredoc(self, n: bashlex.ast.node, value: typing.Any) -> None:  # type: ignore[name-defined]
        """Ignore here-documents."""
        pass
270
271
def main() -> None:
    """Package the commands in ``lastz_commands.txt`` into the default archive.

    Option definitions are read from ``lastz-cmd.ini`` located next to this
    script.  ``ConfigParser.read`` ignores a missing file, in which case no
    options are known to the command-line parser.
    """
    our_dirname: str = os.path.dirname(os.path.realpath(__file__))
    lastz_command_config_file: str = os.path.join(our_dirname, "lastz-cmd.ini")

    config: configparser.ConfigParser = configparser.ConfigParser()
    config.read(lastz_command_config_file)

    package_file = PackageFile()
    lastz_command_file = "lastz_commands.txt"
    try:
        bashCommandLineFile(lastz_command_file, config, package_file)
    finally:
        # Parsing can raise or call sys.exit; close the archive either way
        # so the file handle is released.
        package_file.close()


if __name__ == "__main__":
    main()