0
|
1 #!/usr/bin/env python
|
|
2 # ref: https://galaxyproject.org/admin/tools/data-managers/how-to/define/
|
|
3
|
|
4 # Rewritten by H.E. Cicada Brokaw Dennis from a source downloaded from the toolshed and
|
|
5 # other example code on the web.
|
|
6 # This file downloads annotations for lncrna (slncky tool)
|
|
7
|
|
8 import argparse
|
|
9 import os
|
|
10 import subprocess
|
|
11
|
|
12 # The following is used to generate a unique_id value
|
|
13 from datetime import *
|
|
14
|
|
15 # Remove the following line when testing without galaxy package:
|
|
16 from galaxy.util.json import to_json_string
|
|
17 # Am not using the following:
|
|
18 # from galaxy.util.json import from_json_string
|
|
19
|
|
# The FileListParser below is currently disabled (it is wrapped in a string
# literal). It was written for get_ctat_lncrna_annotations_locations(),
# which is called by the Data Manager interface (.xml file) to get
# the filenames that are available online at broadinstitute.org.
# Not sure this is the best way to do it.
# The parser uses HTMLParser to look through the html,
# searching for the filenames within anchor tags.
|
|
26 import urllib2
|
|
27 from HTMLParser import HTMLParser
|
|
28
|
|
# Module-level configuration for the lncrna annotations data manager.

# Index page that the (currently disabled) FileListParser would have scraped.
#_CTAT_lncrnaIndexPage_URL = 'https://data.broadinstitute.org/Trinity/CTAT/lncrna/annotations.tar.gz'
# Archive that is downloaded and unpacked; main() always uses this URL.
_CTAT_lncrnaDownload_URL = 'https://data.broadinstitute.org/Trinity/CTAT/lncrna/annotations.tar.gz'
# Key of the data table that main() fills in the output json dictionary.
_CTAT_lncrnaTableName = 'ctat_lncrna_annotations'
# Display name of the single entry offered by get_ctat_lncrna_annotations_locations().
_CTAT_lncrnaDir_Name = 'annotations'
# Prefix used only when main() has to fall back to a default display name.
_CTAT_lncrna_DisplayNamePrefix = 'CTAT_lncrna_annotations_'
# NOTE(review): not referenced by any code visible in this file - confirm it is still needed.
_lncrnaFileExtension = 'lc'
# Minimum free space (2 GiB) that download_annotations() requires on the destination device.
_NumBytesNeededForAnnotations = 2147483648 # Number of bytes
#_DownloadFileSize = 5790678746 # 5.4 Gigabytes.
# Scratch file written and removed to prove the destination directory is writable.
_Download_TestFile = 'write_testfile.txt'
# Marker file whose presence means a previous download was judged successful.
_DownloadSuccessFile = 'download_succeeded.txt'
|
|
39
|
|
40 '''
|
|
41 class FileListParser(HTMLParser):
|
|
42 def __init__(self):
|
|
43 # Have to use direct call to super class rather than using super():
|
|
44 # super(FileListParser, self).__init__()
|
|
45 # because HTMLParser is an "old style" class and its inheritance chain does not include object.
|
|
46 HTMLParser.__init__(self)
|
|
47 self.filenames = set()
|
|
48 def handle_starttag(self, tag, attrs):
|
|
49 # Look for filename references in anchor tags and add them to filenames.
|
|
50 if tag == "a":
|
|
51 # The tag is an anchor tag.
|
|
52 for attribute in attrs:
|
|
53 # print "Checking: {:s}".format(str(attribute))
|
|
54 if attribute[0] == "href":
|
|
55 # Does the href have a tar.gz in it?
|
|
56 if ("tar.gz" in attribute[1]) and ("md5" not in attribute[1]):
|
|
57 # Add the value to filenames.
|
|
58 self.filenames.add(attribute[1])
|
|
59 # End of class FileListParser
|
|
60 '''
|
|
61
|
|
62
|
|
def get_ctat_lncrna_annotations_locations():
    """Return the dynamic-option entries for the annotations download menu.

    Each entry is a 3-tuple:
      1. the display name put into the option list,
      2. the value put into the parameter associated with the option list,
      3. True or False, indicating whether the entry is selected.

    Only one entry is offered: the fixed download URL, pre-selected.
    The entries are also printed before being returned.
    """
    # The index page is no longer fetched and parsed with FileListParser;
    # the single known archive location is offered directly.
    menu_entries = [(_CTAT_lncrnaDir_Name, _CTAT_lncrnaDownload_URL, True)]
    print("The list of items being returned for the option menu is:")
    print(str(menu_entries))
    return menu_entries
