comparison env/lib/python3.9/site-packages/ruamel/yaml/emitter.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:4f3585e2f14b
1 # coding: utf-8
2
3 from __future__ import absolute_import
4 from __future__ import print_function
5
6 # Emitter expects events obeying the following grammar:
7 # stream ::= STREAM-START document* STREAM-END
8 # document ::= DOCUMENT-START node DOCUMENT-END
9 # node ::= SCALAR | sequence | mapping
10 # sequence ::= SEQUENCE-START node* SEQUENCE-END
11 # mapping ::= MAPPING-START (node node)* MAPPING-END
12
13 import sys
14 from ruamel.yaml.error import YAMLError, YAMLStreamError
15 from ruamel.yaml.events import * # NOQA
16
17 # fmt: off
18 from ruamel.yaml.compat import utf8, text_type, PY2, nprint, dbg, DBG_EVENT, \
19 check_anchorname_char
20 # fmt: on
21
22 if False: # MYPY
23 from typing import Any, Dict, List, Union, Text, Tuple, Optional # NOQA
24 from ruamel.yaml.compat import StreamType # NOQA
25
26 __all__ = ['Emitter', 'EmitterError']
27
28
29 class EmitterError(YAMLError):
30 pass
31
32
33 class ScalarAnalysis(object):
34 def __init__(
35 self,
36 scalar,
37 empty,
38 multiline,
39 allow_flow_plain,
40 allow_block_plain,
41 allow_single_quoted,
42 allow_double_quoted,
43 allow_block,
44 ):
45 # type: (Any, Any, Any, bool, bool, bool, bool, bool) -> None
46 self.scalar = scalar
47 self.empty = empty
48 self.multiline = multiline
49 self.allow_flow_plain = allow_flow_plain
50 self.allow_block_plain = allow_block_plain
51 self.allow_single_quoted = allow_single_quoted
52 self.allow_double_quoted = allow_double_quoted
53 self.allow_block = allow_block
54
55
56 class Indents(object):
57 # replacement for the list based stack of None/int
58 def __init__(self):
59 # type: () -> None
60 self.values = [] # type: List[Tuple[int, bool]]
61
62 def append(self, val, seq):
63 # type: (Any, Any) -> None
64 self.values.append((val, seq))
65
66 def pop(self):
67 # type: () -> Any
68 return self.values.pop()[0]
69
70 def last_seq(self):
71 # type: () -> bool
72 # return the seq(uence) value for the element added before the last one
73 # in increase_indent()
74 try:
75 return self.values[-2][1]
76 except IndexError:
77 return False
78
79 def seq_flow_align(self, seq_indent, column):
80 # type: (int, int) -> int
81 # extra spaces because of dash
82 if len(self.values) < 2 or not self.values[-1][1]:
83 return 0
84 # -1 for the dash
85 base = self.values[-1][0] if self.values[-1][0] is not None else 0
86 return base + seq_indent - column - 1
87
88 def __len__(self):
89 # type: () -> int
90 return len(self.values)
91
92
93 class Emitter(object):
94 # fmt: off
95 DEFAULT_TAG_PREFIXES = {
96 u'!': u'!',
97 u'tag:yaml.org,2002:': u'!!',
98 }
99 # fmt: on
100
101 MAX_SIMPLE_KEY_LENGTH = 128
102
103 def __init__(
104 self,
105 stream,
106 canonical=None,
107 indent=None,
108 width=None,
109 allow_unicode=None,
110 line_break=None,
111 block_seq_indent=None,
112 top_level_colon_align=None,
113 prefix_colon=None,
114 brace_single_entry_mapping_in_flow_sequence=None,
115 dumper=None,
116 ):
117 # type: (StreamType, Any, Optional[int], Optional[int], Optional[bool], Any, Optional[int], Optional[bool], Any, Optional[bool], Any) -> None # NOQA
118 self.dumper = dumper
119 if self.dumper is not None and getattr(self.dumper, '_emitter', None) is None:
120 self.dumper._emitter = self
121 self.stream = stream
122
123 # Encoding can be overriden by STREAM-START.
124 self.encoding = None # type: Optional[Text]
125 self.allow_space_break = None
126
127 # Emitter is a state machine with a stack of states to handle nested
128 # structures.
129 self.states = [] # type: List[Any]
130 self.state = self.expect_stream_start # type: Any
131
132 # Current event and the event queue.
133 self.events = [] # type: List[Any]
134 self.event = None # type: Any
135
136 # The current indentation level and the stack of previous indents.
137 self.indents = Indents()
138 self.indent = None # type: Optional[int]
139
140 # flow_context is an expanding/shrinking list consisting of '{' and '['
141 # for each unclosed flow context. If empty list that means block context
142 self.flow_context = [] # type: List[Text]
143
144 # Contexts.
145 self.root_context = False
146 self.sequence_context = False
147 self.mapping_context = False
148 self.simple_key_context = False
149
150 # Characteristics of the last emitted character:
151 # - current position.
152 # - is it a whitespace?
153 # - is it an indention character
154 # (indentation space, '-', '?', or ':')?
155 self.line = 0
156 self.column = 0
157 self.whitespace = True
158 self.indention = True
159 self.compact_seq_seq = True # dash after dash
160 self.compact_seq_map = True # key after dash
161 # self.compact_ms = False # dash after key, only when excplicit key with ?
162 self.no_newline = None # type: Optional[bool] # set if directly after `- `
163
164 # Whether the document requires an explicit document end indicator
165 self.open_ended = False
166
167 # colon handling
168 self.colon = u':'
169 self.prefixed_colon = self.colon if prefix_colon is None else prefix_colon + self.colon
170 # single entry mappings in flow sequence
171 self.brace_single_entry_mapping_in_flow_sequence = (
172 brace_single_entry_mapping_in_flow_sequence
173 ) # NOQA
174
175 # Formatting details.
176 self.canonical = canonical
177 self.allow_unicode = allow_unicode
178 # set to False to get "\Uxxxxxxxx" for non-basic unicode like emojis
179 self.unicode_supplementary = sys.maxunicode > 0xffff
180 self.sequence_dash_offset = block_seq_indent if block_seq_indent else 0
181 self.top_level_colon_align = top_level_colon_align
182 self.best_sequence_indent = 2
183 self.requested_indent = indent # specific for literal zero indent
184 if indent and 1 < indent < 10:
185 self.best_sequence_indent = indent
186 self.best_map_indent = self.best_sequence_indent
187 # if self.best_sequence_indent < self.sequence_dash_offset + 1:
188 # self.best_sequence_indent = self.sequence_dash_offset + 1
189 self.best_width = 80
190 if width and width > self.best_sequence_indent * 2:
191 self.best_width = width
192 self.best_line_break = u'\n' # type: Any
193 if line_break in [u'\r', u'\n', u'\r\n']:
194 self.best_line_break = line_break
195
196 # Tag prefixes.
197 self.tag_prefixes = None # type: Any
198
199 # Prepared anchor and tag.
200 self.prepared_anchor = None # type: Any
201 self.prepared_tag = None # type: Any
202
203 # Scalar analysis and style.
204 self.analysis = None # type: Any
205 self.style = None # type: Any
206
207 self.scalar_after_indicator = True # write a scalar on the same line as `---`
208
209 @property
210 def stream(self):
211 # type: () -> Any
212 try:
213 return self._stream
214 except AttributeError:
215 raise YAMLStreamError('output stream needs to specified')
216
217 @stream.setter
218 def stream(self, val):
219 # type: (Any) -> None
220 if val is None:
221 return
222 if not hasattr(val, 'write'):
223 raise YAMLStreamError('stream argument needs to have a write() method')
224 self._stream = val
225
226 @property
227 def serializer(self):
228 # type: () -> Any
229 try:
230 if hasattr(self.dumper, 'typ'):
231 return self.dumper.serializer
232 return self.dumper._serializer
233 except AttributeError:
234 return self # cyaml
235
236 @property
237 def flow_level(self):
238 # type: () -> int
239 return len(self.flow_context)
240
241 def dispose(self):
242 # type: () -> None
243 # Reset the state attributes (to clear self-references)
244 self.states = []
245 self.state = None
246
247 def emit(self, event):
248 # type: (Any) -> None
249 if dbg(DBG_EVENT):
250 nprint(event)
251 self.events.append(event)
252 while not self.need_more_events():
253 self.event = self.events.pop(0)
254 self.state()
255 self.event = None
256
257 # In some cases, we wait for a few next events before emitting.
258
259 def need_more_events(self):
260 # type: () -> bool
261 if not self.events:
262 return True
263 event = self.events[0]
264 if isinstance(event, DocumentStartEvent):
265 return self.need_events(1)
266 elif isinstance(event, SequenceStartEvent):
267 return self.need_events(2)
268 elif isinstance(event, MappingStartEvent):
269 return self.need_events(3)
270 else:
271 return False
272
273 def need_events(self, count):
274 # type: (int) -> bool
275 level = 0
276 for event in self.events[1:]:
277 if isinstance(event, (DocumentStartEvent, CollectionStartEvent)):
278 level += 1
279 elif isinstance(event, (DocumentEndEvent, CollectionEndEvent)):
280 level -= 1
281 elif isinstance(event, StreamEndEvent):
282 level = -1
283 if level < 0:
284 return False
285 return len(self.events) < count + 1
286
287 def increase_indent(self, flow=False, sequence=None, indentless=False):
288 # type: (bool, Optional[bool], bool) -> None
289 self.indents.append(self.indent, sequence)
290 if self.indent is None: # top level
291 if flow:
292 # self.indent = self.best_sequence_indent if self.indents.last_seq() else \
293 # self.best_map_indent
294 # self.indent = self.best_sequence_indent
295 self.indent = self.requested_indent
296 else:
297 self.indent = 0
298 elif not indentless:
299 self.indent += (
300 self.best_sequence_indent if self.indents.last_seq() else self.best_map_indent
301 )
302 # if self.indents.last_seq():
303 # if self.indent == 0: # top level block sequence
304 # self.indent = self.best_sequence_indent - self.sequence_dash_offset
305 # else:
306 # self.indent += self.best_sequence_indent
307 # else:
308 # self.indent += self.best_map_indent
309
310 # States.
311
312 # Stream handlers.
313
314 def expect_stream_start(self):
315 # type: () -> None
316 if isinstance(self.event, StreamStartEvent):
317 if PY2:
318 if self.event.encoding and not getattr(self.stream, 'encoding', None):
319 self.encoding = self.event.encoding
320 else:
321 if self.event.encoding and not hasattr(self.stream, 'encoding'):
322 self.encoding = self.event.encoding
323 self.write_stream_start()
324 self.state = self.expect_first_document_start
325 else:
326 raise EmitterError('expected StreamStartEvent, but got %s' % (self.event,))
327
328 def expect_nothing(self):
329 # type: () -> None
330 raise EmitterError('expected nothing, but got %s' % (self.event,))
331
332 # Document handlers.
333
334 def expect_first_document_start(self):
335 # type: () -> Any
336 return self.expect_document_start(first=True)
337
338 def expect_document_start(self, first=False):
339 # type: (bool) -> None
340 if isinstance(self.event, DocumentStartEvent):
341 if (self.event.version or self.event.tags) and self.open_ended:
342 self.write_indicator(u'...', True)
343 self.write_indent()
344 if self.event.version:
345 version_text = self.prepare_version(self.event.version)
346 self.write_version_directive(version_text)
347 self.tag_prefixes = self.DEFAULT_TAG_PREFIXES.copy()
348 if self.event.tags:
349 handles = sorted(self.event.tags.keys())
350 for handle in handles:
351 prefix = self.event.tags[handle]
352 self.tag_prefixes[prefix] = handle
353 handle_text = self.prepare_tag_handle(handle)
354 prefix_text = self.prepare_tag_prefix(prefix)
355 self.write_tag_directive(handle_text, prefix_text)
356 implicit = (
357 first
358 and not self.event.explicit
359 and not self.canonical
360 and not self.event.version
361 and not self.event.tags
362 and not self.check_empty_document()
363 )
364 if not implicit:
365 self.write_indent()
366 self.write_indicator(u'---', True)
367 if self.canonical:
368 self.write_indent()
369 self.state = self.expect_document_root
370 elif isinstance(self.event, StreamEndEvent):
371 if self.open_ended:
372 self.write_indicator(u'...', True)
373 self.write_indent()
374 self.write_stream_end()
375 self.state = self.expect_nothing
376 else:
377 raise EmitterError('expected DocumentStartEvent, but got %s' % (self.event,))
378
379 def expect_document_end(self):
380 # type: () -> None
381 if isinstance(self.event, DocumentEndEvent):
382 self.write_indent()
383 if self.event.explicit:
384 self.write_indicator(u'...', True)
385 self.write_indent()
386 self.flush_stream()
387 self.state = self.expect_document_start
388 else:
389 raise EmitterError('expected DocumentEndEvent, but got %s' % (self.event,))
390
391 def expect_document_root(self):
392 # type: () -> None
393 self.states.append(self.expect_document_end)
394 self.expect_node(root=True)
395
396 # Node handlers.
397
398 def expect_node(self, root=False, sequence=False, mapping=False, simple_key=False):
399 # type: (bool, bool, bool, bool) -> None
400 self.root_context = root
401 self.sequence_context = sequence # not used in PyYAML
402 self.mapping_context = mapping
403 self.simple_key_context = simple_key
404 if isinstance(self.event, AliasEvent):
405 self.expect_alias()
406 elif isinstance(self.event, (ScalarEvent, CollectionStartEvent)):
407 if (
408 self.process_anchor(u'&')
409 and isinstance(self.event, ScalarEvent)
410 and self.sequence_context
411 ):
412 self.sequence_context = False
413 if (
414 root
415 and isinstance(self.event, ScalarEvent)
416 and not self.scalar_after_indicator
417 ):
418 self.write_indent()
419 self.process_tag()
420 if isinstance(self.event, ScalarEvent):
421 # nprint('@', self.indention, self.no_newline, self.column)
422 self.expect_scalar()
423 elif isinstance(self.event, SequenceStartEvent):
424 # nprint('@', self.indention, self.no_newline, self.column)
425 i2, n2 = self.indention, self.no_newline # NOQA
426 if self.event.comment:
427 if self.event.flow_style is False and self.event.comment:
428 if self.write_post_comment(self.event):
429 self.indention = False
430 self.no_newline = True
431 if self.write_pre_comment(self.event):
432 self.indention = i2
433 self.no_newline = not self.indention
434 if (
435 self.flow_level
436 or self.canonical
437 or self.event.flow_style
438 or self.check_empty_sequence()
439 ):
440 self.expect_flow_sequence()
441 else:
442 self.expect_block_sequence()
443 elif isinstance(self.event, MappingStartEvent):
444 if self.event.flow_style is False and self.event.comment:
445 self.write_post_comment(self.event)
446 if self.event.comment and self.event.comment[1]:
447 self.write_pre_comment(self.event)
448 if (
449 self.flow_level
450 or self.canonical
451 or self.event.flow_style
452 or self.check_empty_mapping()
453 ):
454 self.expect_flow_mapping(single=self.event.nr_items == 1)
455 else:
456 self.expect_block_mapping()
457 else:
458 raise EmitterError('expected NodeEvent, but got %s' % (self.event,))
459
460 def expect_alias(self):
461 # type: () -> None
462 if self.event.anchor is None:
463 raise EmitterError('anchor is not specified for alias')
464 self.process_anchor(u'*')
465 self.state = self.states.pop()
466
467 def expect_scalar(self):
468 # type: () -> None
469 self.increase_indent(flow=True)
470 self.process_scalar()
471 self.indent = self.indents.pop()
472 self.state = self.states.pop()
473
474 # Flow sequence handlers.
475
476 def expect_flow_sequence(self):
477 # type: () -> None
478 ind = self.indents.seq_flow_align(self.best_sequence_indent, self.column)
479 self.write_indicator(u' ' * ind + u'[', True, whitespace=True)
480 self.increase_indent(flow=True, sequence=True)
481 self.flow_context.append('[')
482 self.state = self.expect_first_flow_sequence_item
483
484 def expect_first_flow_sequence_item(self):
485 # type: () -> None
486 if isinstance(self.event, SequenceEndEvent):
487 self.indent = self.indents.pop()
488 popped = self.flow_context.pop()
489 assert popped == '['
490 self.write_indicator(u']', False)
491 if self.event.comment and self.event.comment[0]:
492 # eol comment on empty flow sequence
493 self.write_post_comment(self.event)
494 elif self.flow_level == 0:
495 self.write_line_break()
496 self.state = self.states.pop()
497 else:
498 if self.canonical or self.column > self.best_width:
499 self.write_indent()
500 self.states.append(self.expect_flow_sequence_item)
501 self.expect_node(sequence=True)
502
503 def expect_flow_sequence_item(self):
504 # type: () -> None
505 if isinstance(self.event, SequenceEndEvent):
506 self.indent = self.indents.pop()
507 popped = self.flow_context.pop()
508 assert popped == '['
509 if self.canonical:
510 self.write_indicator(u',', False)
511 self.write_indent()
512 self.write_indicator(u']', False)
513 if self.event.comment and self.event.comment[0]:
514 # eol comment on flow sequence
515 self.write_post_comment(self.event)
516 else:
517 self.no_newline = False
518 self.state = self.states.pop()
519 else:
520 self.write_indicator(u',', False)
521 if self.canonical or self.column > self.best_width:
522 self.write_indent()
523 self.states.append(self.expect_flow_sequence_item)
524 self.expect_node(sequence=True)
525
526 # Flow mapping handlers.
527
528 def expect_flow_mapping(self, single=False):
529 # type: (Optional[bool]) -> None
530 ind = self.indents.seq_flow_align(self.best_sequence_indent, self.column)
531 map_init = u'{'
532 if (
533 single
534 and self.flow_level
535 and self.flow_context[-1] == '['
536 and not self.canonical
537 and not self.brace_single_entry_mapping_in_flow_sequence
538 ):
539 # single map item with flow context, no curly braces necessary
540 map_init = u''
541 self.write_indicator(u' ' * ind + map_init, True, whitespace=True)
542 self.flow_context.append(map_init)
543 self.increase_indent(flow=True, sequence=False)
544 self.state = self.expect_first_flow_mapping_key
545
546 def expect_first_flow_mapping_key(self):
547 # type: () -> None
548 if isinstance(self.event, MappingEndEvent):
549 self.indent = self.indents.pop()
550 popped = self.flow_context.pop()
551 assert popped == '{' # empty flow mapping
552 self.write_indicator(u'}', False)
553 if self.event.comment and self.event.comment[0]:
554 # eol comment on empty mapping
555 self.write_post_comment(self.event)
556 elif self.flow_level == 0:
557 self.write_line_break()
558 self.state = self.states.pop()
559 else:
560 if self.canonical or self.column > self.best_width:
561 self.write_indent()
562 if not self.canonical and self.check_simple_key():
563 self.states.append(self.expect_flow_mapping_simple_value)
564 self.expect_node(mapping=True, simple_key=True)
565 else:
566 self.write_indicator(u'?', True)
567 self.states.append(self.expect_flow_mapping_value)
568 self.expect_node(mapping=True)
569
570 def expect_flow_mapping_key(self):
571 # type: () -> None
572 if isinstance(self.event, MappingEndEvent):
573 # if self.event.comment and self.event.comment[1]:
574 # self.write_pre_comment(self.event)
575 self.indent = self.indents.pop()
576 popped = self.flow_context.pop()
577 assert popped in [u'{', u'']
578 if self.canonical:
579 self.write_indicator(u',', False)
580 self.write_indent()
581 if popped != u'':
582 self.write_indicator(u'}', False)
583 if self.event.comment and self.event.comment[0]:
584 # eol comment on flow mapping, never reached on empty mappings
585 self.write_post_comment(self.event)
586 else:
587 self.no_newline = False
588 self.state = self.states.pop()
589 else:
590 self.write_indicator(u',', False)
591 if self.canonical or self.column > self.best_width:
592 self.write_indent()
593 if not self.canonical and self.check_simple_key():
594 self.states.append(self.expect_flow_mapping_simple_value)
595 self.expect_node(mapping=True, simple_key=True)
596 else:
597 self.write_indicator(u'?', True)
598 self.states.append(self.expect_flow_mapping_value)
599 self.expect_node(mapping=True)
600
601 def expect_flow_mapping_simple_value(self):
602 # type: () -> None
603 self.write_indicator(self.prefixed_colon, False)
604 self.states.append(self.expect_flow_mapping_key)
605 self.expect_node(mapping=True)
606
607 def expect_flow_mapping_value(self):
608 # type: () -> None
609 if self.canonical or self.column > self.best_width:
610 self.write_indent()
611 self.write_indicator(self.prefixed_colon, True)
612 self.states.append(self.expect_flow_mapping_key)
613 self.expect_node(mapping=True)
614
615 # Block sequence handlers.
616
617 def expect_block_sequence(self):
618 # type: () -> None
619 if self.mapping_context:
620 indentless = not self.indention
621 else:
622 indentless = False
623 if not self.compact_seq_seq and self.column != 0:
624 self.write_line_break()
625 self.increase_indent(flow=False, sequence=True, indentless=indentless)
626 self.state = self.expect_first_block_sequence_item
627
628 def expect_first_block_sequence_item(self):
629 # type: () -> Any
630 return self.expect_block_sequence_item(first=True)
631
632 def expect_block_sequence_item(self, first=False):
633 # type: (bool) -> None
634 if not first and isinstance(self.event, SequenceEndEvent):
635 if self.event.comment and self.event.comment[1]:
636 # final comments on a block list e.g. empty line
637 self.write_pre_comment(self.event)
638 self.indent = self.indents.pop()
639 self.state = self.states.pop()
640 self.no_newline = False
641 else:
642 if self.event.comment and self.event.comment[1]:
643 self.write_pre_comment(self.event)
644 nonl = self.no_newline if self.column == 0 else False
645 self.write_indent()
646 ind = self.sequence_dash_offset # if len(self.indents) > 1 else 0
647 self.write_indicator(u' ' * ind + u'-', True, indention=True)
648 if nonl or self.sequence_dash_offset + 2 > self.best_sequence_indent:
649 self.no_newline = True
650 self.states.append(self.expect_block_sequence_item)
651 self.expect_node(sequence=True)
652
653 # Block mapping handlers.
654
655 def expect_block_mapping(self):
656 # type: () -> None
657 if not self.mapping_context and not (self.compact_seq_map or self.column == 0):
658 self.write_line_break()
659 self.increase_indent(flow=False, sequence=False)
660 self.state = self.expect_first_block_mapping_key
661
662 def expect_first_block_mapping_key(self):
663 # type: () -> None
664 return self.expect_block_mapping_key(first=True)
665
666 def expect_block_mapping_key(self, first=False):
667 # type: (Any) -> None
668 if not first and isinstance(self.event, MappingEndEvent):
669 if self.event.comment and self.event.comment[1]:
670 # final comments from a doc
671 self.write_pre_comment(self.event)
672 self.indent = self.indents.pop()
673 self.state = self.states.pop()
674 else:
675 if self.event.comment and self.event.comment[1]:
676 # final comments from a doc
677 self.write_pre_comment(self.event)
678 self.write_indent()
679 if self.check_simple_key():
680 if not isinstance(
681 self.event, (SequenceStartEvent, MappingStartEvent)
682 ): # sequence keys
683 try:
684 if self.event.style == '?':
685 self.write_indicator(u'?', True, indention=True)
686 except AttributeError: # aliases have no style
687 pass
688 self.states.append(self.expect_block_mapping_simple_value)
689 self.expect_node(mapping=True, simple_key=True)
690 if isinstance(self.event, AliasEvent):
691 self.stream.write(u' ')
692 else:
693 self.write_indicator(u'?', True, indention=True)
694 self.states.append(self.expect_block_mapping_value)
695 self.expect_node(mapping=True)
696
697 def expect_block_mapping_simple_value(self):
698 # type: () -> None
699 if getattr(self.event, 'style', None) != '?':
700 # prefix = u''
701 if self.indent == 0 and self.top_level_colon_align is not None:
702 # write non-prefixed colon
703 c = u' ' * (self.top_level_colon_align - self.column) + self.colon
704 else:
705 c = self.prefixed_colon
706 self.write_indicator(c, False)
707 self.states.append(self.expect_block_mapping_key)
708 self.expect_node(mapping=True)
709
710 def expect_block_mapping_value(self):
711 # type: () -> None
712 self.write_indent()
713 self.write_indicator(self.prefixed_colon, True, indention=True)
714 self.states.append(self.expect_block_mapping_key)
715 self.expect_node(mapping=True)
716
717 # Checkers.
718
719 def check_empty_sequence(self):
720 # type: () -> bool
721 return (
722 isinstance(self.event, SequenceStartEvent)
723 and bool(self.events)
724 and isinstance(self.events[0], SequenceEndEvent)
725 )
726
727 def check_empty_mapping(self):
728 # type: () -> bool
729 return (
730 isinstance(self.event, MappingStartEvent)
731 and bool(self.events)
732 and isinstance(self.events[0], MappingEndEvent)
733 )
734
735 def check_empty_document(self):
736 # type: () -> bool
737 if not isinstance(self.event, DocumentStartEvent) or not self.events:
738 return False
739 event = self.events[0]
740 return (
741 isinstance(event, ScalarEvent)
742 and event.anchor is None
743 and event.tag is None
744 and event.implicit
745 and event.value == ""
746 )
747
748 def check_simple_key(self):
749 # type: () -> bool
750 length = 0
751 if isinstance(self.event, NodeEvent) and self.event.anchor is not None:
752 if self.prepared_anchor is None:
753 self.prepared_anchor = self.prepare_anchor(self.event.anchor)
754 length += len(self.prepared_anchor)
755 if (
756 isinstance(self.event, (ScalarEvent, CollectionStartEvent))
757 and self.event.tag is not None
758 ):
759 if self.prepared_tag is None:
760 self.prepared_tag = self.prepare_tag(self.event.tag)
761 length += len(self.prepared_tag)
762 if isinstance(self.event, ScalarEvent):
763 if self.analysis is None:
764 self.analysis = self.analyze_scalar(self.event.value)
765 length += len(self.analysis.scalar)
766 return length < self.MAX_SIMPLE_KEY_LENGTH and (
767 isinstance(self.event, AliasEvent)
768 or (isinstance(self.event, SequenceStartEvent) and self.event.flow_style is True)
769 or (isinstance(self.event, MappingStartEvent) and self.event.flow_style is True)
770 or (
771 isinstance(self.event, ScalarEvent)
772 and not self.analysis.empty
773 and not self.analysis.multiline
774 )
775 or self.check_empty_sequence()
776 or self.check_empty_mapping()
777 )
778
779 # Anchor, Tag, and Scalar processors.
780
781 def process_anchor(self, indicator):
782 # type: (Any) -> bool
783 if self.event.anchor is None:
784 self.prepared_anchor = None
785 return False
786 if self.prepared_anchor is None:
787 self.prepared_anchor = self.prepare_anchor(self.event.anchor)
788 if self.prepared_anchor:
789 self.write_indicator(indicator + self.prepared_anchor, True)
790 # issue 288
791 self.no_newline = False
792 self.prepared_anchor = None
793 return True
794
795 def process_tag(self):
796 # type: () -> None
797 tag = self.event.tag
798 if isinstance(self.event, ScalarEvent):
799 if self.style is None:
800 self.style = self.choose_scalar_style()
801 if (not self.canonical or tag is None) and (
802 (self.style == "" and self.event.implicit[0])
803 or (self.style != "" and self.event.implicit[1])
804 ):
805 self.prepared_tag = None
806 return
807 if self.event.implicit[0] and tag is None:
808 tag = u'!'
809 self.prepared_tag = None
810 else:
811 if (not self.canonical or tag is None) and self.event.implicit:
812 self.prepared_tag = None
813 return
814 if tag is None:
815 raise EmitterError('tag is not specified')
816 if self.prepared_tag is None:
817 self.prepared_tag = self.prepare_tag(tag)
818 if self.prepared_tag:
819 self.write_indicator(self.prepared_tag, True)
820 if (
821 self.sequence_context
822 and not self.flow_level
823 and isinstance(self.event, ScalarEvent)
824 ):
825 self.no_newline = True
826 self.prepared_tag = None
827
828 def choose_scalar_style(self):
829 # type: () -> Any
830 if self.analysis is None:
831 self.analysis = self.analyze_scalar(self.event.value)
832 if self.event.style == '"' or self.canonical:
833 return '"'
834 if (not self.event.style or self.event.style == '?') and (
835 self.event.implicit[0] or not self.event.implicit[2]
836 ):
837 if not (
838 self.simple_key_context and (self.analysis.empty or self.analysis.multiline)
839 ) and (
840 self.flow_level
841 and self.analysis.allow_flow_plain
842 or (not self.flow_level and self.analysis.allow_block_plain)
843 ):
844 return ""
845 self.analysis.allow_block = True
846 if self.event.style and self.event.style in '|>':
847 if (
848 not self.flow_level
849 and not self.simple_key_context
850 and self.analysis.allow_block
851 ):
852 return self.event.style
853 if not self.event.style and self.analysis.allow_double_quoted:
854 if "'" in self.event.value or '\n' in self.event.value:
855 return '"'
856 if not self.event.style or self.event.style == "'":
857 if self.analysis.allow_single_quoted and not (
858 self.simple_key_context and self.analysis.multiline
859 ):
860 return "'"
861 return '"'
862
863 def process_scalar(self):
864 # type: () -> None
865 if self.analysis is None:
866 self.analysis = self.analyze_scalar(self.event.value)
867 if self.style is None:
868 self.style = self.choose_scalar_style()
869 split = not self.simple_key_context
870 # if self.analysis.multiline and split \
871 # and (not self.style or self.style in '\'\"'):
872 # self.write_indent()
873 # nprint('xx', self.sequence_context, self.flow_level)
874 if self.sequence_context and not self.flow_level:
875 self.write_indent()
876 if self.style == '"':
877 self.write_double_quoted(self.analysis.scalar, split)
878 elif self.style == "'":
879 self.write_single_quoted(self.analysis.scalar, split)
880 elif self.style == '>':
881 self.write_folded(self.analysis.scalar)
882 elif self.style == '|':
883 self.write_literal(self.analysis.scalar, self.event.comment)
884 else:
885 self.write_plain(self.analysis.scalar, split)
886 self.analysis = None
887 self.style = None
888 if self.event.comment:
889 self.write_post_comment(self.event)
890
891 # Analyzers.
892
893 def prepare_version(self, version):
894 # type: (Any) -> Any
895 major, minor = version
896 if major != 1:
897 raise EmitterError('unsupported YAML version: %d.%d' % (major, minor))
898 return u'%d.%d' % (major, minor)
899
900 def prepare_tag_handle(self, handle):
901 # type: (Any) -> Any
902 if not handle:
903 raise EmitterError('tag handle must not be empty')
904 if handle[0] != u'!' or handle[-1] != u'!':
905 raise EmitterError("tag handle must start and end with '!': %r" % (utf8(handle)))
906 for ch in handle[1:-1]:
907 if not (
908 u'0' <= ch <= u'9' or u'A' <= ch <= u'Z' or u'a' <= ch <= u'z' or ch in u'-_'
909 ):
910 raise EmitterError(
911 'invalid character %r in the tag handle: %r' % (utf8(ch), utf8(handle))
912 )
913 return handle
914
915 def prepare_tag_prefix(self, prefix):
916 # type: (Any) -> Any
917 if not prefix:
918 raise EmitterError('tag prefix must not be empty')
919 chunks = [] # type: List[Any]
920 start = end = 0
921 if prefix[0] == u'!':
922 end = 1
923 ch_set = u"-;/?:@&=+$,_.~*'()[]"
924 if self.dumper:
925 version = getattr(self.dumper, 'version', (1, 2))
926 if version is None or version >= (1, 2):
927 ch_set += u'#'
928 while end < len(prefix):
929 ch = prefix[end]
930 if u'0' <= ch <= u'9' or u'A' <= ch <= u'Z' or u'a' <= ch <= u'z' or ch in ch_set:
931 end += 1
932 else:
933 if start < end:
934 chunks.append(prefix[start:end])
935 start = end = end + 1
936 data = utf8(ch)
937 for ch in data:
938 chunks.append(u'%%%02X' % ord(ch))
939 if start < end:
940 chunks.append(prefix[start:end])
941 return "".join(chunks)
942
943 def prepare_tag(self, tag):
944 # type: (Any) -> Any
945 if not tag:
946 raise EmitterError('tag must not be empty')
947 if tag == u'!':
948 return tag
949 handle = None
950 suffix = tag
951 prefixes = sorted(self.tag_prefixes.keys())
952 for prefix in prefixes:
953 if tag.startswith(prefix) and (prefix == u'!' or len(prefix) < len(tag)):
954 handle = self.tag_prefixes[prefix]
955 suffix = tag[len(prefix) :]
956 chunks = [] # type: List[Any]
957 start = end = 0
958 ch_set = u"-;/?:@&=+$,_.~*'()[]"
959 if self.dumper:
960 version = getattr(self.dumper, 'version', (1, 2))
961 if version is None or version >= (1, 2):
962 ch_set += u'#'
963 while end < len(suffix):
964 ch = suffix[end]
965 if (
966 u'0' <= ch <= u'9'
967 or u'A' <= ch <= u'Z'
968 or u'a' <= ch <= u'z'
969 or ch in ch_set
970 or (ch == u'!' and handle != u'!')
971 ):
972 end += 1
973 else:
974 if start < end:
975 chunks.append(suffix[start:end])
976 start = end = end + 1
977 data = utf8(ch)
978 for ch in data:
979 chunks.append(u'%%%02X' % ord(ch))
980 if start < end:
981 chunks.append(suffix[start:end])
982 suffix_text = "".join(chunks)
983 if handle:
984 return u'%s%s' % (handle, suffix_text)
985 else:
986 return u'!<%s>' % suffix_text
987
988 def prepare_anchor(self, anchor):
989 # type: (Any) -> Any
990 if not anchor:
991 raise EmitterError('anchor must not be empty')
992 for ch in anchor:
993 if not check_anchorname_char(ch):
994 raise EmitterError(
995 'invalid character %r in the anchor: %r' % (utf8(ch), utf8(anchor))
996 )
997 return anchor
998
999 def analyze_scalar(self, scalar):
1000 # type: (Any) -> Any
1001 # Empty scalar is a special case.
1002 if not scalar:
1003 return ScalarAnalysis(
1004 scalar=scalar,
1005 empty=True,
1006 multiline=False,
1007 allow_flow_plain=False,
1008 allow_block_plain=True,
1009 allow_single_quoted=True,
1010 allow_double_quoted=True,
1011 allow_block=False,
1012 )
1013
1014 # Indicators and special characters.
1015 block_indicators = False
1016 flow_indicators = False
1017 line_breaks = False
1018 special_characters = False
1019
1020 # Important whitespace combinations.
1021 leading_space = False
1022 leading_break = False
1023 trailing_space = False
1024 trailing_break = False
1025 break_space = False
1026 space_break = False
1027
1028 # Check document indicators.
1029 if scalar.startswith(u'---') or scalar.startswith(u'...'):
1030 block_indicators = True
1031 flow_indicators = True
1032
1033 # First character or preceded by a whitespace.
1034 preceeded_by_whitespace = True
1035
1036 # Last character or followed by a whitespace.
1037 followed_by_whitespace = len(scalar) == 1 or scalar[1] in u'\0 \t\r\n\x85\u2028\u2029'
1038
1039 # The previous character is a space.
1040 previous_space = False
1041
1042 # The previous character is a break.
1043 previous_break = False
1044
1045 index = 0
1046 while index < len(scalar):
1047 ch = scalar[index]
1048
1049 # Check for indicators.
1050 if index == 0:
1051 # Leading indicators are special characters.
1052 if ch in u'#,[]{}&*!|>\'"%@`':
1053 flow_indicators = True
1054 block_indicators = True
1055 if ch in u'?:': # ToDo
1056 if self.serializer.use_version == (1, 1):
1057 flow_indicators = True
1058 elif len(scalar) == 1: # single character
1059 flow_indicators = True
1060 if followed_by_whitespace:
1061 block_indicators = True
1062 if ch == u'-' and followed_by_whitespace:
1063 flow_indicators = True
1064 block_indicators = True
1065 else:
1066 # Some indicators cannot appear within a scalar as well.
1067 if ch in u',[]{}': # http://yaml.org/spec/1.2/spec.html#id2788859
1068 flow_indicators = True
1069 if ch == u'?' and self.serializer.use_version == (1, 1):
1070 flow_indicators = True
1071 if ch == u':':
1072 if followed_by_whitespace:
1073 flow_indicators = True
1074 block_indicators = True
1075 if ch == u'#' and preceeded_by_whitespace:
1076 flow_indicators = True
1077 block_indicators = True
1078
1079 # Check for line breaks, special, and unicode characters.
1080 if ch in u'\n\x85\u2028\u2029':
1081 line_breaks = True
1082 if not (ch == u'\n' or u'\x20' <= ch <= u'\x7E'):
1083 if (
1084 ch == u'\x85'
1085 or u'\xA0' <= ch <= u'\uD7FF'
1086 or u'\uE000' <= ch <= u'\uFFFD'
1087 or (self.unicode_supplementary and (u'\U00010000' <= ch <= u'\U0010FFFF'))
1088 ) and ch != u'\uFEFF':
1089 # unicode_characters = True
1090 if not self.allow_unicode:
1091 special_characters = True
1092 else:
1093 special_characters = True
1094
1095 # Detect important whitespace combinations.
1096 if ch == u' ':
1097 if index == 0:
1098 leading_space = True
1099 if index == len(scalar) - 1:
1100 trailing_space = True
1101 if previous_break:
1102 break_space = True
1103 previous_space = True
1104 previous_break = False
1105 elif ch in u'\n\x85\u2028\u2029':
1106 if index == 0:
1107 leading_break = True
1108 if index == len(scalar) - 1:
1109 trailing_break = True
1110 if previous_space:
1111 space_break = True
1112 previous_space = False
1113 previous_break = True
1114 else:
1115 previous_space = False
1116 previous_break = False
1117
1118 # Prepare for the next character.
1119 index += 1
1120 preceeded_by_whitespace = ch in u'\0 \t\r\n\x85\u2028\u2029'
1121 followed_by_whitespace = (
1122 index + 1 >= len(scalar) or scalar[index + 1] in u'\0 \t\r\n\x85\u2028\u2029'
1123 )
1124
1125 # Let's decide what styles are allowed.
1126 allow_flow_plain = True
1127 allow_block_plain = True
1128 allow_single_quoted = True
1129 allow_double_quoted = True
1130 allow_block = True
1131
1132 # Leading and trailing whitespaces are bad for plain scalars.
1133 if leading_space or leading_break or trailing_space or trailing_break:
1134 allow_flow_plain = allow_block_plain = False
1135
1136 # We do not permit trailing spaces for block scalars.
1137 if trailing_space:
1138 allow_block = False
1139
1140 # Spaces at the beginning of a new line are only acceptable for block
1141 # scalars.
1142 if break_space:
1143 allow_flow_plain = allow_block_plain = allow_single_quoted = False
1144
1145 # Spaces followed by breaks, as well as special character are only
1146 # allowed for double quoted scalars.
1147 if special_characters:
1148 allow_flow_plain = allow_block_plain = allow_single_quoted = allow_block = False
1149 elif space_break:
1150 allow_flow_plain = allow_block_plain = allow_single_quoted = False
1151 if not self.allow_space_break:
1152 allow_block = False
1153
1154 # Although the plain scalar writer supports breaks, we never emit
1155 # multiline plain scalars.
1156 if line_breaks:
1157 allow_flow_plain = allow_block_plain = False
1158
1159 # Flow indicators are forbidden for flow plain scalars.
1160 if flow_indicators:
1161 allow_flow_plain = False
1162
1163 # Block indicators are forbidden for block plain scalars.
1164 if block_indicators:
1165 allow_block_plain = False
1166
1167 return ScalarAnalysis(
1168 scalar=scalar,
1169 empty=False,
1170 multiline=line_breaks,
1171 allow_flow_plain=allow_flow_plain,
1172 allow_block_plain=allow_block_plain,
1173 allow_single_quoted=allow_single_quoted,
1174 allow_double_quoted=allow_double_quoted,
1175 allow_block=allow_block,
1176 )
1177
1178 # Writers.
1179
1180 def flush_stream(self):
1181 # type: () -> None
1182 if hasattr(self.stream, 'flush'):
1183 self.stream.flush()
1184
1185 def write_stream_start(self):
1186 # type: () -> None
1187 # Write BOM if needed.
1188 if self.encoding and self.encoding.startswith('utf-16'):
1189 self.stream.write(u'\uFEFF'.encode(self.encoding))
1190
1191 def write_stream_end(self):
1192 # type: () -> None
1193 self.flush_stream()
1194
1195 def write_indicator(self, indicator, need_whitespace, whitespace=False, indention=False):
1196 # type: (Any, Any, bool, bool) -> None
1197 if self.whitespace or not need_whitespace:
1198 data = indicator
1199 else:
1200 data = u' ' + indicator
1201 self.whitespace = whitespace
1202 self.indention = self.indention and indention
1203 self.column += len(data)
1204 self.open_ended = False
1205 if bool(self.encoding):
1206 data = data.encode(self.encoding)
1207 self.stream.write(data)
1208
1209 def write_indent(self):
1210 # type: () -> None
1211 indent = self.indent or 0
1212 if (
1213 not self.indention
1214 or self.column > indent
1215 or (self.column == indent and not self.whitespace)
1216 ):
1217 if bool(self.no_newline):
1218 self.no_newline = False
1219 else:
1220 self.write_line_break()
1221 if self.column < indent:
1222 self.whitespace = True
1223 data = u' ' * (indent - self.column)
1224 self.column = indent
1225 if self.encoding:
1226 data = data.encode(self.encoding)
1227 self.stream.write(data)
1228
1229 def write_line_break(self, data=None):
1230 # type: (Any) -> None
1231 if data is None:
1232 data = self.best_line_break
1233 self.whitespace = True
1234 self.indention = True
1235 self.line += 1
1236 self.column = 0
1237 if bool(self.encoding):
1238 data = data.encode(self.encoding)
1239 self.stream.write(data)
1240
1241 def write_version_directive(self, version_text):
1242 # type: (Any) -> None
1243 data = u'%%YAML %s' % version_text
1244 if self.encoding:
1245 data = data.encode(self.encoding)
1246 self.stream.write(data)
1247 self.write_line_break()
1248
1249 def write_tag_directive(self, handle_text, prefix_text):
1250 # type: (Any, Any) -> None
1251 data = u'%%TAG %s %s' % (handle_text, prefix_text)
1252 if self.encoding:
1253 data = data.encode(self.encoding)
1254 self.stream.write(data)
1255 self.write_line_break()
1256
1257 # Scalar streams.
1258
1259 def write_single_quoted(self, text, split=True):
1260 # type: (Any, Any) -> None
1261 if self.root_context:
1262 if self.requested_indent is not None:
1263 self.write_line_break()
1264 if self.requested_indent != 0:
1265 self.write_indent()
1266 self.write_indicator(u"'", True)
1267 spaces = False
1268 breaks = False
1269 start = end = 0
1270 while end <= len(text):
1271 ch = None
1272 if end < len(text):
1273 ch = text[end]
1274 if spaces:
1275 if ch is None or ch != u' ':
1276 if (
1277 start + 1 == end
1278 and self.column > self.best_width
1279 and split
1280 and start != 0
1281 and end != len(text)
1282 ):
1283 self.write_indent()
1284 else:
1285 data = text[start:end]
1286 self.column += len(data)
1287 if bool(self.encoding):
1288 data = data.encode(self.encoding)
1289 self.stream.write(data)
1290 start = end
1291 elif breaks:
1292 if ch is None or ch not in u'\n\x85\u2028\u2029':
1293 if text[start] == u'\n':
1294 self.write_line_break()
1295 for br in text[start:end]:
1296 if br == u'\n':
1297 self.write_line_break()
1298 else:
1299 self.write_line_break(br)
1300 self.write_indent()
1301 start = end
1302 else:
1303 if ch is None or ch in u' \n\x85\u2028\u2029' or ch == u"'":
1304 if start < end:
1305 data = text[start:end]
1306 self.column += len(data)
1307 if bool(self.encoding):
1308 data = data.encode(self.encoding)
1309 self.stream.write(data)
1310 start = end
1311 if ch == u"'":
1312 data = u"''"
1313 self.column += 2
1314 if bool(self.encoding):
1315 data = data.encode(self.encoding)
1316 self.stream.write(data)
1317 start = end + 1
1318 if ch is not None:
1319 spaces = ch == u' '
1320 breaks = ch in u'\n\x85\u2028\u2029'
1321 end += 1
1322 self.write_indicator(u"'", False)
1323
1324 ESCAPE_REPLACEMENTS = {
1325 u'\0': u'0',
1326 u'\x07': u'a',
1327 u'\x08': u'b',
1328 u'\x09': u't',
1329 u'\x0A': u'n',
1330 u'\x0B': u'v',
1331 u'\x0C': u'f',
1332 u'\x0D': u'r',
1333 u'\x1B': u'e',
1334 u'"': u'"',
1335 u'\\': u'\\',
1336 u'\x85': u'N',
1337 u'\xA0': u'_',
1338 u'\u2028': u'L',
1339 u'\u2029': u'P',
1340 }
1341
1342 def write_double_quoted(self, text, split=True):
1343 # type: (Any, Any) -> None
1344 if self.root_context:
1345 if self.requested_indent is not None:
1346 self.write_line_break()
1347 if self.requested_indent != 0:
1348 self.write_indent()
1349 self.write_indicator(u'"', True)
1350 start = end = 0
1351 while end <= len(text):
1352 ch = None
1353 if end < len(text):
1354 ch = text[end]
1355 if (
1356 ch is None
1357 or ch in u'"\\\x85\u2028\u2029\uFEFF'
1358 or not (
1359 u'\x20' <= ch <= u'\x7E'
1360 or (
1361 self.allow_unicode
1362 and (u'\xA0' <= ch <= u'\uD7FF' or u'\uE000' <= ch <= u'\uFFFD')
1363 )
1364 )
1365 ):
1366 if start < end:
1367 data = text[start:end]
1368 self.column += len(data)
1369 if bool(self.encoding):
1370 data = data.encode(self.encoding)
1371 self.stream.write(data)
1372 start = end
1373 if ch is not None:
1374 if ch in self.ESCAPE_REPLACEMENTS:
1375 data = u'\\' + self.ESCAPE_REPLACEMENTS[ch]
1376 elif ch <= u'\xFF':
1377 data = u'\\x%02X' % ord(ch)
1378 elif ch <= u'\uFFFF':
1379 data = u'\\u%04X' % ord(ch)
1380 else:
1381 data = u'\\U%08X' % ord(ch)
1382 self.column += len(data)
1383 if bool(self.encoding):
1384 data = data.encode(self.encoding)
1385 self.stream.write(data)
1386 start = end + 1
1387 if (
1388 0 < end < len(text) - 1
1389 and (ch == u' ' or start >= end)
1390 and self.column + (end - start) > self.best_width
1391 and split
1392 ):
1393 data = text[start:end] + u'\\'
1394 if start < end:
1395 start = end
1396 self.column += len(data)
1397 if bool(self.encoding):
1398 data = data.encode(self.encoding)
1399 self.stream.write(data)
1400 self.write_indent()
1401 self.whitespace = False
1402 self.indention = False
1403 if text[start] == u' ':
1404 data = u'\\'
1405 self.column += len(data)
1406 if bool(self.encoding):
1407 data = data.encode(self.encoding)
1408 self.stream.write(data)
1409 end += 1
1410 self.write_indicator(u'"', False)
1411
1412 def determine_block_hints(self, text):
1413 # type: (Any) -> Any
1414 indent = 0
1415 indicator = u''
1416 hints = u''
1417 if text:
1418 if text[0] in u' \n\x85\u2028\u2029':
1419 indent = self.best_sequence_indent
1420 hints += text_type(indent)
1421 elif self.root_context:
1422 for end in ['\n---', '\n...']:
1423 pos = 0
1424 while True:
1425 pos = text.find(end, pos)
1426 if pos == -1:
1427 break
1428 try:
1429 if text[pos + 4] in ' \r\n':
1430 break
1431 except IndexError:
1432 pass
1433 pos += 1
1434 if pos > -1:
1435 break
1436 if pos > 0:
1437 indent = self.best_sequence_indent
1438 if text[-1] not in u'\n\x85\u2028\u2029':
1439 indicator = u'-'
1440 elif len(text) == 1 or text[-2] in u'\n\x85\u2028\u2029':
1441 indicator = u'+'
1442 hints += indicator
1443 return hints, indent, indicator
1444
1445 def write_folded(self, text):
1446 # type: (Any) -> None
1447 hints, _indent, _indicator = self.determine_block_hints(text)
1448 self.write_indicator(u'>' + hints, True)
1449 if _indicator == u'+':
1450 self.open_ended = True
1451 self.write_line_break()
1452 leading_space = True
1453 spaces = False
1454 breaks = True
1455 start = end = 0
1456 while end <= len(text):
1457 ch = None
1458 if end < len(text):
1459 ch = text[end]
1460 if breaks:
1461 if ch is None or ch not in u'\n\x85\u2028\u2029\a':
1462 if (
1463 not leading_space
1464 and ch is not None
1465 and ch != u' '
1466 and text[start] == u'\n'
1467 ):
1468 self.write_line_break()
1469 leading_space = ch == u' '
1470 for br in text[start:end]:
1471 if br == u'\n':
1472 self.write_line_break()
1473 else:
1474 self.write_line_break(br)
1475 if ch is not None:
1476 self.write_indent()
1477 start = end
1478 elif spaces:
1479 if ch != u' ':
1480 if start + 1 == end and self.column > self.best_width:
1481 self.write_indent()
1482 else:
1483 data = text[start:end]
1484 self.column += len(data)
1485 if bool(self.encoding):
1486 data = data.encode(self.encoding)
1487 self.stream.write(data)
1488 start = end
1489 else:
1490 if ch is None or ch in u' \n\x85\u2028\u2029\a':
1491 data = text[start:end]
1492 self.column += len(data)
1493 if bool(self.encoding):
1494 data = data.encode(self.encoding)
1495 self.stream.write(data)
1496 if ch == u'\a':
1497 if end < (len(text) - 1) and not text[end + 2].isspace():
1498 self.write_line_break()
1499 self.write_indent()
1500 end += 2 # \a and the space that is inserted on the fold
1501 else:
1502 raise EmitterError('unexcpected fold indicator \\a before space')
1503 if ch is None:
1504 self.write_line_break()
1505 start = end
1506 if ch is not None:
1507 breaks = ch in u'\n\x85\u2028\u2029'
1508 spaces = ch == u' '
1509 end += 1
1510
1511 def write_literal(self, text, comment=None):
1512 # type: (Any, Any) -> None
1513 hints, _indent, _indicator = self.determine_block_hints(text)
1514 self.write_indicator(u'|' + hints, True)
1515 try:
1516 comment = comment[1][0]
1517 if comment:
1518 self.stream.write(comment)
1519 except (TypeError, IndexError):
1520 pass
1521 if _indicator == u'+':
1522 self.open_ended = True
1523 self.write_line_break()
1524 breaks = True
1525 start = end = 0
1526 while end <= len(text):
1527 ch = None
1528 if end < len(text):
1529 ch = text[end]
1530 if breaks:
1531 if ch is None or ch not in u'\n\x85\u2028\u2029':
1532 for br in text[start:end]:
1533 if br == u'\n':
1534 self.write_line_break()
1535 else:
1536 self.write_line_break(br)
1537 if ch is not None:
1538 if self.root_context:
1539 idnx = self.indent if self.indent is not None else 0
1540 self.stream.write(u' ' * (_indent + idnx))
1541 else:
1542 self.write_indent()
1543 start = end
1544 else:
1545 if ch is None or ch in u'\n\x85\u2028\u2029':
1546 data = text[start:end]
1547 if bool(self.encoding):
1548 data = data.encode(self.encoding)
1549 self.stream.write(data)
1550 if ch is None:
1551 self.write_line_break()
1552 start = end
1553 if ch is not None:
1554 breaks = ch in u'\n\x85\u2028\u2029'
1555 end += 1
1556
1557 def write_plain(self, text, split=True):
1558 # type: (Any, Any) -> None
1559 if self.root_context:
1560 if self.requested_indent is not None:
1561 self.write_line_break()
1562 if self.requested_indent != 0:
1563 self.write_indent()
1564 else:
1565 self.open_ended = True
1566 if not text:
1567 return
1568 if not self.whitespace:
1569 data = u' '
1570 self.column += len(data)
1571 if self.encoding:
1572 data = data.encode(self.encoding)
1573 self.stream.write(data)
1574 self.whitespace = False
1575 self.indention = False
1576 spaces = False
1577 breaks = False
1578 start = end = 0
1579 while end <= len(text):
1580 ch = None
1581 if end < len(text):
1582 ch = text[end]
1583 if spaces:
1584 if ch != u' ':
1585 if start + 1 == end and self.column > self.best_width and split:
1586 self.write_indent()
1587 self.whitespace = False
1588 self.indention = False
1589 else:
1590 data = text[start:end]
1591 self.column += len(data)
1592 if self.encoding:
1593 data = data.encode(self.encoding)
1594 self.stream.write(data)
1595 start = end
1596 elif breaks:
1597 if ch not in u'\n\x85\u2028\u2029': # type: ignore
1598 if text[start] == u'\n':
1599 self.write_line_break()
1600 for br in text[start:end]:
1601 if br == u'\n':
1602 self.write_line_break()
1603 else:
1604 self.write_line_break(br)
1605 self.write_indent()
1606 self.whitespace = False
1607 self.indention = False
1608 start = end
1609 else:
1610 if ch is None or ch in u' \n\x85\u2028\u2029':
1611 data = text[start:end]
1612 self.column += len(data)
1613 if self.encoding:
1614 data = data.encode(self.encoding)
1615 try:
1616 self.stream.write(data)
1617 except: # NOQA
1618 sys.stdout.write(repr(data) + '\n')
1619 raise
1620 start = end
1621 if ch is not None:
1622 spaces = ch == u' '
1623 breaks = ch in u'\n\x85\u2028\u2029'
1624 end += 1
1625
1626 def write_comment(self, comment, pre=False):
1627 # type: (Any, bool) -> None
1628 value = comment.value
1629 # nprintf('{:02d} {:02d} {!r}'.format(self.column, comment.start_mark.column, value))
1630 if not pre and value[-1] == '\n':
1631 value = value[:-1]
1632 try:
1633 # get original column position
1634 col = comment.start_mark.column
1635 if comment.value and comment.value.startswith('\n'):
1636 # never inject extra spaces if the comment starts with a newline
1637 # and not a real comment (e.g. if you have an empty line following a key-value
1638 col = self.column
1639 elif col < self.column + 1:
1640 ValueError
1641 except ValueError:
1642 col = self.column + 1
1643 # nprint('post_comment', self.line, self.column, value)
1644 try:
1645 # at least one space if the current column >= the start column of the comment
1646 # but not at the start of a line
1647 nr_spaces = col - self.column
1648 if self.column and value.strip() and nr_spaces < 1 and value[0] != '\n':
1649 nr_spaces = 1
1650 value = ' ' * nr_spaces + value
1651 try:
1652 if bool(self.encoding):
1653 value = value.encode(self.encoding)
1654 except UnicodeDecodeError:
1655 pass
1656 self.stream.write(value)
1657 except TypeError:
1658 raise
1659 if not pre:
1660 self.write_line_break()
1661
1662 def write_pre_comment(self, event):
1663 # type: (Any) -> bool
1664 comments = event.comment[1]
1665 if comments is None:
1666 return False
1667 try:
1668 start_events = (MappingStartEvent, SequenceStartEvent)
1669 for comment in comments:
1670 if isinstance(event, start_events) and getattr(comment, 'pre_done', None):
1671 continue
1672 if self.column != 0:
1673 self.write_line_break()
1674 self.write_comment(comment, pre=True)
1675 if isinstance(event, start_events):
1676 comment.pre_done = True
1677 except TypeError:
1678 sys.stdout.write('eventtt {} {}'.format(type(event), event))
1679 raise
1680 return True
1681
1682 def write_post_comment(self, event):
1683 # type: (Any) -> bool
1684 if self.event.comment[0] is None:
1685 return False
1686 comment = event.comment[0]
1687 self.write_comment(comment)
1688 return True