Mercurial > repos > trinity_ctat > ctat_lncrna_annotations_data_manager
comparison data_manager/add_ctat_lncrna_annotations.py @ 0:a3aa3f9e1702 draft default tip
Uploaded
author | trinity_ctat |
---|---|
date | Mon, 16 Jul 2018 20:42:55 -0400 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:a3aa3f9e1702 |
---|---|
1 #!/usr/bin/env python | |
2 # ref: https://galaxyproject.org/admin/tools/data-managers/how-to/define/ | |
3 | |
4 # Rewritten by H.E. Cicada Brokaw Dennis from a source downloaded from the toolshed and | |
5 # other example code on the web. | |
6 # This file downloads annotations for lncrna (slncky tool) | |
7 | |
8 import argparse | |
9 import os | |
10 import subprocess | |
11 | |
12 # The following is used to generate a unique_id value | |
13 from datetime import * | |
14 | |
15 # Remove the following line when testing without galaxy package: | |
16 from galaxy.util.json import to_json_string | |
17 # Am not using the following: | |
18 # from galaxy.util.json import from_json_string | |
19 | |
20 # The FileListParser (currently disabled, see below) was written for get_ctat_lncrna_annotations_locations(), | |
21 # which is called by the Data Manager interface (.xml file) to get | |
22 # the filenames that are available online at broadinstitute.org. | |
23 # Not sure this is the best way to do it. | |
24 # The parser uses HTMLParser to look through the HTML, | |
25 # searching for the filenames within anchor tags. | |
26 import urllib2 | |
27 from HTMLParser import HTMLParser | |
28 | |
# Where the lncrna annotations archive is fetched from. An index page is not
# scraped any more; the archive URL is used directly.
# _CTAT_lncrnaIndexPage_URL = 'https://data.broadinstitute.org/Trinity/CTAT/lncrna/annotations.tar.gz'
_CTAT_lncrnaDownload_URL = (
    'https://data.broadinstitute.org/Trinity/CTAT/lncrna/annotations.tar.gz'
)

# Key of the data table that main() fills in the output dictionary.
_CTAT_lncrnaTableName = 'ctat_lncrna_annotations'

# Display name offered in the option list for the single download source.
_CTAT_lncrnaDir_Name = 'annotations'
_CTAT_lncrna_DisplayNamePrefix = 'CTAT_lncrna_annotations_'
_lncrnaFileExtension = 'lc'

# Free bytes that must be available on the destination device before a
# download is attempted (2 GiB).
_NumBytesNeededForAnnotations = 2 * 1024 ** 3
# _DownloadFileSize = 5790678746  # 5.4 Gigabytes.

# Scratch file used to prove the destination directory is writable, and the
# marker file left behind once a download has completed.
_Download_TestFile = 'write_testfile.txt'
_DownloadSuccessFile = 'download_succeeded.txt'
39 | |
40 ''' | |
41 class FileListParser(HTMLParser): | |
42 def __init__(self): | |
43 # Have to use direct call to super class rather than using super(): | |
44 # super(FileListParser, self).__init__() | |
45 # because HTMLParser is an "old style" class and its inheritance chain does not include object. | |
46 HTMLParser.__init__(self) | |
47 self.filenames = set() | |
48 def handle_starttag(self, tag, attrs): | |
49 # Look for filename references in anchor tags and add them to filenames. | |
50 if tag == "a": | |
51 # The tag is an anchor tag. | |
52 for attribute in attrs: | |
53 # print "Checking: {:s}".format(str(attribute)) | |
54 if attribute[0] == "href": | |
55 # Does the href have a tar.gz in it? | |
56 if ("tar.gz" in attribute[1]) and ("md5" not in attribute[1]): | |
57 # Add the value to filenames. | |
58 self.filenames.add(attribute[1]) | |
59 # End of class FileListParser | |
60 ''' | |
61 | |
62 | |
def get_ctat_lncrna_annotations_locations():
    """Return the entries offered in the download-location option list.

    The result is a list of 3-tuples, as needed for dynamic options:
    (text shown in the option list, value stored in the parameter,
    True/False for whether the entry is selected).
    """
    # There is a single, fixed download source, so it is the only entry and
    # it is pre-selected. Looking up filenames on an index page (urllib2 +
    # FileListParser) is disabled.
    choices = [(_CTAT_lncrnaDir_Name, _CTAT_lncrnaDownload_URL, True)]
    print("The list of items being returned for the option menu is:")
    print(str(choices))
    return choices