|
|
78
|
|
def download_annotations(src_location, destination, force_download):
    """Download and unpack the annotations archive, unless already present.

    Parameters:
        src_location: URL of the .tar.gz archive. The part of its last path
            component before the first '.' names the annotations directory.
        destination: directory to unpack into. If its last component is not
            the annotations directory name, that name is appended to it.
        force_download: when true, download again even if the success marker
            file from a previous download is present.

    Returns:
        A tuple (canonical destination directory, annotations directory name,
        whether a download was actually performed by this call).

    Raises:
        ValueError: the destination exists but is not a directory, or fewer
            than four entries are found in it after the download step.
        OSError / IOError: the directory cannot be created or written to,
            there is too little free space, the marker file cannot be
            created, or curl/tar cannot be started.
        subprocess.CalledProcessError: curl, tar or ls exited with an error.
    """
    annotations_was_downloaded = False
    # The part after the last '/' and before the first '.'
    root_annotations_dirname = src_location.split("/")[-1].split(".")[0]

    # We want to make sure that destination is an absolute, fully specified path.
    cannonical_destination = os.path.realpath(destination)
    if cannonical_destination.split("/")[-1] != root_annotations_dirname:
        cannonical_destination += "/" + root_annotations_dirname
    _prepare_destination_directory(cannonical_destination)

    # The directory listing is used to detect a previous successful download.
    orig_files_in_destdir = set(os.listdir(cannonical_destination))
    download_success_file_path = "{:s}/{:s}".format(cannonical_destination, _DownloadSuccessFile)
    if (_DownloadSuccessFile not in orig_files_in_destdir) or force_download:
        # Check whether there is enough space on the device for the annotations.
        statvfs = os.statvfs(cannonical_destination)
        # Number of free bytes that ordinary users are allowed to use
        # (excluding reserved space).
        num_avail_bytes = statvfs.f_frsize * statvfs.f_bavail
        if num_avail_bytes < _NumBytesNeededForAnnotations:
            raise OSError("There is insufficient space ({:s} bytes)".format(str(num_avail_bytes)) +
                          " on the device of the destination directory: " +
                          "{:s}".format(cannonical_destination))

        if _DownloadSuccessFile in orig_files_in_destdir:
            # Since we are redoing the download, the success file needs to be
            # removed until the download has succeeded.
            os.remove(download_success_file_path)

        # Shell-style rendering of the pipeline, used only in the error message.
        command = "curl --silent --fail {:s} | tar -xzf - -C {:s} --strip 1".format(
            src_location, cannonical_destination)
        try:
            _stream_download_and_extract(src_location, cannonical_destination)
        except (OSError, subprocess.CalledProcessError):
            print("ERROR: Trying to run the following command:\n\t{:s}".format(command))
            raise
        annotations_was_downloaded = True

    # NOTE(review): the verification below runs on every call, including when
    # the download step was skipped - confirm this matches the intended layout.
    # Some output to help us if errors occur.
    print("\n*******************************\nFinished download and extraction.")
    if os.path.isdir(cannonical_destination):
        subprocess.check_call(["ls", "-la", cannonical_destination], stderr=subprocess.STDOUT)

    files_in_destdir = set(os.listdir(cannonical_destination))
    # Names containing the annotations directory name; only reported on failure.
    found_filenames = set(name for name in files_in_destdir
                          if root_annotations_dirname in name)
    # Success is judged by the number of directory entries, not by found_filenames.
    # FIX - we could md5 the files to make sure they are correct.
    # Or at least check their sizes, to see if the download completed ok.
    if len(files_in_destdir) >= 4:
        try:
            # Create (or refresh) the file that indicates the download succeeded.
            # Done in Python so that a failure really raises IOError/OSError.
            with open(download_success_file_path, "a"):
                os.utime(download_success_file_path, None)
        except (IOError, OSError):
            print("The download_success file could not be created: " +
                  "{:s}".format(download_success_file_path))
            raise
    else:
        print("After download, the potential annotations files found are:\n\t{:s}".format(str(found_filenames)))
        raise ValueError("ERROR: Could not find the extracted annotations files " +
                         "in the destination directory:\n\t{:s}".format(cannonical_destination))

    return (cannonical_destination, root_annotations_dirname, annotations_was_downloaded)


