# Source: env/lib/python3.9/site-packages/bleach/_vendor/html5lib/_inputstream.py
# sam_consensus_v3 repository, revision 0:4f3585e2f14b (draft, default, tip)
# "planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
# author: shellac, date: Mon, 22 Mar 2021 18:12:50 +0000
from __future__ import absolute_import, division, unicode_literals

from six import text_type
from six.moves import http_client, urllib

import codecs
import re
from io import BytesIO, StringIO

import webencodings

from .constants import EOF, spaceCharacters, asciiLetters, asciiUppercase
from .constants import _ReparseException
from . import _utils

# Non-unicode versions of constants for use in the pre-parser
spaceCharactersBytes = frozenset([item.encode("ascii") for item in spaceCharacters])
asciiLettersBytes = frozenset([item.encode("ascii") for item in asciiLetters])
asciiUppercaseBytes = frozenset([item.encode("ascii") for item in asciiUppercase])
spacesAngleBrackets = spaceCharactersBytes | frozenset([b">", b"<"])


invalid_unicode_no_surrogate = "[\u0001-\u0008\u000B\u000E-\u001F\u007F-\u009F\uFDD0-\uFDEF\uFFFE\uFFFF\U0001FFFE\U0001FFFF\U0002FFFE\U0002FFFF\U0003FFFE\U0003FFFF\U0004FFFE\U0004FFFF\U0005FFFE\U0005FFFF\U0006FFFE\U0006FFFF\U0007FFFE\U0007FFFF\U0008FFFE\U0008FFFF\U0009FFFE\U0009FFFF\U000AFFFE\U000AFFFF\U000BFFFE\U000BFFFF\U000CFFFE\U000CFFFF\U000DFFFE\U000DFFFF\U000EFFFE\U000EFFFF\U000FFFFE\U000FFFFF\U0010FFFE\U0010FFFF]"  # noqa

if _utils.supports_lone_surrogates:
    # Use one extra step of indirection and create surrogates with
    # eval. Not using this indirection would introduce an illegal
    # unicode literal on platforms not supporting such lone
    # surrogates.
    assert invalid_unicode_no_surrogate[-1] == "]" and invalid_unicode_no_surrogate.count("]") == 1
    invalid_unicode_re = re.compile(invalid_unicode_no_surrogate[:-1] +
                                    eval('"\\uD800-\\uDFFF"') +  # pylint:disable=eval-used
                                    "]")
else:
    invalid_unicode_re = re.compile(invalid_unicode_no_surrogate)

non_bmp_invalid_codepoints = {0x1FFFE, 0x1FFFF, 0x2FFFE, 0x2FFFF, 0x3FFFE,
                              0x3FFFF, 0x4FFFE, 0x4FFFF, 0x5FFFE, 0x5FFFF,
                              0x6FFFE, 0x6FFFF, 0x7FFFE, 0x7FFFF, 0x8FFFE,
                              0x8FFFF, 0x9FFFE, 0x9FFFF, 0xAFFFE, 0xAFFFF,
                              0xBFFFE, 0xBFFFF, 0xCFFFE, 0xCFFFF, 0xDFFFE,
                              0xDFFFF, 0xEFFFE, 0xEFFFF, 0xFFFFE, 0xFFFFF,
                              0x10FFFE, 0x10FFFF}

ascii_punctuation_re = re.compile("[\u0009-\u000D\u0020-\u002F\u003A-\u0040\u005C\u005B-\u0060\u007B-\u007E]")

# Cache for charsUntil()
charsUntilRegEx = {}

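# For reference (an illustrative note, not from upstream): charsUntil() below
# caches one compiled pattern per (characters, opposite) pair in
# charsUntilRegEx.  For example, the first call to charsUntil("<&") would
# store re.compile("[^\\x3c\\x26]+") under the key ("<&", False).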

class BufferedStream(object):
    """Buffering for streams that do not have buffering of their own

    The buffer is implemented as a list of chunks on the assumption that
    joining many strings will be slow since it is O(n**2)
    """

    def __init__(self, stream):
        self.stream = stream
        self.buffer = []
        self.position = [-1, 0]  # chunk number, offset

    def tell(self):
        pos = 0
        for chunk in self.buffer[:self.position[0]]:
            pos += len(chunk)
        pos += self.position[1]
        return pos

    def seek(self, pos):
        assert pos <= self._bufferedBytes()
        offset = pos
        i = 0
        while len(self.buffer[i]) < offset:
            offset -= len(self.buffer[i])
            i += 1
        self.position = [i, offset]

    def read(self, bytes):
        if not self.buffer:
            return self._readStream(bytes)
        elif (self.position[0] == len(self.buffer) and
              self.position[1] == len(self.buffer[-1])):
            return self._readStream(bytes)
        else:
            return self._readFromBuffer(bytes)

    def _bufferedBytes(self):
        return sum([len(item) for item in self.buffer])

    def _readStream(self, bytes):
        data = self.stream.read(bytes)
        self.buffer.append(data)
        self.position[0] += 1
        self.position[1] = len(data)
        return data

    def _readFromBuffer(self, bytes):
        remainingBytes = bytes
        rv = []
        bufferIndex = self.position[0]
        bufferOffset = self.position[1]
        while bufferIndex < len(self.buffer) and remainingBytes != 0:
            assert remainingBytes > 0
            bufferedData = self.buffer[bufferIndex]

            if remainingBytes <= len(bufferedData) - bufferOffset:
                bytesToRead = remainingBytes
                self.position = [bufferIndex, bufferOffset + bytesToRead]
            else:
                bytesToRead = len(bufferedData) - bufferOffset
                self.position = [bufferIndex, len(bufferedData)]
                bufferIndex += 1
            rv.append(bufferedData[bufferOffset:bufferOffset + bytesToRead])
            remainingBytes -= bytesToRead

            bufferOffset = 0

        if remainingBytes:
            rv.append(self._readStream(remainingBytes))

        return b"".join(rv)

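# Illustrative sketch (not part of the vendored module): BufferedStream acts
# as a seek/tell shim for read-only streams that cannot seek on their own.
# Assuming a non-seekable source such as a socket-backed response object,
# usage would look roughly like:
#
#     buffered = BufferedStream(raw_response)
#     head = buffered.read(4)        # bytes are remembered in the chunk buffer
#     buffered.seek(0)               # rewind within the buffered region
#     head_again = buffered.read(4)  # same bytes, now served from the buffer
#
# Only positions that have already been read (and therefore buffered) can be
# seeked back to.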

def HTMLInputStream(source, **kwargs):
    # Work around Python bug #20007: read(0) closes the connection.
    # http://bugs.python.org/issue20007
    if (isinstance(source, http_client.HTTPResponse) or
            # Also check for addinfourl wrapping HTTPResponse
            (isinstance(source, urllib.response.addbase) and
             isinstance(source.fp, http_client.HTTPResponse))):
        isUnicode = False
    elif hasattr(source, "read"):
        isUnicode = isinstance(source.read(0), text_type)
    else:
        isUnicode = isinstance(source, text_type)

    if isUnicode:
        encodings = [x for x in kwargs if x.endswith("_encoding")]
        if encodings:
            raise TypeError("Cannot set an encoding with a unicode input, set %r" % encodings)

        return HTMLUnicodeInputStream(source, **kwargs)
    else:
        return HTMLBinaryInputStream(source, **kwargs)

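# Rough usage sketch (assumption, not part of the module): the factory above
# simply picks a stream class from the type of `source`.
#
#     HTMLInputStream("<p>text</p>")             # str   -> HTMLUnicodeInputStream
#     HTMLInputStream(b"<p>text</p>")            # bytes -> HTMLBinaryInputStream
#     HTMLInputStream(open("page.html", "rb"))   # binary file -> binary stream
#
# Passing any *_encoding keyword argument together with unicode input raises
# TypeError, because decoded text no longer has an encoding to negotiate.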

class HTMLUnicodeInputStream(object):
    """Provides a unicode stream of characters to the HTMLTokenizer.

    This class takes care of character encoding and removing or replacing
    incorrect byte-sequences and also provides column and line tracking.

    """

    _defaultChunkSize = 10240

    def __init__(self, source):
        """Initialises the HTMLInputStream.

        HTMLInputStream(source, [encoding]) -> Normalized stream from source
        for use by html5lib.

        source can be either a file-object, local filename or a string.

        The optional encoding parameter must be a string that indicates
        the encoding. If specified, that encoding will be used,
        regardless of any BOM or later declaration (such as in a meta
        element)

        """

        if not _utils.supports_lone_surrogates:
            # Such platforms will have already checked for such
            # surrogate errors, so no need to do this checking.
            self.reportCharacterErrors = None
        elif len("\U0010FFFF") == 1:
            self.reportCharacterErrors = self.characterErrorsUCS4
        else:
            self.reportCharacterErrors = self.characterErrorsUCS2

        # List of where new lines occur
        self.newLines = [0]

        self.charEncoding = (lookupEncoding("utf-8"), "certain")
        self.dataStream = self.openStream(source)

        self.reset()

    def reset(self):
        self.chunk = ""
        self.chunkSize = 0
        self.chunkOffset = 0
        self.errors = []

        # number of (complete) lines in previous chunks
        self.prevNumLines = 0
        # number of columns in the last line of the previous chunk
        self.prevNumCols = 0

        # Deal with CR LF and surrogates split over chunk boundaries
        self._bufferedCharacter = None

    def openStream(self, source):
        """Produces a file object from source.

        source can be either a file object, local filename or a string.

        """
        # Already a file object
        if hasattr(source, 'read'):
            stream = source
        else:
            stream = StringIO(source)

        return stream

    def _position(self, offset):
        chunk = self.chunk
        nLines = chunk.count('\n', 0, offset)
        positionLine = self.prevNumLines + nLines
        lastLinePos = chunk.rfind('\n', 0, offset)
        if lastLinePos == -1:
            positionColumn = self.prevNumCols + offset
        else:
            positionColumn = offset - (lastLinePos + 1)
        return (positionLine, positionColumn)

    def position(self):
        """Returns (line, col) of the current position in the stream."""
        line, col = self._position(self.chunkOffset)
        return (line + 1, col)

    def char(self):
        """ Read one character from the stream or queue if available. Return
            EOF when EOF is reached.
        """
        # Read a new chunk from the input stream if necessary
        if self.chunkOffset >= self.chunkSize:
            if not self.readChunk():
                return EOF

        chunkOffset = self.chunkOffset
        char = self.chunk[chunkOffset]
        self.chunkOffset = chunkOffset + 1

        return char

    def readChunk(self, chunkSize=None):
        if chunkSize is None:
            chunkSize = self._defaultChunkSize

        self.prevNumLines, self.prevNumCols = self._position(self.chunkSize)

        self.chunk = ""
        self.chunkSize = 0
        self.chunkOffset = 0

        data = self.dataStream.read(chunkSize)

        # Deal with CR LF and surrogates broken across chunks
        if self._bufferedCharacter:
            data = self._bufferedCharacter + data
            self._bufferedCharacter = None
        elif not data:
            # We have no more data, bye-bye stream
            return False

        if len(data) > 1:
            lastv = ord(data[-1])
            if lastv == 0x0D or 0xD800 <= lastv <= 0xDBFF:
                self._bufferedCharacter = data[-1]
                data = data[:-1]

        if self.reportCharacterErrors:
            self.reportCharacterErrors(data)

        # Replace invalid characters
        data = data.replace("\r\n", "\n")
        data = data.replace("\r", "\n")

        self.chunk = data
        self.chunkSize = len(data)

        return True

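    # Note (added commentary, not from upstream): characterErrorsUCS4 and
    # characterErrorsUCS2 below differ only in how the Python build stores
    # code points.  On wide (UCS-4) builds each invalid code point is a single
    # character, so counting regex matches is enough; on narrow (UCS-2) builds
    # astral characters arrive as surrogate pairs and must be recombined
    # before checking them against non_bmp_invalid_codepoints.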
    def characterErrorsUCS4(self, data):
        for _ in range(len(invalid_unicode_re.findall(data))):
            self.errors.append("invalid-codepoint")

    def characterErrorsUCS2(self, data):
        # Someone picked the wrong compile option
        # You lose
        skip = False
        for match in invalid_unicode_re.finditer(data):
            if skip:
                continue
            codepoint = ord(match.group())
            pos = match.start()
            # Pretty sure there should be endianness issues here
            if _utils.isSurrogatePair(data[pos:pos + 2]):
                # We have a surrogate pair!
                char_val = _utils.surrogatePairToCodepoint(data[pos:pos + 2])
                if char_val in non_bmp_invalid_codepoints:
                    self.errors.append("invalid-codepoint")
                skip = True
            elif (codepoint >= 0xD800 and codepoint <= 0xDFFF and
                  pos == len(data) - 1):
                self.errors.append("invalid-codepoint")
            else:
                skip = False
                self.errors.append("invalid-codepoint")

    def charsUntil(self, characters, opposite=False):
        """ Returns a string of characters from the stream up to but not
        including any character in 'characters' or EOF. 'characters' must be
        a container that supports the 'in' method and iteration over its
        characters.
        """

        # Use a cache of regexps to find the required characters
        try:
            chars = charsUntilRegEx[(characters, opposite)]
        except KeyError:
            if __debug__:
                for c in characters:
                    assert(ord(c) < 128)
            regex = "".join(["\\x%02x" % ord(c) for c in characters])
            if not opposite:
                regex = "^%s" % regex
            chars = charsUntilRegEx[(characters, opposite)] = re.compile("[%s]+" % regex)

        rv = []

        while True:
            # Find the longest matching prefix
            m = chars.match(self.chunk, self.chunkOffset)
            if m is None:
                # If nothing matched, and it wasn't because we ran out of chunk,
                # then stop
                if self.chunkOffset != self.chunkSize:
                    break
            else:
                end = m.end()
                # If not the whole chunk matched, return everything
                # up to the part that didn't match
                if end != self.chunkSize:
                    rv.append(self.chunk[self.chunkOffset:end])
                    self.chunkOffset = end
                    break
                # If the whole remainder of the chunk matched,
                # use it all and read the next chunk
                rv.append(self.chunk[self.chunkOffset:])
            if not self.readChunk():
                # Reached EOF
                break

        r = "".join(rv)
        return r

    def unget(self, char):
        # Only one character is allowed to be ungotten at once - it must
        # be consumed again before any further call to unget
        if char is not EOF:
            if self.chunkOffset == 0:
                # unget is called quite rarely, so it's a good idea to do
                # more work here if it saves a bit of work in the frequently
                # called char and charsUntil.
                # So, just prepend the ungotten character onto the current
                # chunk:
                self.chunk = char + self.chunk
                self.chunkSize += 1
            else:
                self.chunkOffset -= 1
                assert self.chunk[self.chunkOffset] == char

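# Minimal sketch (illustration only, not from upstream): the tokenizer drives
# the stream through char(), charsUntil() and unget().
#
#     stream = HTMLUnicodeInputStream("<p>hi\r\nthere</p>")
#     stream.char()             # "<"
#     stream.charsUntil(">")    # "p"
#     stream.char()             # ">"
#     stream.position()         # (1, 3): line 1, column 3
#
# Carriage returns are normalised as chunks are read, so "\r\n" and "\r" both
# count as a single "\n" for the line/column bookkeeping.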

class HTMLBinaryInputStream(HTMLUnicodeInputStream):
    """Provides a unicode stream of characters to the HTMLTokenizer.

    This class takes care of character encoding and removing or replacing
    incorrect byte-sequences and also provides column and line tracking.

    """

    def __init__(self, source, override_encoding=None, transport_encoding=None,
                 same_origin_parent_encoding=None, likely_encoding=None,
                 default_encoding="windows-1252", useChardet=True):
        """Initialises the HTMLInputStream.

        HTMLInputStream(source, [encoding]) -> Normalized stream from source
        for use by html5lib.

        source can be either a file-object, local filename or a string.

        The optional encoding parameter must be a string that indicates
        the encoding. If specified, that encoding will be used,
        regardless of any BOM or later declaration (such as in a meta
        element)

        """
        # Raw Stream - for unicode objects this will encode to utf-8 and set
        # self.charEncoding as appropriate
        self.rawStream = self.openStream(source)

        HTMLUnicodeInputStream.__init__(self, self.rawStream)

        # Encoding Information
        # Number of bytes to use when looking for a meta element with
        # encoding information
        self.numBytesMeta = 1024
        # Number of bytes to use when detecting the encoding with chardet
        self.numBytesChardet = 100
        # Things from args
        self.override_encoding = override_encoding
        self.transport_encoding = transport_encoding
        self.same_origin_parent_encoding = same_origin_parent_encoding
        self.likely_encoding = likely_encoding
        self.default_encoding = default_encoding

        # Determine encoding
        self.charEncoding = self.determineEncoding(useChardet)
        assert self.charEncoding[0] is not None

        # Call superclass
        self.reset()

    def reset(self):
        self.dataStream = self.charEncoding[0].codec_info.streamreader(self.rawStream, 'replace')
        HTMLUnicodeInputStream.reset(self)

    def openStream(self, source):
        """Produces a file object from source.

        source can be either a file object, local filename or a string.

        """
        # Already a file object
        if hasattr(source, 'read'):
            stream = source
        else:
            stream = BytesIO(source)

        try:
            stream.seek(stream.tell())
        except Exception:
            stream = BufferedStream(stream)

        return stream

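    # Added summary (not an upstream comment): encoding detection below
    # follows a fixed precedence: BOM, then the explicit override, then the
    # transport (e.g. HTTP Content-Type) encoding, then a <meta> prescan of
    # the first numBytesMeta bytes, then the same-origin parent's encoding,
    # then the caller's "likely" encoding, then an optional chardet guess,
    # and finally the default encoding (windows-1252).  Only the BOM,
    # override and transport results are reported as "certain"; everything
    # later is "tentative".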
    def determineEncoding(self, chardet=True):
        # BOMs take precedence over everything
        # This will also read past the BOM if present
        charEncoding = self.detectBOM(), "certain"
        if charEncoding[0] is not None:
            return charEncoding

        # If we've been overridden, we've been overridden
        charEncoding = lookupEncoding(self.override_encoding), "certain"
        if charEncoding[0] is not None:
            return charEncoding

        # Now check the transport layer
        charEncoding = lookupEncoding(self.transport_encoding), "certain"
        if charEncoding[0] is not None:
            return charEncoding

        # Look for meta elements with encoding information
        charEncoding = self.detectEncodingMeta(), "tentative"
        if charEncoding[0] is not None:
            return charEncoding

        # Parent document encoding
        charEncoding = lookupEncoding(self.same_origin_parent_encoding), "tentative"
        if charEncoding[0] is not None and not charEncoding[0].name.startswith("utf-16"):
            return charEncoding

        # "likely" encoding
        charEncoding = lookupEncoding(self.likely_encoding), "tentative"
        if charEncoding[0] is not None:
            return charEncoding

        # Guess with chardet, if available
        if chardet:
            try:
                from chardet.universaldetector import UniversalDetector
            except ImportError:
                pass
            else:
                buffers = []
                detector = UniversalDetector()
                while not detector.done:
                    buffer = self.rawStream.read(self.numBytesChardet)
                    assert isinstance(buffer, bytes)
                    if not buffer:
                        break
                    buffers.append(buffer)
                    detector.feed(buffer)
                detector.close()
                encoding = lookupEncoding(detector.result['encoding'])
                self.rawStream.seek(0)
                if encoding is not None:
                    return encoding, "tentative"

        # Try the default encoding
        charEncoding = lookupEncoding(self.default_encoding), "tentative"
        if charEncoding[0] is not None:
            return charEncoding

        # Fallback to html5lib's default if even that hasn't worked
        return lookupEncoding("windows-1252"), "tentative"

    def changeEncoding(self, newEncoding):
        assert self.charEncoding[1] != "certain"
        newEncoding = lookupEncoding(newEncoding)
        if newEncoding is None:
            return
        if newEncoding.name in ("utf-16be", "utf-16le"):
            newEncoding = lookupEncoding("utf-8")
            assert newEncoding is not None
        elif newEncoding == self.charEncoding[0]:
            self.charEncoding = (self.charEncoding[0], "certain")
        else:
            self.rawStream.seek(0)
            self.charEncoding = (newEncoding, "certain")
            self.reset()
            raise _ReparseException("Encoding changed from %s to %s" % (self.charEncoding[0], newEncoding))

    def detectBOM(self):
530 """Attempts to detect at BOM at the start of the stream. If | |
531 an encoding can be determined from the BOM return the name of the | |
532 encoding otherwise return None""" | |
        bomDict = {
            codecs.BOM_UTF8: 'utf-8',
            codecs.BOM_UTF16_LE: 'utf-16le', codecs.BOM_UTF16_BE: 'utf-16be',
            codecs.BOM_UTF32_LE: 'utf-32le', codecs.BOM_UTF32_BE: 'utf-32be'
        }

        # Go to beginning of file and read in 4 bytes
        string = self.rawStream.read(4)
        assert isinstance(string, bytes)

        # Try detecting the BOM using bytes from the string
        encoding = bomDict.get(string[:3])  # UTF-8
        seek = 3
        if not encoding:
            # Need to detect UTF-32 before UTF-16
            encoding = bomDict.get(string)  # UTF-32
            seek = 4
            if not encoding:
                encoding = bomDict.get(string[:2])  # UTF-16
                seek = 2

        # Set the read position past the BOM if one was found, otherwise
        # set it to the start of the stream
        if encoding:
            self.rawStream.seek(seek)
            return lookupEncoding(encoding)
        else:
            self.rawStream.seek(0)
            return None

    def detectEncodingMeta(self):
        """Report the encoding declared by the meta element
        """
        buffer = self.rawStream.read(self.numBytesMeta)
        assert isinstance(buffer, bytes)
        parser = EncodingParser(buffer)
        self.rawStream.seek(0)
        encoding = parser.getEncoding()

        if encoding is not None and encoding.name in ("utf-16be", "utf-16le"):
            encoding = lookupEncoding("utf-8")

        return encoding

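# Hedged example (assumption, not upstream documentation): feeding the binary
# stream a document whose only encoding hint is a <meta> element.
#
#     stream = HTMLBinaryInputStream(
#         b'<html><head><meta charset="iso-8859-2"></head></html>',
#         useChardet=False)
#     stream.charEncoding[0].name   # "iso-8859-2"
#     stream.charEncoding[1]        # "tentative"
#
# A later, conflicting declaration found by the real parser would go through
# changeEncoding(), which rewinds the raw stream, re-decodes from the start
# and raises _ReparseException so parsing can restart with the new encoding.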

class EncodingBytes(bytes):
    """String-like object with an associated position and various extra methods
    If the position is ever greater than the string length then an exception is
    raised"""
    def __new__(self, value):
        assert isinstance(value, bytes)
        return bytes.__new__(self, value.lower())

    def __init__(self, value):
        # pylint:disable=unused-argument
        self._position = -1

    def __iter__(self):
        return self

    def __next__(self):
        p = self._position = self._position + 1
        if p >= len(self):
            raise StopIteration
        elif p < 0:
            raise TypeError
        return self[p:p + 1]

    def next(self):
        # Py2 compat
        return self.__next__()

    def previous(self):
        p = self._position
        if p >= len(self):
            raise StopIteration
        elif p < 0:
            raise TypeError
        self._position = p = p - 1
        return self[p:p + 1]

    def setPosition(self, position):
        if self._position >= len(self):
            raise StopIteration
        self._position = position

    def getPosition(self):
        if self._position >= len(self):
            raise StopIteration
        if self._position >= 0:
            return self._position
        else:
            return None

    position = property(getPosition, setPosition)

    def getCurrentByte(self):
        return self[self.position:self.position + 1]

    currentByte = property(getCurrentByte)

    def skip(self, chars=spaceCharactersBytes):
        """Skip past a list of characters"""
        p = self.position  # use property for the error-checking
        while p < len(self):
            c = self[p:p + 1]
            if c not in chars:
                self._position = p
                return c
            p += 1
        self._position = p
        return None

    def skipUntil(self, chars):
        p = self.position
        while p < len(self):
            c = self[p:p + 1]
            if c in chars:
                self._position = p
                return c
            p += 1
        self._position = p
        return None

    def matchBytes(self, bytes):
        """Look for a sequence of bytes at the start of a string. If the bytes
        are found return True and advance the position to the byte after the
        match. Otherwise return False and leave the position alone"""
        rv = self.startswith(bytes, self.position)
        if rv:
            self.position += len(bytes)
        return rv

    def jumpTo(self, bytes):
        """Look for the next sequence of bytes matching a given sequence. If
        a match is found advance the position to the last byte of the match"""
        try:
            self._position = self.index(bytes, self.position) + len(bytes) - 1
        except ValueError:
            raise StopIteration
        return True

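# Small sketch (assumption, for illustration): EncodingBytes lower-cases its
# input and walks it byte by byte via a cursor.
#
#     eb = EncodingBytes(b"X <meta charset=utf-8>")
#     next(eb)                 # b"x"  (input is lower-cased; cursor now at 0)
#     eb.jumpTo(b"<")          # cursor lands on the b"<"
#     eb.matchBytes(b"<meta")  # True; cursor moves just past b"<meta"
#     eb.currentByte           # b" "
#     eb.skip()                # skips the space and returns b"c"
#
# Moving the cursor past the end raises StopIteration, which the callers use
# as their "ran out of input" signal.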

class EncodingParser(object):
    """Mini parser for detecting character encoding from meta elements"""

    def __init__(self, data):
        """string - the data to work on for encoding detection"""
        self.data = EncodingBytes(data)
        self.encoding = None

    def getEncoding(self):
        if b"<meta" not in self.data:
            return None

        methodDispatch = (
            (b"<!--", self.handleComment),
            (b"<meta", self.handleMeta),
            (b"</", self.handlePossibleEndTag),
            (b"<!", self.handleOther),
            (b"<?", self.handleOther),
            (b"<", self.handlePossibleStartTag))
        for _ in self.data:
            keepParsing = True
            try:
                self.data.jumpTo(b"<")
            except StopIteration:
                break
            for key, method in methodDispatch:
                if self.data.matchBytes(key):
                    try:
                        keepParsing = method()
                        break
                    except StopIteration:
                        keepParsing = False
                        break
            if not keepParsing:
                break

        return self.encoding

    def handleComment(self):
        """Skip over comments"""
        return self.data.jumpTo(b"-->")

    def handleMeta(self):
        if self.data.currentByte not in spaceCharactersBytes:
            # <meta is not followed by a space, so just keep going
            return True
        # We have a valid meta element we want to search for attributes
        hasPragma = False
        pendingEncoding = None
        while True:
            # Try to find the next attribute after the current position
            attr = self.getAttribute()
            if attr is None:
                return True
            else:
                if attr[0] == b"http-equiv":
                    hasPragma = attr[1] == b"content-type"
                    if hasPragma and pendingEncoding is not None:
                        self.encoding = pendingEncoding
                        return False
                elif attr[0] == b"charset":
                    tentativeEncoding = attr[1]
                    codec = lookupEncoding(tentativeEncoding)
                    if codec is not None:
                        self.encoding = codec
                        return False
                elif attr[0] == b"content":
                    contentParser = ContentAttrParser(EncodingBytes(attr[1]))
                    tentativeEncoding = contentParser.parse()
                    if tentativeEncoding is not None:
                        codec = lookupEncoding(tentativeEncoding)
                        if codec is not None:
                            if hasPragma:
                                self.encoding = codec
                                return False
                            else:
                                pendingEncoding = codec

    def handlePossibleStartTag(self):
        return self.handlePossibleTag(False)

    def handlePossibleEndTag(self):
        next(self.data)
        return self.handlePossibleTag(True)

    def handlePossibleTag(self, endTag):
        data = self.data
        if data.currentByte not in asciiLettersBytes:
            # If the next byte is not an ascii letter either ignore this
            # fragment (possible start tag case) or treat it according to
            # handleOther
            if endTag:
                data.previous()
                self.handleOther()
            return True

        c = data.skipUntil(spacesAngleBrackets)
        if c == b"<":
            # return to the first step in the overall "two step" algorithm
            # reprocessing the < byte
            data.previous()
        else:
            # Read all attributes
            attr = self.getAttribute()
            while attr is not None:
                attr = self.getAttribute()
        return True

    def handleOther(self):
        return self.data.jumpTo(b">")

    def getAttribute(self):
        """Return a name,value pair for the next attribute in the stream,
        if one is found, or None"""
        data = self.data
        # Step 1 (skip chars)
        c = data.skip(spaceCharactersBytes | frozenset([b"/"]))
        assert c is None or len(c) == 1
        # Step 2
        if c in (b">", None):
            return None
        # Step 3
        attrName = []
        attrValue = []
        # Step 4 attribute name
        while True:
            if c == b"=" and attrName:
                break
            elif c in spaceCharactersBytes:
                # Step 6!
                c = data.skip()
                break
            elif c in (b"/", b">"):
                return b"".join(attrName), b""
            elif c in asciiUppercaseBytes:
                attrName.append(c.lower())
            elif c is None:
                return None
            else:
                attrName.append(c)
            # Step 5
            c = next(data)
        # Step 7
        if c != b"=":
            data.previous()
            return b"".join(attrName), b""
        # Step 8
        next(data)
        # Step 9
        c = data.skip()
        # Step 10
        if c in (b"'", b'"'):
            # 10.1
            quoteChar = c
            while True:
                # 10.2
                c = next(data)
                # 10.3
                if c == quoteChar:
                    next(data)
                    return b"".join(attrName), b"".join(attrValue)
                # 10.4
                elif c in asciiUppercaseBytes:
                    attrValue.append(c.lower())
                # 10.5
                else:
                    attrValue.append(c)
        elif c == b">":
            return b"".join(attrName), b""
        elif c in asciiUppercaseBytes:
            attrValue.append(c.lower())
        elif c is None:
            return None
        else:
            attrValue.append(c)
        # Step 11
        while True:
            c = next(data)
            if c in spacesAngleBrackets:
                return b"".join(attrName), b"".join(attrValue)
            elif c in asciiUppercaseBytes:
                attrValue.append(c.lower())
            elif c is None:
                return None
            else:
                attrValue.append(c)

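# Hedged sketch (assumption, not from upstream): the prescan parser works on
# raw bytes only, so it can run before any decoding has happened.
#
#     EncodingParser(b'<!-- c --><meta http-equiv="Content-Type" '
#                    b'content="text/html; charset=utf-8">').getEncoding()
#     # returns the codec for utf-8 (i.e. lookupEncoding("utf-8"))
#
# Comments are skipped, and the first usable charset/content declaration on a
# <meta> element wins.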

class ContentAttrParser(object):
    def __init__(self, data):
        assert isinstance(data, bytes)
        self.data = data

    def parse(self):
        try:
            # Check if the attr name is charset
            # otherwise return
            self.data.jumpTo(b"charset")
            self.data.position += 1
            self.data.skip()
            if not self.data.currentByte == b"=":
                # If there is no = sign keep looking for attrs
                return None
            self.data.position += 1
            self.data.skip()
            # Look for an encoding between matching quote marks
            if self.data.currentByte in (b'"', b"'"):
                quoteMark = self.data.currentByte
                self.data.position += 1
                oldPosition = self.data.position
                if self.data.jumpTo(quoteMark):
                    return self.data[oldPosition:self.data.position]
                else:
                    return None
            else:
                # Unquoted value
                oldPosition = self.data.position
                try:
                    self.data.skipUntil(spaceCharactersBytes)
                    return self.data[oldPosition:self.data.position]
                except StopIteration:
                    # Return the whole remaining value
                    return self.data[oldPosition:]
        except StopIteration:
            return None

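# Illustration (assumption, not part of the module): ContentAttrParser pulls
# the charset out of an http-equiv content attribute value.
#
#     ContentAttrParser(EncodingBytes(b"text/html; charset=ISO-8859-1")).parse()
#     # -> b"iso-8859-1"
#
# The value comes back as raw (lower-cased) bytes; handleMeta() then passes it
# through lookupEncoding() to turn it into a codec.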

def lookupEncoding(encoding):
    """Return the python codec name corresponding to an encoding or None if the
    string doesn't correspond to a valid encoding."""
    if isinstance(encoding, bytes):
        try:
            encoding = encoding.decode("ascii")
        except UnicodeDecodeError:
            return None

    if encoding is not None:
        try:
            return webencodings.lookup(encoding)
        except AttributeError:
            return None
    else:
        return None
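# Quick sketch (illustration only): lookupEncoding() normalises WHATWG
# encoding labels through webencodings and returns None for unknown labels.
#
#     lookupEncoding("UTF8").name         # "utf-8"
#     lookupEncoding(b"latin1").name      # "windows-1252"
#     lookupEncoding("no-such-encoding")  # None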