78 | |
def _verify_writable_directory(directory):
    """Create *directory* if it is missing and prove a file can be written there.

    Raises ValueError if the path exists but is not a directory, OSError if
    it cannot be created, and IOError if the probe file cannot be written.
    """
    if os.path.exists(directory):
        if not os.path.isdir(directory):
            raise ValueError("The destination is not a directory: " +
                             "{:s}".format(directory))
    else:
        try:
            os.makedirs(directory)
        except os.error:
            print("ERROR: Trying to create the following directory path:")
            print("\t{:s}".format(directory))
            raise

    if not os.path.exists(directory):
        # makedirs did not raise, yet the directory is not there.
        raise OSError("The destination directory could not be created: " +
                      "{:s}".format(directory))

    probe_path = os.path.join(directory, _Download_TestFile)
    try:
        # The context manager closes the handle even when the write fails.
        with open(probe_path, "w") as probe:
            probe.write("Testing writing to this file.")
        os.remove(probe_path)
    except IOError:
        print("The destination directory could not be written into: " +
              "{:s}".format(directory))
        raise


def _fetch_and_extract(src_location, directory):
    """Stream the archive at *src_location* through tar into *directory*.

    The tar file is never stored on disk, which keeps the amount of free
    space needed down. Raises subprocess.CalledProcessError if either curl
    or tar exits with a non-zero status.
    """
    # Argument lists (no shell) so that paths containing spaces or shell
    # metacharacters cannot break or alter the command.
    # --silent keeps progress output off stderr; --fail makes HTTP errors
    # produce a non-zero exit status instead of an error page on stdout.
    curl_cmd = ["curl", "--silent", "--fail", src_location]
    tar_cmd = ["tar", "-xzf", "-", "-C", directory, "--strip-components=1"]
    command_text = "{:s} | {:s}".format(" ".join(curl_cmd), " ".join(tar_cmd))

    curl = subprocess.Popen(curl_cmd, stdout=subprocess.PIPE)
    try:
        tar = subprocess.Popen(tar_cmd, stdin=curl.stdout)
    except OSError:
        curl.stdout.close()
        curl.wait()
        raise
    # Drop the parent's copy of the pipe so curl is told if tar exits early.
    curl.stdout.close()
    tar_status = tar.wait()
    curl_status = curl.wait()

    failed_status = curl_status or tar_status
    if failed_status:
        print("ERROR: Trying to run the following command:\n\t{:s}".format(command_text))
        raise subprocess.CalledProcessError(failed_status, command_text)


def download_annotations(src_location, destination, force_download):
    """Download and unpack the annotations archive unless already present.

    src_location   -- URL of the tar.gz archive. The part of its last path
                      component before the first '.' names the directory.
    destination    -- directory to unpack into; a subdirectory with the
                      archive's root name is appended unless the path
                      already ends with that name.
    force_download -- when true, download even if the success marker file
                      from a previous download is present.

    Returns the tuple (canonical destination directory, root name of the
    annotations, whether a download actually took place).

    Raises ValueError if the destination is not a directory or holds fewer
    than four entries afterwards, OSError if the directory cannot be created
    or the device lacks free space, IOError if it cannot be written into,
    and subprocess.CalledProcessError if the download/extraction fails.
    """
    annotations_was_downloaded = False
    # The part after the last '/' and before the first '.'
    root_annotations_dirname = src_location.split("/")[-1].split(".")[0]

    # Work with an absolute, fully resolved path.
    cannonical_destination = os.path.realpath(destination)
    if os.path.basename(cannonical_destination) != root_annotations_dirname:
        cannonical_destination = os.path.join(cannonical_destination,
                                              root_annotations_dirname)
    _verify_writable_directory(cannonical_destination)

    download_success_file_path = os.path.join(cannonical_destination,
                                              _DownloadSuccessFile)
    previously_succeeded = _DownloadSuccessFile in os.listdir(cannonical_destination)
    if force_download or not previously_succeeded:
        # f_bavail counts the blocks ordinary users may use (reserved space
        # is excluded).
        statvfs = os.statvfs(cannonical_destination)
        num_avail_bytes = statvfs.f_frsize * statvfs.f_bavail
        if num_avail_bytes < _NumBytesNeededForAnnotations:
            raise OSError("There is insufficient space ({:s} bytes)".format(str(num_avail_bytes)) +
                          " on the device of the destination directory: " +
                          "{:s}".format(cannonical_destination))
        if previously_succeeded:
            # The marker must not claim success while the download is redone.
            os.remove(download_success_file_path)
        _fetch_and_extract(src_location, cannonical_destination)
        annotations_was_downloaded = True

    # Some output to help us if errors occur.
    print("\n*******************************\nFinished download and extraction.")
    if os.path.isdir(cannonical_destination):
        subprocess.check_call(["ls", "-la", cannonical_destination],
                              stderr=subprocess.STDOUT)

    files_in_destdir = set(os.listdir(cannonical_destination))
    # Only used for the diagnostic message below.
    found_filenames = {name for name in files_in_destdir
                       if root_annotations_dirname in name}
    # NOTE(review): success is judged purely by the number of directory
    # entries; names, sizes and checksums are not verified.
    if len(files_in_destdir) >= 4:
        try:
            # Same effect as `touch`: create the marker or refresh its times.
            open(download_success_file_path, "a").close()
            os.utime(download_success_file_path, None)
        except (IOError, OSError):
            print("The download_success file could not be created: " +
                  "{:s}".format(download_success_file_path))
            raise
    else:
        print("After download, the potential annotations files found are:\n\t{:s}".format(str(found_filenames)))
        raise ValueError("ERROR: Could not find the extracted annotations files " +
                         "in the destination directory:\n\t{:s}".format(cannonical_destination))

    return (cannonical_destination, root_annotations_dirname, annotations_was_downloaded)
