comparison env/lib/python3.9/site-packages/boltons/ioutils.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:4f3585e2f14b
1 # -*- coding: utf-8 -*-
2
3 # Coding decl above needed for rendering the emdash properly in the
4 # documentation.
5
6 """
7 Module ``ioutils`` implements a number of helper classes and functions which
8 are useful when dealing with input, output, and bytestreams in a variety of
9 ways.
10 """
11 import os
12 from io import BytesIO
13 from abc import (
14 ABCMeta,
15 abstractmethod,
16 abstractproperty,
17 )
18 from errno import EINVAL
19 from codecs import EncodedFile
20 from tempfile import TemporaryFile
21
# Pick the text and binary string types for the running interpreter by
# probing for the Python 2-only ``unicode`` builtin.
try:
    unicode
except NameError:
    # Python 3: ``str`` holds text, ``bytes`` holds binary data.
    text_type, binary_type = str, bytes
else:
    # Python 2: ``unicode`` holds text, ``str`` holds binary data.
    text_type, binary_type = unicode, str

READ_CHUNK_SIZE = 21333
"""
Number of bytes to read at a time. The value is ~ 1/3rd of 64k which means that
the value will easily fit in the L2 cache of most processors even if every
codepoint in a string is three bytes long which makes it a nice fast default
value.
"""
36
37
class SpooledIOBase(object):
    """
    Shared scaffolding for the spooled file-like objects in this module.

    The SpooledTemporaryFile class doesn't support a number of attributes
    and methods that a StringIO instance does. This base class brings the
    api as close to StringIO as possible so that subclasses can serve as a
    near drop-in replacement that saves memory.

    A second issue with SpooledTemporaryFile is that the spooled file is
    always a cStringIO rather than a StringIO, which causes issues with
    some of our tools.

    Subclasses provide the storage-specific pieces (``read``, ``write``,
    ``seek``, ``readline``, ``readlines``, ``rollover``, ``tell``,
    ``buffer``, ``_rolled`` and ``len``); the remaining file API below is
    expressed in terms of those.
    """
    # Python 2 style metaclass declaration.
    __metaclass__ = ABCMeta

    def __init__(self, max_size=5000000, dir=None):
        """Remember the rollover threshold and the temp-file directory."""
        self._max_size = max_size
        self._dir = dir

    # -- pieces every subclass has to supply -----------------------------

    @abstractmethod
    def read(self, n=-1):
        """Read n characters from the buffer"""

    @abstractmethod
    def write(self, s):
        """Write into the buffer"""

    @abstractmethod
    def seek(self, pos, mode=0):
        """Seek to a specific point in a file"""

    @abstractmethod
    def readline(self, length=None):
        """Returns the next available line"""

    @abstractmethod
    def readlines(self, sizehint=0):
        """Returns a list of all lines from the current position forward"""

    @abstractmethod
    def rollover(self):
        """Roll file-like-object over into a real temporary file"""

    @abstractmethod
    def tell(self):
        """Return the current position"""

    @abstractproperty
    def buffer(self):
        """Should return a flo instance"""

    @abstractproperty
    def _rolled(self):
        """Returns whether the file has been rolled to a real file or not"""

    @abstractproperty
    def len(self):
        """Returns the length of the data"""

    # -- thin delegations to the underlying buffer -----------------------

    def _get_softspace(self):
        return self.buffer.softspace

    def _set_softspace(self, val):
        self.buffer.softspace = val

    softspace = property(_get_softspace, _set_softspace)

    @property
    def _file(self):
        # Alias for ``buffer``.
        return self.buffer

    @property
    def closed(self):
        return self.buffer.closed

    @property
    def pos(self):
        # Attribute-style access to the current position.
        return self.tell()

    @property
    def buf(self):
        # Attribute-style access to the full contents.
        return self.getvalue()

    def close(self):
        return self.buffer.close()

    def flush(self):
        return self.buffer.flush()

    def isatty(self):
        return self.buffer.isatty()

    def fileno(self):
        """Roll over to a real file, then return its file descriptor."""
        self.rollover()
        return self.buffer.fileno()

    def seekable(self):
        return True

    def readable(self):
        return True

    def writable(self):
        return True

    # -- iteration -------------------------------------------------------

    def next(self):
        """Return the next line, raising StopIteration at end of data."""
        line = self.readline()
        if line:
            return line

        # An empty line only ends iteration when the buffer really is at
        # its end; otherwise put the position back and hand the line over.
        here = self.buffer.tell()
        self.buffer.seek(0, os.SEEK_END)
        if here == self.buffer.tell():
            raise StopIteration
        self.buffer.seek(here)
        return line

    __next__ = next

    def __iter__(self):
        return self

    # -- whole-content helpers -------------------------------------------

    def truncate(self, size=None):
        """
        Truncate the data.

        With no argument this behaves like the real SpooledTemporaryFile
        and truncates at the current position. With *size* the data is cut
        at that index; a negative *size* raises IOError (EINVAL).
        """
        if size is None:
            return self.buffer.truncate()

        if size < 0:
            raise IOError(EINVAL, "Negative size not allowed")

        # Emulate truncate(size): move to the cut point, truncate there,
        # and go back only if the old position is still inside the data.
        previous = self.tell()
        self.seek(size)
        self.buffer.truncate()
        if size > previous:
            self.seek(previous)

    def getvalue(self):
        """Return everything stored, leaving the position where it was."""
        saved = self.tell()
        self.seek(0)
        contents = self.read()
        self.seek(saved)
        return contents

    # -- protocol support ------------------------------------------------

    def __len__(self):
        return self.len

    def __enter__(self):
        return self

    def __exit__(self, *exc_info):
        self._file.close()

    def __eq__(self, other):
        """Instances compare equal when their full contents are equal."""
        if not isinstance(other, self.__class__):
            return False
        return self.getvalue() == other.getvalue()

    def __ne__(self, other):
        equal = self.__eq__(other)
        return not equal

    def __bool__(self):
        # Always truthy, regardless of what ``__len__`` reports.
        return True

    __nonzero__ = __bool__
