1 #!/usr/bin/env python
3 # Copyright INRA (Institut National de la Recherche Agronomique)
4 # http://www.inra.fr
5 # http://urgi.versailles.inra.fr
6 #
7 # This software is governed by the CeCILL license under French law and
8 # abiding by the rules of distribution of free software. You can use,
9 # modify and/ or redistribute the software under the terms of the CeCILL
10 # license as circulated by CEA, CNRS and INRIA at the following URL
11 # "http://www.cecill.info".
12 #
13 # As a counterpart to the access to the source code and rights to copy,
14 # modify and redistribute granted by the license, users are provided only
15 # with a limited warranty and the software's author, the holder of the
16 # economic rights, and the successive licensors have only limited
17 # liability.
18 #
19 # In this respect, the user's attention is drawn to the risks associated
20 # with loading, using, modifying and/or developing or reproducing the
21 # software by the user in light of its specific status of free software,
22 # that may mean that it is complicated to manipulate, and that also
23 # therefore means that it is reserved for developers and experienced
24 # professionals having in-depth computer knowledge. Users are therefore
25 # encouraged to load and test the software's suitability as regards their
26 # requirements in conditions enabling the security of their systems and/or
27 # data to be ensured and, more generally, to use and operate it in the
28 # same conditions as regards security.
29 #
30 # The fact that you are presently reading this means that you have had
31 # knowledge of the CeCILL license and that you accept its terms.
33 import os
34 import time
35 import shutil
36 from commons.core.seq.BioseqDB import BioseqDB
37 from commons.core.seq.SequenceModifications import SequenceModifications
38 from commons.core.checker.RepetException import RepetException
40 class SequenceModificationsCollection(object):
42 def __init__(self):
43 self._lSeqModif = []
45 def __str__(self):
46 result = ""
47 for iSeqModif in self._lSeqModif:
48 result += "%s\n" % iSeqModif.__str__()
49 return result
51 def __eq__(self, o):
52 if type(o) is type(self):
53 self.sort()
54 o.sort()
55 return self._lSeqModif == o._lSeqModif
56 return False
58 def __ne__(self, o):
59 return not self.__eq__(o)
61 def clear(self):
62 self._lSeqModif = []
64 def add(self, iSeqModif, override = False):
65 for seqModif in self._lSeqModif:
66 if seqModif.getOriginalHeader() == iSeqModif.getOriginalHeader():
67 if override:
68 self._lSeqModif.pop(self._lSeqModif.index(seqModif))
69 else:
70 raise RepetException("ERROR: '%s' already in SequenceModificationsCollection" % iSeqModif.getOriginalHeader())
72 self._lSeqModif.append(iSeqModif)
74 def get(self, header, mutated = False):
75 for iSeqModif in self._lSeqModif:
76 if mutated:
77 linkToGoodMethod = iSeqModif.getMutatedHeader
78 else:
79 linkToGoodMethod = iSeqModif.getOriginalHeader
81 if linkToGoodMethod() == header:
82 return iSeqModif
83 return None
85 def getHeadersList(self, mutated = False):
86 lHeaders = []
87 if mutated:
88 for iSeqModif in self._lSeqModif:
89 lHeaders.append(iSeqModif.getMutatedHeader())
90 else:
91 for iSeqModif in self._lSeqModif:
92 lHeaders.append(iSeqModif.getOriginalHeader())
93 lHeaders.sort(key = lambda header: header.lower())
94 return lHeaders
96 def sort(self):
97 self._lSeqModif.sort(key = lambda seqMod: seqMod.getOriginalHeader().lower(), reverse = False)
99 def writeMutations(self, fileName, outFormat = ""):
100 self.sort()
101 with open(fileName, "w") as fH:
102 if outFormat.lower() in ["gff", "gff3"]:
103 fH.write("##gff-version 3\n")
104 for iSeqModif in self._lSeqModif:
105 for mutation in iSeqModif.getMutations():
106 pos = mutation[0]
107 old = mutation[1]
108 new = mutation[2]
109 fH.write("%s\tMutateSequence\tSNP\t%i\t%i\t.\t.\t.\tName=SNP_%i;REF=%s;ALT=%s\n" % (iSeqModif.getOriginalHeader(), pos, pos, pos, old, new))
110 else:
111 fH.write("#Mutations:\n")
112 fH.write("seqName\tposition\toldNt\tnewNt\n")
113 for iSeqModif in self._lSeqModif:
114 for mutation in iSeqModif.getMutations():
115 fH.write("%s\t%i\t%s\t%s\n" % (iSeqModif.getOriginalHeader(), mutation[0], mutation[1], mutation[2]))
117 def writeInsertions(self, fileName, outFormat = ""):
118 self.sort()
119 with open(fileName, "w") as fH:
120 if outFormat.lower() in ["gff", "gff3"]:
121 fH.write("##gff-version 3\n")
122 for iSeqModif in self._lSeqModif:
123 for iRange in iSeqModif.getInsertions():
124 if iRange.getSeqname() != ".":
125 fH.write("%s\tMutateSequence\tinsertion\t%s\t%s\t.\t.\t.\tName=insertion_%s-%s;insert=%s\n" % (iSeqModif.getOriginalHeader(), iRange.getStart(), iRange.getEnd(), iRange.getStart(), iRange.getEnd(), iRange.getSeqname()))
126 else:
127 fH.write("%s\tMutateSequence\tinsertion\t%s\t%s\t.\t.\t.\tName=insertion_%s-%s\n" % (iSeqModif.getOriginalHeader(), iRange.getStart(), iRange.getEnd(), iRange.getStart(), iRange.getEnd()))
128 else:
129 fH.write("#Insertions:\n")
130 fH.write("seqName\tstart\tend\tinsertedSeqName\n")
131 for iSeqModif in self._lSeqModif:
132 for iRange in iSeqModif.getInsertions():
133 fH.write("%s\t%i\t%i\t%s\n" % (iSeqModif.getOriginalHeader(), iRange.getStart(), iRange.getEnd(), iRange.getSeqname()))
135 def writeDeletions(self, fileName, outFormat = ""):
136 self.sort()
137 with open(fileName, "w") as fH:
138 if outFormat.lower() in ["gff", "gff3"]:
139 fH.write("##gff-version 3\n")
140 for iSeqModif in self._lSeqModif:
141 for iRange in iSeqModif.getDeletions():
142 fH.write("%s\tMutateSequence\tdeletion\t%s\t%s\t.\t.\t.\tName=deletion_%s-%s\n" % (iSeqModif.getOriginalHeader(), iRange.getStart(), iRange.getEnd(), iRange.getStart(), iRange.getEnd()))
143 else:
144 fH.write("#Deletions:\n")
145 fH.write("seqName\tstart\tend\n")
146 for iSeqModif in self._lSeqModif:
147 for iRange in iSeqModif.getDeletions():
148 fH.write("%s\t%i\t%i\n" % (iSeqModif.getOriginalHeader(), iRange.getStart(), iRange.getEnd()))
150 def write(self, mutationsFileName = "", insertionsFileName = "", deletionsFileName = "", outFormat = ""):
151 self.sort()
152 self.writeMutations(mutationsFileName, outFormat)
153 self.writeInsertions(insertionsFileName, outFormat)
154 self.writeDeletions(deletionsFileName, outFormat)
156 def writeVCF(self, VCFFileName, fastaFileName, software = "MutateSequences"):
157 self.sort()
158 tmpVCFFileName = "%s.tmp" % VCFFileName
159 VCFFH = open(tmpVCFFileName, "w")
160 VCFFH.write("##fileformat=VCFv4.1\n")
161 VCFFH.write("##fileDate=%s\n" % time.strftime("%Y%m%d"))
162 VCFFH.write("##reference=%s\n" % os.path.abspath(fastaFileName))
163 VCFFH.write("##INFO=<ID=SVLEN,Number=.,Type=Integer,Description=\"Difference in length between REF and ALT alleles\">\n")
164 VCFFH.write("##INFO=<ID=SVTYPE,Number=1,Type=String,Description=\"Type of structural variant\">\n")
165 VCFFH.write("##INFO=<ID=ALTSTART,Number=1,Type=Integer,Description=\"ALT start position on query sequence\">\n")
166 VCFFH.write("##INFO=<ID=SOFTWARE,Number=1,Type=String,Description=\"Software used to generate this VCF\">\n")
167 VCFFH.write("##INFO=<ID=INSERTED,Number=1,Type=String,Description=\"Inserted sequence name\">\n")
170 iBSDB = BioseqDB(fastaFileName)
172 for iSeqModif in self._lSeqModif:
173 for mutation in iSeqModif.getMutations():
174 pos = mutation[0]
175 old = mutation[1]
176 new = mutation[2]
177 VCFFH.write("%s\t%s\t.\t%s\t%s\t.\t.\tAN=2;REF=%s;ALT=%s;SOFTWARE=%s\n" % (iSeqModif.getOriginalHeader(), pos, old, new, old, new, software))
179 for insRange in iSeqModif.getInsertions():
180 if insRange.getStart() != 1:
181 refSeq = iBSDB.fetch(iSeqModif.getOriginalHeader()).getNtFromPosition(insRange.getStart() - 1)
182 altSeq = "."
184 INFO = "SVTYPE=INS;AN=2;SVLEN=%d;SOFTWARE=%s" % (insRange.getEnd() - insRange.getStart() + 1, software)
185 if insRange.getSeqname() != ".":
186 INFO += ";INSERTED=%s" % insRange.getSeqname()
187 VCFLine = "%s\t%d\t.\t%s\t%s\t%s\t%s\t%s\n" % (iSeqModif.getOriginalHeader(), insRange.getStart() - 1, refSeq, altSeq, ".", ".", INFO)
189 else:
190 refSeq = iBSDB.fetch(iSeqModif.getOriginalHeader()).getNtFromPosition(insRange.getStart())
191 refSeq = "."
192 altSeq = "."
194 INFO = "SVTYPE=INS;AN=2;SVLEN=%d;SOFTWARE=%s" % (insRange.getEnd() - insRange.getStart() + 1, software)
195 if insRange.getSeqname() != ".":
196 INFO += ";INSERTED=%s" % insRange.getSeqname()
197 VCFLine = "%s\t%d\t.\t%s\t%s\t%s\t%s\t%s\n" % (iSeqModif.getOriginalHeader(), insRange.getStart(), refSeq, altSeq, ".", ".", INFO)
199 VCFFH.write(VCFLine)
201 for delRange in iSeqModif.getDeletions():
202 if delRange.getStart() != 1:
203 refSeq = iBSDB.fetch(iSeqModif.getOriginalHeader()).subseq(delRange.getStart() - 1, delRange.getEnd()).getSequence()
204 altSeq = refSeq[0]
206 INFO = "SVTYPE=DEL;AN=2;SVLEN=-%d;SOFTWARE=%s" % (len(refSeq)-1, software)
207 VCFLine = "%s\t%d\t.\t%s\t%s\t%s\t%s\t%s\n" % (iSeqModif.getOriginalHeader(), delRange.getStart() - 1, refSeq, altSeq, ".", ".", INFO)
209 else:
210 refSeq = iBSDB.fetch(iSeqModif.getOriginalHeader()).subseq(delRange.getStart(), delRange.getEnd() + 1).getSequence()
211 altSeq = refSeq[-1]
212 altSeq = "."
214 INFO = "SVTYPE=DEL;AN=2;SVLEN=-%d;SOFTWARE=%s" % (len(refSeq)-1, software)
215 VCFLine = "%s\t%d\t.\t%s\t%s\t%s\t%s\t%s\n" % (iSeqModif.getOriginalHeader(), delRange.getStart(), refSeq, altSeq, ".", ".", INFO)
217 VCFFH.write(VCFLine)
219 #This command line can sort this VCF file properly. But can't manage to launch it properly through os.system or subprocess...
220 # cmd = "(head -n 9 %s && tail -n +10 %s | head -n -1 | sort -f -k1,1 -k2,2n) > %s" % (tmpVCFFileName, tmpVCFFileName, VCFFileName)
221 shutil.move(tmpVCFFileName, VCFFileName)
223 def getCollectionBasedOnMutatedSequence(self):
224 transformedSeqModifCollec = SequenceModificationsCollection()
226 for header in self.getHeadersList():
227 currentSeqModif = self.get(header)
229 lModifsTuples = [("insertion", iRange) for iRange in currentSeqModif.getInsertions()]
230 for iRange in currentSeqModif.getDeletions():
231 lModifsTuples.append(("deletion", iRange))
232 lModifsTuples.sort(key = lambda modifTuple: modifTuple[1].getStart(), reverse = False)
234 sumIns = 0
235 sumDel = 0
237 iseqModif = SequenceModifications(currentSeqModif.getMutatedHeader(), currentSeqModif.getOriginalHeader())
238 for modifTuple in lModifsTuples:
239 varType = modifTuple[0]
240 varRange = modifTuple[1]
242 if varType == "insertion":
243 iseqModif.addDeletion(varRange.getStart() + sumIns - sumDel, varRange.getEnd() + sumIns - sumDel)
244 sumIns += varRange.getLength()
246 if varType == "deletion":
247 iseqModif.addInsertion(varRange.getStart() + sumIns - sumDel, varRange.getEnd() + sumIns - sumDel)
248 sumDel += varRange.getLength()
250 for tSnp in currentSeqModif.getMutations():
251 iseqModif.addMutation((tSnp[0], tSnp[2], tSnp[1]))
253 iseqModif.sort()
254 transformedSeqModifCollec.add(iseqModif)
256 transformedSeqModifCollec.sort()
258 return transformedSeqModifCollec
260 def loadSeqModifCollectionFromFiles(self, inInsertionsFileName, inDeletionsFileName, inSNPsFileName, SNPsrate = "0.020000"):
261 self.clear()
263 with open(inInsertionsFileName, "r") as f:
264 line = f.readline()
265 while line:
266 if "seqName" not in line and "#" not in line:
267 splittedLine = line.split()
268 seqname = splittedLine[0]
269 start = int(splittedLine[1])
270 end = int(splittedLine[2])
271 insertedSeqName = splittedLine[3]
273 if self.get(seqname) is None:
274 self.add(SequenceModifications(seqname))
275 self.get(seqname).setMutatedHeader("%s_mutated_%s" % (seqname, SNPsrate))
276 self.get(seqname).addInsertion(start, end, insertedSeqName)
277 line = f.readline()
279 with open(inDeletionsFileName, "r") as f:
280 line = f.readline()
281 while line:
282 if "seqName" not in line and "#" not in line:
283 splittedLine = line.split()
284 seqname = splittedLine[0]
285 start = int(splittedLine[1])
286 end = int(splittedLine[2])
288 if self.get(seqname) is None:
289 self.add(SequenceModifications(seqname))
290 self.get(seqname).setMutatedHeader("%s_mutated_%s" % (seqname, SNPsrate))
291 self.get(seqname).addDeletion(start, end)
292 line = f.readline()
294 with open(inSNPsFileName, "r") as f:
295 line = f.readline()
296 while line:
297 if "seqName" not in line and "#" not in line:
298 splittedLine = line.split()
299 seqname = splittedLine[0]
300 position = int(splittedLine[1])
301 oldNt = splittedLine[2]
302 newNt = splittedLine[3]
304 if self.get(seqname) is None:
305 self.add(SequenceModifications(seqname))
306 self.get(seqname).setMutatedHeader("%s_mutated_%s" % (seqname, SNPsrate))
307 self.get(seqname).addMutation((position, oldNt, newNt))
308 line = f.readline()
310 for header in self.getHeadersList():
311 self.get(header).sort()
312 self.sort() |