6
|
1 #! /usr/bin/env python
|
|
2 #
|
|
3 # Copyright INRA-URGI 2009-2010
|
|
4 #
|
|
5 # This software is governed by the CeCILL license under French law and
|
|
6 # abiding by the rules of distribution of free software. You can use,
|
|
7 # modify and/ or redistribute the software under the terms of the CeCILL
|
|
8 # license as circulated by CEA, CNRS and INRIA at the following URL
|
|
9 # "http://www.cecill.info".
|
|
10 #
|
|
11 # As a counterpart to the access to the source code and rights to copy,
|
|
12 # modify and redistribute granted by the license, users are provided only
|
|
13 # with a limited warranty and the software's author, the holder of the
|
|
14 # economic rights, and the successive licensors have only limited
|
|
15 # liability.
|
|
16 #
|
|
17 # In this respect, the user's attention is drawn to the risks associated
|
|
18 # with loading, using, modifying and/or developing or reproducing the
|
|
19 # software by the user in light of its specific status of free software,
|
|
20 # that may mean that it is complicated to manipulate, and that also
|
|
21 # therefore means that it is reserved for developers and experienced
|
|
22 # professionals having in-depth computer knowledge. Users are therefore
|
|
23 # encouraged to load and test the software's suitability as regards their
|
|
24 # requirements in conditions enabling the security of their systems and/or
|
|
25 # data to be ensured and, more generally, to use and operate it in the
|
|
26 # same conditions as regards security.
|
|
27 #
|
|
28 # The fact that you are presently reading this means that you have had
|
|
29 # knowledge of the CeCILL license and that you accept its terms.
|
|
30 #
|
|
31
|
|
32 try:
|
|
33 import cPickle as pickle
|
|
34 except:
|
|
35 import pickle
|
|
36 import random, os
|
|
37 from heapq import heapify, heappop, heappush
|
|
38 from itertools import islice, cycle
|
|
39 from SMART.Java.Python.structure.Transcript import Transcript
|
|
40 from SMART.Java.Python.misc.Progress import Progress
|
|
41 from SMART.Java.Python.misc.UnlimitedProgress import UnlimitedProgress
|
|
42
|
|
43 BUFFER_SIZE = 100 * 1024
|
68
|
44 NB_FILES = 10000000000
|
6
|
45
|
|
46 class FileSorter(object):
|
|
47
|
|
48 def __init__(self, parser, verbosity = 1):
|
|
49 self._parser = parser
|
|
50 self._verbosity = verbosity
|
|
51 self._chunks = {}
|
|
52 self._nbElements = 0
|
|
53 self._nbElementsPerChromosome = {}
|
|
54 self._perChromosome = False
|
|
55 self._isPreSorted = False
|
|
56 self._outputFileNames = {}
|
|
57 self._prefix = "tmpFile_%d" % (random.randint(0, 100000))
|
|
58 self._chromosome = None
|
|
59 if "SMARTTMPPATH" in os.environ:
|
|
60 self._prefix = os.path.join(os.environ["SMARTTMPPATH"], self._prefix)
|
|
61
|
|
62 def selectChromosome(self, chromosome):
|
|
63 self._chromosome = chromosome
|
|
64
|
|
65 def perChromosome(self, boolean):
|
|
66 self._perChromosome = boolean
|
|
67
|
|
68 def setOutputFileName(self, fileName):
|
|
69 self._outputFileName = fileName
|
|
70 if self._perChromosome:
|
|
71 self._outputFileName = os.path.splitext(self._outputFileName)[0]
|
|
72
|
|
73 def setPresorted(self, presorted):
|
|
74 self._isPreSorted = presorted
|
|
75
|
|
76 def sort(self):
|
|
77 if not self._isPreSorted:
|
|
78 self._batchSort()
|
|
79 else:
|
|
80 self._presorted()
|
|
81
|
|
82 def _presorted(self):
|
|
83 progress = UnlimitedProgress(1000, "Writing files %s" % (self._parser.fileName), self._verbosity)
|
|
84 curChromosome = None
|
|
85 outputHandle = None
|
|
86
|
|
87 if not self._perChromosome:
|
|
88 outputHandle = open(self._outputFileName, "wb")
|
|
89 for transcript in self._parser.getIterator():
|
|
90 progress.inc()
|
|
91 if transcript.__class__.__name__ == "Mapping":
|
|
92 transcript = transcript.getTranscript()
|
|
93 chromosome = transcript.getChromosome()
|
|
94 if self._chromosome != None and chromosome != self._chromosome:
|
|
95 continue
|
|
96 self._nbElements += 1
|
|
97 self._nbElementsPerChromosome[chromosome] = self._nbElementsPerChromosome.get(chromosome, 0) + 1
|
|
98 if self._perChromosome:
|
|
99 if chromosome != curChromosome:
|
|
100 if outputHandle != None:
|
|
101 outputHandle.close()
|
|
102 self._outputFileNames[chromosome] = "%s_%s.pkl" % (self._outputFileName, chromosome)
|
|
103 outputHandle = open(self._outputFileNames[chromosome], "wb")
|
|
104 curChromosome = chromosome
|
|
105 outputHandle.writelines("%s" % pickle.dumps(transcript))
|
|
106 if outputHandle != None:
|
|
107 outputHandle.close()
|
|
108 progress.done()
|
|
109
|
|
110 def getNbElements(self):
|
|
111 return self._nbElements
|
|
112
|
|
113 def getNbElementsPerChromosome(self):
|
|
114 return self._nbElementsPerChromosome
|
|
115
|
|
116 def _printSorted(self, chromosome, chunk):
|
|
117 chunk.sort(key = lambda transcript: (transcript.getStart(), -transcript.getEnd()))
|
|
118 outputChunk = open("%s_%s_%06i.tmp" % (self._prefix, chromosome, len(self._chunks[chromosome])), "wb", 32000)
|
|
119 self._chunks[chromosome].append(outputChunk)
|
|
120 for transcript in chunk:
|
|
121 outputChunk.write(pickle.dumps(transcript, -1))
|
|
122 outputChunk.close()
|
|
123
|
68
|
124 def _merge(self, chunks, chromosome, outputHandle):
|
|
125 currentOutputChunkId = len(chunks)
|
|
126 while len(chunks) > NB_FILES:
|
|
127 outputChunk = open("%s_%s_%06i.tmp" % (self._prefix, chromosome, currentChunkId), "wb", 32000)
|
|
128 currentOutputChunkId += 1
|
|
129 currentChunks = chunks[:NB_FILES]
|
|
130 chunks = chunks[NB_FILES:] + outputChunk
|
|
131 self._mergeParts(currentChunks, outputChunk)
|
|
132 outputChunk.close()
|
|
133 self._mergeParts(chunks, outputHandle)
|
|
134
|
|
135 def _mergeParts(self, chunks, outputHandle):
|
6
|
136 values = []
|
|
137 for chunk in chunks:
|
|
138 chunk = open(chunk.name, "rb")
|
|
139 try:
|
|
140 transcript = pickle.load(chunk)
|
|
141 start = transcript.getStart()
|
|
142 end = -transcript.getEnd()
|
|
143 except EOFError:
|
|
144 try:
|
|
145 chunk.close()
|
|
146 chunks.remove(chunk)
|
|
147 os.remove(chunk.name)
|
|
148 except:
|
|
149 pass
|
|
150 else:
|
|
151 heappush(values, (start, end, transcript, chunk))
|
|
152 while values:
|
68
|
153 start, end, transcript, chUnk = heappop(values)
|
|
154 pickle.dump(transcript, outputHandle, -1)
|
6
|
155 try:
|
|
156 transcript = pickle.load(chunk)
|
|
157 start = transcript.getStart()
|
|
158 end = -transcript.getEnd()
|
|
159 except EOFError:
|
|
160 try:
|
|
161 chunk.close()
|
|
162 chunks.remove(chunk)
|
|
163 os.remove(chunk.name)
|
|
164 except:
|
|
165 pass
|
|
166 else:
|
|
167 heappush(values, (start, end, transcript, chunk))
|
|
168
|
|
169 def _batchSort(self):
|
|
170 currentChunks = {}
|
|
171 counts = {}
|
|
172 try:
|
|
173 progress = UnlimitedProgress(1000, "Sorting file %s" % (self._parser.fileName), self._verbosity)
|
|
174 for transcript in self._parser.getIterator():
|
|
175 progress.inc()
|
|
176 if transcript.__class__.__name__ == "Mapping":
|
|
177 transcript = transcript.getTranscript()
|
|
178 chromosome = transcript.getChromosome()
|
|
179 if self._chromosome != None and chromosome != self._chromosome:
|
|
180 continue
|
|
181 if chromosome not in self._chunks:
|
|
182 self._chunks[chromosome] = []
|
|
183 currentChunks[chromosome] = []
|
|
184 counts[chromosome] = 0
|
|
185 currentChunks[chromosome].append(transcript)
|
|
186 counts[chromosome] += 1
|
|
187 if counts[chromosome] == BUFFER_SIZE:
|
|
188 self._printSorted(chromosome, currentChunks[chromosome])
|
|
189 currentChunks[chromosome] = []
|
|
190 counts[chromosome] = 0
|
|
191 self._nbElements += 1
|
|
192 self._nbElementsPerChromosome[chromosome] = self._nbElementsPerChromosome.get(chromosome, 0) + 1
|
|
193 for chromosome in self._chunks:
|
|
194 if counts[chromosome] > 0:
|
|
195 self._printSorted(chromosome, currentChunks[chromosome])
|
|
196 progress.done()
|
|
197 if not self._perChromosome:
|
|
198 outputHandle = open(self._outputFileName, "wb")
|
|
199 progress = Progress(len(self._chunks), "Writing sorted file %s" % (self._parser.fileName), self._verbosity)
|
|
200 for chromosome in self._chunks:
|
|
201 if self._perChromosome:
|
|
202 self._outputFileNames[chromosome] = "%s_%s.pkl" % (self._outputFileName, chromosome)
|
|
203 outputHandle = open(self._outputFileNames[chromosome], "wb")
|
68
|
204 self._merge(self._chunks[chromosome], chromosome, outputHandle)
|
6
|
205 if self._perChromosome:
|
|
206 outputHandle.close()
|
|
207 progress.inc()
|
|
208 if not self._perChromosome:
|
|
209 outputHandle.close()
|
|
210 progress.done()
|
|
211 finally:
|
|
212 for chunks in self._chunks.values():
|
|
213 for chunk in chunks:
|
|
214 try:
|
|
215 chunk.close()
|
|
216 os.remove(chunk.name)
|
|
217 except Exception:
|
|
218 pass
|
|
219
|
|
220 def getOutputFileNames(self):
|
|
221 return self._outputFileNames
|