def _prepare_destination_directory(directory_path):
    """Make sure directory_path is an existing directory that can be written to."""
    if os.path.exists(directory_path):
        if not os.path.isdir(directory_path):
            raise ValueError("The destination is not a directory: " +
                             "{:s}".format(directory_path))
    else:
        try:
            os.makedirs(directory_path)
        except os.error:
            print("ERROR: Trying to create the following directory path:")
            print("\t{:s}".format(directory_path))
            raise

    if not os.path.exists(directory_path):
        # It should have been created, so something is wrong.
        raise OSError("The destination directory could not be created: " +
                      "{:s}".format(directory_path))

    # Prove writability by creating and removing a scratch file.
    test_writing_file = "{:s}/{:s}".format(directory_path, _Download_TestFile)
    try:
        with open(test_writing_file, "w") as filehandle:
            filehandle.write("Testing writing to this file.")
        os.remove(test_writing_file)
    except (IOError, OSError):
        print("The destination directory could not be written into: " +
              "{:s}".format(directory_path))
        raise


def _stream_download_and_extract(src_location, destination_dir):
    """Pipe curl into tar without a shell and fail if either process fails."""
    # The archive is streamed straight into tar so that the tar file itself
    # never needs disk space. curl is silent so progress is not printed, and
    # --fail makes it exit non-zero on an HTTP error instead of passing the
    # server's error page on to tar.
    curl_command = ["curl", "--silent", "--fail", src_location]
    tar_command = ["tar", "-xzf", "-", "-C", destination_dir, "--strip", "1"]
    curl_process = subprocess.Popen(curl_command, stdout=subprocess.PIPE)
    try:
        tar_process = subprocess.Popen(tar_command, stdin=curl_process.stdout)
    except OSError:
        # tar could not be started; do not leave curl running.
        curl_process.kill()
        curl_process.wait()
        raise
    finally:
        # Only tar should hold the read end, so curl is signalled if tar exits early.
        curl_process.stdout.close()
    tar_status = tar_process.wait()
    curl_status = curl_process.wait()
    if curl_status != 0:
        raise subprocess.CalledProcessError(curl_status, curl_command)
    if tar_status != 0:
        raise subprocess.CalledProcessError(tar_status, tar_command)
