comparison SMART/Java/Python/ncList/FileSorter.py @ 36:44d5973c188c

Uploaded
author m-zytnicki
date Tue, 30 Apr 2013 15:02:29 -0400
parents 769e306b7933
children 85e80c21b1f7
comparison
equal deleted inserted replaced
35:d94018ca4ada 36:44d5973c188c
1 #! /usr/bin/env python
2 #
3 # Copyright INRA-URGI 2009-2010
4 #
5 # This software is governed by the CeCILL license under French law and
6 # abiding by the rules of distribution of free software. You can use,
7 # modify and/ or redistribute the software under the terms of the CeCILL
8 # license as circulated by CEA, CNRS and INRIA at the following URL
9 # "http://www.cecill.info".
10 #
11 # As a counterpart to the access to the source code and rights to copy,
12 # modify and redistribute granted by the license, users are provided only
13 # with a limited warranty and the software's author, the holder of the
14 # economic rights, and the successive licensors have only limited
15 # liability.
16 #
17 # In this respect, the user's attention is drawn to the risks associated
18 # with loading, using, modifying and/or developing or reproducing the
19 # software by the user in light of its specific status of free software,
20 # that may mean that it is complicated to manipulate, and that also
21 # therefore means that it is reserved for developers and experienced
22 # professionals having in-depth computer knowledge. Users are therefore
23 # encouraged to load and test the software's suitability as regards their
24 # requirements in conditions enabling the security of their systems and/or
25 # data to be ensured and, more generally, to use and operate it in the
26 # same conditions as regards security.
27 #
28 # The fact that you are presently reading this means that you have had
29 # knowledge of the CeCILL license and that you accept its terms.
30 #
31
32 try:
33 import cPickle as pickle
34 except:
35 import pickle
36 import random, os
37 from heapq import heapify, heappop, heappush
38 from itertools import islice, cycle
39 from SMART.Java.Python.structure.Transcript import Transcript
40 from SMART.Java.Python.misc.Progress import Progress
41 from SMART.Java.Python.misc.UnlimitedProgress import UnlimitedProgress
42
43 BUFFER_SIZE = 100 * 1024
44
45 class FileSorter(object):
46
47 def __init__(self, parser, verbosity = 1):
48 self._parser = parser
49 self._verbosity = verbosity
50 self._chunks = {}
51 self._nbElements = 0
52 self._nbElementsPerChromosome = {}
53 self._perChromosome = False
54 self._isPreSorted = False
55 self._outputFileNames = {}
56 self._prefix = "tmpFile_%d" % (random.randint(0, 100000))
57 self._chromosome = None
58 if "SMARTTMPPATH" in os.environ:
59 self._prefix = os.path.join(os.environ["SMARTTMPPATH"], self._prefix)
60
61 def selectChromosome(self, chromosome):
62 self._chromosome = chromosome
63
64 def perChromosome(self, boolean):
65 self._perChromosome = boolean
66
67 def setOutputFileName(self, fileName):
68 self._outputFileName = fileName
69 if self._perChromosome:
70 self._outputFileName = os.path.splitext(self._outputFileName)[0]
71
72 def setPresorted(self, presorted):
73 self._isPreSorted = presorted
74
75 def sort(self):
76 if not self._isPreSorted:
77 self._batchSort()
78 else:
79 self._presorted()
80
81 def _presorted(self):
82 progress = UnlimitedProgress(1000, "Writing files %s" % (self._parser.fileName), self._verbosity)
83 curChromosome = None
84 outputHandle = None
85
86 if not self._perChromosome:
87 outputHandle = open(self._outputFileName, "wb")
88 for transcript in self._parser.getIterator():
89 progress.inc()
90 if transcript.__class__.__name__ == "Mapping":
91 transcript = transcript.getTranscript()
92 chromosome = transcript.getChromosome()
93 if self._chromosome != None and chromosome != self._chromosome:
94 continue
95 self._nbElements += 1
96 self._nbElementsPerChromosome[chromosome] = self._nbElementsPerChromosome.get(chromosome, 0) + 1
97 if self._perChromosome:
98 if chromosome != curChromosome:
99 if outputHandle != None:
100 outputHandle.close()
101 self._outputFileNames[chromosome] = "%s_%s.pkl" % (self._outputFileName, chromosome)
102 outputHandle = open(self._outputFileNames[chromosome], "wb")
103 curChromosome = chromosome
104 outputHandle.writelines("%s" % pickle.dumps(transcript))
105 if outputHandle != None:
106 outputHandle.close()
107 progress.done()
108
109 def getNbElements(self):
110 return self._nbElements
111
112 def getNbElementsPerChromosome(self):
113 return self._nbElementsPerChromosome
114
115 def _printSorted(self, chromosome, chunk):
116 chunk.sort(key = lambda transcript: (transcript.getStart(), -transcript.getEnd()))
117 outputChunk = open("%s_%s_%06i.tmp" % (self._prefix, chromosome, len(self._chunks[chromosome])), "wb", 32000)
118 self._chunks[chromosome].append(outputChunk)
119 for transcript in chunk:
120 outputChunk.write(pickle.dumps(transcript, -1))
121 outputChunk.close()
122
123 def _merge(self, chunks):
124 values = []
125 for chunk in chunks:
126 chunk = open(chunk.name, "rb")
127 try:
128 transcript = pickle.load(chunk)
129 start = transcript.getStart()
130 end = -transcript.getEnd()
131 except EOFError:
132 try:
133 chunk.close()
134 chunks.remove(chunk)
135 os.remove(chunk.name)
136 except:
137 pass
138 else:
139 heappush(values, (start, end, transcript, chunk))
140 while values:
141 start, end, transcript, chunk = heappop(values)
142 yield transcript
143 try:
144 transcript = pickle.load(chunk)
145 start = transcript.getStart()
146 end = -transcript.getEnd()
147 except EOFError:
148 try:
149 chunk.close()
150 chunks.remove(chunk)
151 os.remove(chunk.name)
152 except:
153 pass
154 else:
155 heappush(values, (start, end, transcript, chunk))
156
157 def _batchSort(self):
158 currentChunks = {}
159 counts = {}
160 try:
161 progress = UnlimitedProgress(1000, "Sorting file %s" % (self._parser.fileName), self._verbosity)
162 for transcript in self._parser.getIterator():
163 progress.inc()
164 if transcript.__class__.__name__ == "Mapping":
165 transcript = transcript.getTranscript()
166 chromosome = transcript.getChromosome()
167 if self._chromosome != None and chromosome != self._chromosome:
168 continue
169 if chromosome not in self._chunks:
170 self._chunks[chromosome] = []
171 currentChunks[chromosome] = []
172 counts[chromosome] = 0
173 currentChunks[chromosome].append(transcript)
174 counts[chromosome] += 1
175 if counts[chromosome] == BUFFER_SIZE:
176 self._printSorted(chromosome, currentChunks[chromosome])
177 currentChunks[chromosome] = []
178 counts[chromosome] = 0
179 self._nbElements += 1
180 self._nbElementsPerChromosome[chromosome] = self._nbElementsPerChromosome.get(chromosome, 0) + 1
181 for chromosome in self._chunks:
182 if counts[chromosome] > 0:
183 self._printSorted(chromosome, currentChunks[chromosome])
184 progress.done()
185 if not self._perChromosome:
186 outputHandle = open(self._outputFileName, "wb")
187 progress = Progress(len(self._chunks), "Writing sorted file %s" % (self._parser.fileName), self._verbosity)
188 for chromosome in self._chunks:
189 if self._perChromosome:
190 self._outputFileNames[chromosome] = "%s_%s.pkl" % (self._outputFileName, chromosome)
191 outputHandle = open(self._outputFileNames[chromosome], "wb")
192 for sequence in self._merge(self._chunks[chromosome]):
193 pickle.dump(sequence, outputHandle, -1)
194 if self._perChromosome:
195 outputHandle.close()
196 progress.inc()
197 if not self._perChromosome:
198 outputHandle.close()
199 progress.done()
200 finally:
201 for chunks in self._chunks.values():
202 for chunk in chunks:
203 try:
204 chunk.close()
205 os.remove(chunk.name)
206 except Exception:
207 pass
208
209 def getOutputFileNames(self):
210 return self._outputFileNames