205
206
class SpooledBytesIO(SpooledIOBase):
    """
    A spooled file-like object that stores bytes and nothing else. On
    Python 2.x "bytes" means the 'str' type; on Python 3.x it means the
    'bytes' type. Data comes back exactly as it was written, and writing
    anything that is not bytes raises a TypeError.

    Example::

        >>> from boltons import ioutils
        >>> with ioutils.SpooledBytesIO() as f:
        ...     f.write(b"Happy IO")
        ...     _ = f.seek(0)
        ...     isinstance(f.getvalue(), ioutils.binary_type)
        True
    """

    @property
    def buffer(self):
        """The backing file object, created as a BytesIO on first use."""
        try:
            existing = self._buffer
        except AttributeError:
            existing = self._buffer = BytesIO()
        return existing

    @property
    def _rolled(self):
        # Anything other than the in-memory BytesIO means we rolled over.
        return not isinstance(self.buffer, BytesIO)

    @property
    def len(self):
        """Determine the length of the file"""
        saved = self.tell()
        if not self._rolled:
            # In memory: the end-of-data offset is the length.
            self.seek(0, os.SEEK_END)
            size = self.tell()
        else:
            # On disk: ask the OS for the size of the temporary file.
            self.seek(0)
            size = os.fstat(self.fileno()).st_size
        self.seek(saved)
        return size

    def tell(self):
        return self.buffer.tell()

    def seek(self, pos, mode=0):
        return self.buffer.seek(pos, mode)

    def read(self, n=-1):
        return self.buffer.read(n)

    def readline(self, length=None):
        # A falsy length (None or 0) reads a whole line.
        if not length:
            return self.buffer.readline()
        return self.buffer.readline(length)

    def readlines(self, sizehint=0):
        return self.buffer.readlines(sizehint)

    def write(self, s):
        """Append bytes, rolling over to disk once max_size is reached."""
        if not isinstance(s, binary_type):
            message = "{0} expected, got {1}".format(
                binary_type.__name__,
                type(s).__name__
            )
            raise TypeError(message)

        projected_end = self.tell() + len(s)
        if projected_end >= self._max_size:
            self.rollover()
        self.buffer.write(s)

    def rollover(self):
        """Roll the StringIO over to a TempFile"""
        if self._rolled:
            return
        spill = TemporaryFile(dir=self._dir)
        offset = self.buffer.tell()
        # Copy the in-memory data across and keep the same position.
        spill.write(self.buffer.getvalue())
        spill.seek(offset)
        self.buffer.close()
        self._buffer = spill