|
|
192
|
|
def main():
    """Fetch the lncrna annotations and write the data manager json file.

    Command line options:
        -d/--download_location: accepted but not used; the download always
            comes from _CTAT_lncrnaDownload_URL.
        -n/--display_name: accepted and printed, but not used for the entries.
        -p/--destination_path: where the annotations are, or will be placed.
        -o/--output_filename: file the json dictionary is written to.
        -f/--force_download: download again even if previously downloaded.

    One data table entry is written per genome build (mm9, mm10, hg19, hg38).
    Every entry points at the same annotations.config file inside the
    annotations directory.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('-d', '--download_location', default="",
        help='This is the download location of the lncrna annotations.')
    parser.add_argument('-n', '--display_name', default="",
        help='Is used as the selector text for the entry of this lncrna annotations in the data table.')
    parser.add_argument('-p', '--destination_path',
        help='Full path of the lncrna annotations location or destination, either where it is, or where it will be placed.')
    parser.add_argument('-o', '--output_filename',
        help='Name of the output file, where the json dictionary will be written.')
    parser.add_argument('-f', '--force_download',
        help='Forces download of lncrna annotations, even if previously downloaded. ' +
             'Requires download_location to be set in order to work.', action="store_true")
    args = parser.parse_args()
    # Fail before any download instead of crashing later on a None path.
    if args.destination_path is None:
        parser.error("the -p/--destination_path argument is required")
    if args.output_filename is None:
        parser.error("the -o/--output_filename argument is required")

    print("\ndownload_location is {:s}".format(str(_CTAT_lncrnaDownload_URL)))
    print("display_name is {:s}".format(str(args.display_name)))
    print("destination_path is {:s}\n".format(str(args.destination_path)))
    root_annotations_dirname = None
    if _CTAT_lncrnaDownload_URL != "":
        # The third item (whether a download happened) is not needed here;
        # download_annotations() raises if the annotations are not in place.
        annotations_directory, root_annotations_dirname, _ = \
            download_annotations(src_location=_CTAT_lncrnaDownload_URL,
                                 destination=args.destination_path,
                                 force_download=args.force_download)
    else:
        # No download location: the annotations must already be present.
        cannonical_destination = os.path.realpath(args.destination_path)
        if not os.path.exists(cannonical_destination):
            raise ValueError("Cannot find the Lncrna annotations.\n" +
                             "The directory does not exist:\n\t{:s}".format(cannonical_destination))
        files_in_destination_path = os.listdir(cannonical_destination)
        if len(files_in_destination_path) != 4:
            raise ValueError("Contents of destination directory not equal to expected - 4")
        annotations_directory = cannonical_destination
        # Get the root_annotations_dirname from the annotations_directory name.
        root_annotations_dirname = annotations_directory.split("/")[-1].split(".")[0]

    print("\nThe location of the Lncrna annotations is {:s}.\n".format(annotations_directory))
    # All four genome builds share the same config file.
    annotations_file_path = annotations_directory + "/annotations.config"
    # Check if there is an actual annotations file in the annotations_directory.
    if "annotations.config" not in set(os.listdir(annotations_directory)):
        print("WARNING: Did not find the annotations file: {:s}".format(annotations_file_path))

    # Builds in the order their entries are appended to the data table.
    builds_in_table_order = ("mm9", "mm10", "hg19", "hg38")
    # Builds in the order they are reported on stdout.
    builds_in_report_order = ("hg19", "hg38", "mm9", "mm10")

    # The display name of each entry is simply the name of its genome build.
    if (root_annotations_dirname is not None) and (root_annotations_dirname != ""):
        print("root_annotations_ok%%%%")
    else:
        print("WARNING: Did not set the display name. Using the default: {:s}".format(
            ", ".join(builds_in_report_order)))

    # The unique ids are only reported; they are not stored in the table.
    datetime_stamp = datetime.now().strftime("_%Y_%m_%d_%H_%M_%S_%f")
    unique_ids = dict((build, "ctat_lncrna_" + build + datetime_stamp)
                      for build in builds_in_table_order)

    for build in builds_in_report_order:
        print("The {:s} Index's display_name will be set to: {:s}\n".format(build, build))
        print("Its {:s} unique_id will be set to: {:s}\n".format(build, unique_ids[build]))
        print("Its {:s} dir_path will be set to: {:s}\n".format(build, annotations_file_path))

    data_manager_dict = {
        'data_tables': {
            _CTAT_lncrnaTableName: [
                dict(value=build, name=build, path=annotations_file_path)
                for build in builds_in_table_order
            ],
        },
    }

    # Temporarily the output file's dictionary is written for debugging:
    print("The dictionary for the output file is:\n\t{:s}".format(str(data_manager_dict)))
    # Save info to json file. This is used to transfer data from the DataManager tool, to the data manager,
    # which then puts it into the correct .loc file (I think).
    # Remove the following lines when testing without galaxy package.
    with open(args.output_filename, 'w') as output_file:
        output_file.write(to_json_string(data_manager_dict))
|
|
321
|
|
# Run the data manager only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
|