comparison SMART/Java/Python/ncList/FileSorter.py @ 68:85e80c21b1f7 draft

Uploaded
author m-zytnicki
date Mon, 16 Nov 2015 12:00:32 -0500
parents 769e306b7933
children
comparison
equal deleted inserted replaced
67:f4de72c80eac 68:85e80c21b1f7
39 from SMART.Java.Python.structure.Transcript import Transcript 39 from SMART.Java.Python.structure.Transcript import Transcript
40 from SMART.Java.Python.misc.Progress import Progress 40 from SMART.Java.Python.misc.Progress import Progress
41 from SMART.Java.Python.misc.UnlimitedProgress import UnlimitedProgress 41 from SMART.Java.Python.misc.UnlimitedProgress import UnlimitedProgress
42 42
43 BUFFER_SIZE = 100 * 1024 43 BUFFER_SIZE = 100 * 1024
44 NB_FILES = 10000000000
44 45
45 class FileSorter(object): 46 class FileSorter(object):
46 47
47 def __init__(self, parser, verbosity = 1): 48 def __init__(self, parser, verbosity = 1):
48 self._parser = parser 49 self._parser = parser
118 self._chunks[chromosome].append(outputChunk) 119 self._chunks[chromosome].append(outputChunk)
119 for transcript in chunk: 120 for transcript in chunk:
120 outputChunk.write(pickle.dumps(transcript, -1)) 121 outputChunk.write(pickle.dumps(transcript, -1))
121 outputChunk.close() 122 outputChunk.close()
122 123
123 def _merge(self, chunks): 124 def _merge(self, chunks, chromosome, outputHandle):
125 currentOutputChunkId = len(chunks)
126 while len(chunks) > NB_FILES:
127 outputChunk = open("%s_%s_%06i.tmp" % (self._prefix, chromosome, currentChunkId), "wb", 32000)
128 currentOutputChunkId += 1
129 currentChunks = chunks[:NB_FILES]
130 chunks = chunks[NB_FILES:] + outputChunk
131 self._mergeParts(currentChunks, outputChunk)
132 outputChunk.close()
133 self._mergeParts(chunks, outputHandle)
134
135 def _mergeParts(self, chunks, outputHandle):
124 values = [] 136 values = []
125 for chunk in chunks: 137 for chunk in chunks:
126 chunk = open(chunk.name, "rb") 138 chunk = open(chunk.name, "rb")
127 try: 139 try:
128 transcript = pickle.load(chunk) 140 transcript = pickle.load(chunk)
136 except: 148 except:
137 pass 149 pass
138 else: 150 else:
139 heappush(values, (start, end, transcript, chunk)) 151 heappush(values, (start, end, transcript, chunk))
140 while values: 152 while values:
141 start, end, transcript, chunk = heappop(values) 153 start, end, transcript, chunk = heappop(values)
142 yield transcript 154 pickle.dump(transcript, outputHandle, -1)
143 try: 155 try:
144 transcript = pickle.load(chunk) 156 transcript = pickle.load(chunk)
145 start = transcript.getStart() 157 start = transcript.getStart()
146 end = -transcript.getEnd() 158 end = -transcript.getEnd()
147 except EOFError: 159 except EOFError:
187 progress = Progress(len(self._chunks), "Writing sorted file %s" % (self._parser.fileName), self._verbosity) 199 progress = Progress(len(self._chunks), "Writing sorted file %s" % (self._parser.fileName), self._verbosity)
188 for chromosome in self._chunks: 200 for chromosome in self._chunks:
189 if self._perChromosome: 201 if self._perChromosome:
190 self._outputFileNames[chromosome] = "%s_%s.pkl" % (self._outputFileName, chromosome) 202 self._outputFileNames[chromosome] = "%s_%s.pkl" % (self._outputFileName, chromosome)
191 outputHandle = open(self._outputFileNames[chromosome], "wb") 203 outputHandle = open(self._outputFileNames[chromosome], "wb")
192 for sequence in self._merge(self._chunks[chromosome]): 204 self._merge(self._chunks[chromosome], chromosome, outputHandle)
193 pickle.dump(sequence, outputHandle, -1)
194 if self._perChromosome: 205 if self._perChromosome:
195 outputHandle.close() 206 outputHandle.close()
196 progress.inc() 207 progress.inc()
197 if not self._perChromosome: 208 if not self._perChromosome:
198 outputHandle.close() 209 outputHandle.close()