287
288
class SpooledStringIO(SpooledIOBase):
    """
    SpooledStringIO is a spooled file-like-object that only accepts unicode
    values. On Python 2.x this means the 'unicode' type and on Python 3.x this
    means the 'str' type. Values are accepted as unicode and then coerced into
    utf-8 encoded bytes for storage. On retrieval, the values are returned as
    unicode.

    Positions reported by ``tell()`` and accepted by ``seek()`` are counted
    in codepoints, not bytes; the codepoint position is tracked separately
    in ``self._tell``.

    Example::

        >>> from boltons import ioutils
        >>> with ioutils.SpooledStringIO() as f:
        ...     f.write(u"\u2014 Hey, an emdash!")
        ...     _ = f.seek(0)
        ...     isinstance(f.read(), ioutils.text_type)
        True

    """
    def __init__(self, *args, **kwargs):
        # Codepoint position; set before the base initializer runs.
        self._tell = 0
        super(SpooledStringIO, self).__init__(*args, **kwargs)

    def read(self, n=-1):
        """Read up to *n* codepoints and advance the codepoint position."""
        ret = self.buffer.reader.read(n, n)
        self._tell = self.tell() + len(ret)
        return ret

    def write(self, s):
        """Write text *s*, stored as utf-8 bytes.

        Raises TypeError if *s* is not text. Rolls over to a temporary
        file once the byte offset plus the encoded size reaches max_size.
        """
        if not isinstance(s, text_type):
            raise TypeError("{0} expected, got {1}".format(
                text_type.__name__,
                type(s).__name__
            ))
        current_pos = self.tell()
        # Encode once; the result is used for the size check and the write.
        encoded = s.encode('utf-8')
        if self.buffer.tell() + len(encoded) >= self._max_size:
            self.rollover()
        self.buffer.write(encoded)
        self._tell = current_pos + len(s)

    def _traverse_codepoints(self, current_position, n):
        """Traverse from current position to the right n codepoints"""
        dest = current_position + n
        while True:
            if current_position == dest:
                # By chance we've landed on the right position, break
                break

            # If the read would take us past the intended position then
            # seek only enough to cover the offset
            if current_position + READ_CHUNK_SIZE > dest:
                self.read(dest - current_position)
                break
            else:
                ret = self.read(READ_CHUNK_SIZE)

            # Increment our current position
            current_position += READ_CHUNK_SIZE

            # If we kept reading but there was nothing here, break
            # as we are at the end of the file
            if not ret:
                break

        return dest

    def seek(self, pos, mode=0):
        """Move to a codepoint position and return the new position.

        ``os.SEEK_SET`` seeks to *pos* from the start. ``os.SEEK_CUR``
        moves *pos* codepoints relative to the current position; a
        backward move is clamped at the start of the data.
        ``os.SEEK_END`` seeks to ``len - pos``, i.e. *pos* codepoints
        before the end. Any other *mode* raises ValueError.
        """
        # Seek to position from the start of the file
        if mode == os.SEEK_SET:
            self.buffer.seek(0)
            self._traverse_codepoints(0, pos)
            self._tell = pos
        # Seek to new position relative to current position
        elif mode == os.SEEK_CUR:
            start_pos = self.tell()
            if pos < 0:
                # Traversal can only read forward, and a negative read
                # count would drain the stream to EOF. Rewind and walk
                # forward from the start to the (clamped) target instead.
                dest_position = max(start_pos + pos, 0)
                self.buffer.seek(0)
                self._traverse_codepoints(0, dest_position)
                self._tell = dest_position
            else:
                self._traverse_codepoints(start_pos, pos)
                self._tell = start_pos + pos
        elif mode == os.SEEK_END:
            self.buffer.seek(0)
            dest_position = self.len - pos
            self._traverse_codepoints(0, dest_position)
            self._tell = dest_position
        else:
            raise ValueError(
                "Invalid whence ({0}, should be 0, 1, or 2)".format(mode)
            )
        return self.tell()

    def readline(self, length=None):
        """Return the next line as text and advance the position."""
        ret = self.buffer.readline(length).decode('utf-8')
        self._tell = self.tell() + len(ret)
        return ret

    def readlines(self, sizehint=0):
        """Return the remaining lines as a list of text strings."""
        ret = [x.decode('utf-8') for x in self.buffer.readlines(sizehint)]
        self._tell = self.tell() + sum((len(x) for x in ret))
        return ret

    @property
    def buffer(self):
        """The backing EncodedFile, created over a BytesIO on first use."""
        try:
            return self._buffer
        except AttributeError:
            self._buffer = EncodedFile(BytesIO(), data_encoding='utf-8')
        return self._buffer

    @property
    def _rolled(self):
        # Rolled over once the wrapped stream is no longer a BytesIO.
        return not isinstance(self.buffer.stream, BytesIO)

    def rollover(self):
        """Roll the StringIO over to a TempFile"""
        if not self._rolled:
            tmp = EncodedFile(TemporaryFile(dir=self._dir),
                              data_encoding='utf-8')
            pos = self.buffer.tell()
            tmp.write(self.buffer.getvalue())
            tmp.seek(pos)
            self.buffer.close()
            self._buffer = tmp

    def tell(self):
        """Return the codepoint position"""
        return self._tell

    @property
    def len(self):
        """Determine the number of codepoints in the file"""
        pos = self.buffer.tell()
        # read() advances the codepoint position as a side effect, so
        # remember it here and put it back once counting is done.
        saved_tell = self._tell
        self.buffer.seek(0)
        total = 0
        try:
            while True:
                ret = self.read(READ_CHUNK_SIZE)
                if not ret:
                    break
                total += len(ret)
        finally:
            self.buffer.seek(pos)
            self._tell = saved_tell
        return total
