Mercurial > repos > yufei-luo > s_mart
comparison SMART/Java/Python/ncList/FileSorter.py @ 68:85e80c21b1f7 draft
Uploaded
author | m-zytnicki |
---|---|
date | Mon, 16 Nov 2015 12:00:32 -0500 |
parents | 769e306b7933 |
children |
comparison
equal
deleted
inserted
replaced
67:f4de72c80eac | 68:85e80c21b1f7 |
---|---|
39 from SMART.Java.Python.structure.Transcript import Transcript | 39 from SMART.Java.Python.structure.Transcript import Transcript |
40 from SMART.Java.Python.misc.Progress import Progress | 40 from SMART.Java.Python.misc.Progress import Progress |
41 from SMART.Java.Python.misc.UnlimitedProgress import UnlimitedProgress | 41 from SMART.Java.Python.misc.UnlimitedProgress import UnlimitedProgress |
42 | 42 |
43 BUFFER_SIZE = 100 * 1024 | 43 BUFFER_SIZE = 100 * 1024 |
44 NB_FILES = 10000000000 | |
44 | 45 |
45 class FileSorter(object): | 46 class FileSorter(object): |
46 | 47 |
47 def __init__(self, parser, verbosity = 1): | 48 def __init__(self, parser, verbosity = 1): |
48 self._parser = parser | 49 self._parser = parser |
118 self._chunks[chromosome].append(outputChunk) | 119 self._chunks[chromosome].append(outputChunk) |
119 for transcript in chunk: | 120 for transcript in chunk: |
120 outputChunk.write(pickle.dumps(transcript, -1)) | 121 outputChunk.write(pickle.dumps(transcript, -1)) |
121 outputChunk.close() | 122 outputChunk.close() |
122 | 123 |
123 def _merge(self, chunks): | 124 def _merge(self, chunks, chromosome, outputHandle): |
125 currentOutputChunkId = len(chunks) | |
126 while len(chunks) > NB_FILES: | |
127 outputChunk = open("%s_%s_%06i.tmp" % (self._prefix, chromosome, currentChunkId), "wb", 32000) | |
128 currentOutputChunkId += 1 | |
129 currentChunks = chunks[:NB_FILES] | |
130 chunks = chunks[NB_FILES:] + outputChunk | |
131 self._mergeParts(currentChunks, outputChunk) | |
132 outputChunk.close() | |
133 self._mergeParts(chunks, outputHandle) | |
134 | |
135 def _mergeParts(self, chunks, outputHandle): | |
124 values = [] | 136 values = [] |
125 for chunk in chunks: | 137 for chunk in chunks: |
126 chunk = open(chunk.name, "rb") | 138 chunk = open(chunk.name, "rb") |
127 try: | 139 try: |
128 transcript = pickle.load(chunk) | 140 transcript = pickle.load(chunk) |
136 except: | 148 except: |
137 pass | 149 pass |
138 else: | 150 else: |
139 heappush(values, (start, end, transcript, chunk)) | 151 heappush(values, (start, end, transcript, chunk)) |
140 while values: | 152 while values: |
141 start, end, transcript, chunk = heappop(values) | 153 start, end, transcript, chunk = heappop(values) |
142 yield transcript | 154 pickle.dump(transcript, outputHandle, -1) |
143 try: | 155 try: |
144 transcript = pickle.load(chunk) | 156 transcript = pickle.load(chunk) |
145 start = transcript.getStart() | 157 start = transcript.getStart() |
146 end = -transcript.getEnd() | 158 end = -transcript.getEnd() |
147 except EOFError: | 159 except EOFError: |
187 progress = Progress(len(self._chunks), "Writing sorted file %s" % (self._parser.fileName), self._verbosity) | 199 progress = Progress(len(self._chunks), "Writing sorted file %s" % (self._parser.fileName), self._verbosity) |
188 for chromosome in self._chunks: | 200 for chromosome in self._chunks: |
189 if self._perChromosome: | 201 if self._perChromosome: |
190 self._outputFileNames[chromosome] = "%s_%s.pkl" % (self._outputFileName, chromosome) | 202 self._outputFileNames[chromosome] = "%s_%s.pkl" % (self._outputFileName, chromosome) |
191 outputHandle = open(self._outputFileNames[chromosome], "wb") | 203 outputHandle = open(self._outputFileNames[chromosome], "wb") |
192 for sequence in self._merge(self._chunks[chromosome]): | 204 self._merge(self._chunks[chromosome], chromosome, outputHandle) |
193 pickle.dump(sequence, outputHandle, -1) | |
194 if self._perChromosome: | 205 if self._perChromosome: |
195 outputHandle.close() | 206 outputHandle.close() |
196 progress.inc() | 207 progress.inc() |
197 if not self._perChromosome: | 208 if not self._perChromosome: |
198 outputHandle.close() | 209 outputHandle.close() |