192 | |
def main():
    """Download the lncrna annotations and write the data table JSON file.

    Parses the command line, makes sure the annotations are present under
    the destination path (downloading them when needed), and writes a JSON
    dictionary with one data table entry per genome build (mm9, mm10, hg19,
    hg38) to the output file. Every entry points at the same
    annotations.config file inside the annotations directory.

    Raises ValueError if the annotations cannot be located, plus whatever
    download_annotations() raises.
    """
    parser = argparse.ArgumentParser()
    # NOTE(review): --download_location and --display_name are accepted but
    # not used below; the download URL is the module constant.
    parser.add_argument('-d', '--download_location', default="",
        help='This is the download location of the lncrna annotations.')
    parser.add_argument('-n', '--display_name', default="",
        help='Is used as the selector text for the entry of this lncrna annotations in the data table.')
    # Both paths are needed further down; fail with a usage message rather
    # than a traceback when one is missing.
    parser.add_argument('-p', '--destination_path', required=True,
        help='Full path of the lncrna annotations location or destination, either where it is, or where it will be placed.')
    parser.add_argument('-o', '--output_filename', required=True,
        help='Name of the output file, where the json dictionary will be written.')
    parser.add_argument('-f', '--force_download', action="store_true",
        help='Forces download of lncrna annotations, even if previously downloaded. ' +
             'Requires download_location to be set in order to work.')
    args = parser.parse_args()

    print("\ndownload_location is {:s}".format(str(_CTAT_lncrnaDownload_URL)))
    print("display_name is {:s}".format(str(args.display_name)))
    print("destination_path is {:s}\n".format(str(args.destination_path)))

    if _CTAT_lncrnaDownload_URL != "":
        # The third item (whether a download took place) is not needed here;
        # a failed download raises instead.
        annotations_directory, root_annotations_dirname, _ = \
            download_annotations(src_location=_CTAT_lncrnaDownload_URL,
                                 destination=args.destination_path,
                                 force_download=args.force_download)
    else:
        # No download source: the destination must already hold the files.
        annotations_directory = os.path.realpath(args.destination_path)
        if not os.path.exists(annotations_directory):
            raise ValueError("Cannot find the Lncrna annotations.\n" +
                             "The directory does not exist:\n\t{:s}".format(annotations_directory))
        if len(os.listdir(annotations_directory)) != 4:
            raise ValueError("Contents of destination directory not equal to expected - 4")
        root_annotations_dirname = annotations_directory.split("/")[-1].split(".")[0]

    print("\nThe location of the Lncrna annotations is {:s}.\n".format(annotations_directory))
    annotations_config_path = os.path.join(annotations_directory, "annotations.config")
    if not os.path.isfile(annotations_config_path):
        # Only warn: the entries are still written, as they were before.
        print("WARNING: The annotations file was not found: {:s}".format(annotations_config_path))

    if root_annotations_dirname:
        print("root_annotations_ok%%%%")
    else:
        # The per-build names below do not depend on the directory name, so
        # this is reported but does not stop the run.
        print("WARNING: Could not derive a name from the annotations directory: " +
              "{:s}".format(annotations_directory))

    # The unique ids are only reported; the table entries use the build name
    # as their value.
    datetime_stamp = datetime.now().strftime("_%Y_%m_%d_%H_%M_%S_%f")
    for build in ("hg19", "hg38", "mm9", "mm10"):
        print("The {0:s} Index's display_name will be set to: {0:s}\n".format(build))
        print("Its {:s} unique_id will be set to: {:s}\n".format(
            build, "ctat_lncrna_" + build + datetime_stamp))
        print("Its {:s} dir_path will be set to: {:s}\n".format(build, annotations_config_path))

    table_entries = [dict(value=build, name=build, path=annotations_config_path)
                     for build in ("mm9", "mm10", "hg19", "hg38")]
    data_manager_dict = {'data_tables': {_CTAT_lncrnaTableName: table_entries}}

    # Temporarily the output file's dictionary is written for debugging:
    print("The dictionary for the output file is:\n\t{:s}".format(str(data_manager_dict)))
    # Save info to json file. This is used to transfer data from the
    # DataManager tool to the data manager.
    # Remove the following lines when testing without galaxy package.
    with open(args.output_filename, 'w') as output_file:
        output_file.write(to_json_string(data_manager_dict))


if __name__ == "__main__":
    main()