427
428
def is_text_fileobj(fileobj):
    """Return True if *fileobj* looks like it handles text, else False.

    An object counts as text-handling when it has a truthy ``encoding``
    attribute, or when it has a ``getvalue()`` whose result is a text
    string. Any exception raised while calling ``getvalue()`` is treated
    as "not text".
    """
    # codecs.open and io.TextIOBase
    if getattr(fileobj, 'encoding', False):
        return True

    if not getattr(fileobj, 'getvalue', False):
        return False

    # StringIO.StringIO / cStringIO.StringIO / io.StringIO
    try:
        contents = fileobj.getvalue()
        return isinstance(contents, type(u''))
    except Exception:
        # Best-effort probe: a failing getvalue() just means "not text".
        return False
441
442
class MultiFileReader(object):
    """Takes a list of open files or file-like objects and provides an
    interface to read from them all contiguously. Like
    :func:`itertools.chain()`, but for reading files.

    >>> mfr = MultiFileReader(BytesIO(b'ab'), BytesIO(b'cd'), BytesIO(b'e'))
    >>> mfr.read(3).decode('ascii')
    u'abc'
    >>> mfr.read(3).decode('ascii')
    u'de'

    The constructor takes as many fileobjs as you hand it, and will
    raise a TypeError on non-file-like objects. A ValueError is raised
    when file-like objects are a mix of bytes- and text-handling
    objects (for instance, BytesIO and StringIO).
    """

    def __init__(self, *fileobjs):
        if not all([callable(getattr(f, 'read', None)) and
                    callable(getattr(f, 'seek', None)) for f in fileobjs]):
            raise TypeError('MultiFileReader expected file-like objects'
                            ' with .read() and .seek()')
        # Classify each file once; the result feeds both checks below.
        text_flags = [is_text_fileobj(f) for f in fileobjs]
        if all(text_flags):
            # codecs.open and io.TextIOBase
            self._joiner = u''
        elif any(text_flags):
            raise ValueError('All arguments to MultiFileReader must handle'
                             ' bytes OR text, not a mix')
        else:
            # open/file and io.BytesIO
            self._joiner = b''
        self._fileobjs = fileobjs
        # Index of the file that sized reads are currently drawing from.
        self._index = 0

    def read(self, amt=None):
        """Read up to the specified *amt*, seamlessly bridging across
        files. Returns the appropriate type of string (bytes or text)
        for the input, and returns an empty string when the files are
        exhausted.

        A falsy *amt* (``None`` or ``0``) or a negative *amt* reads
        whatever remains in every file.
        """
        if not amt or amt < 0:
            return self._joiner.join(f.read() for f in self._fileobjs)
        parts = []
        while amt > 0 and self._index < len(self._fileobjs):
            parts.append(self._fileobjs[self._index].read(amt))
            got = len(parts[-1])
            if got < amt:
                # Short read: treat this file as drained and move on.
                self._index += 1
            amt -= got
        return self._joiner.join(parts)

    def seek(self, offset, whence=os.SEEK_SET):
        """Enables setting position of the file cursor to a given
        *offset*. Currently only supports ``offset=0``.

        Raises NotImplementedError for any *whence* other than
        ``os.SEEK_SET`` and for any non-zero *offset*.
        """
        if whence != os.SEEK_SET:
            raise NotImplementedError(
                'MultiFileReader.seek() only supports os.SEEK_SET')
        if offset != 0:
            raise NotImplementedError(
                'MultiFileReader only supports seeking to start at this time')
        for f in self._fileobjs:
            f.seek(0)
        # Sized reads must start again from the first file after a rewind.
        self._index = 0