comparison env/lib/python3.9/site-packages/boto/mashups/interactive.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:4f3585e2f14b
1 # Copyright (C) 2003-2007 Robey Pointer <robey@lag.net>
2 #
3 # This file is part of paramiko.
4 #
5 # Paramiko is free software; you can redistribute it and/or modify it under the
6 # terms of the GNU Lesser General Public License as published by the Free
7 # Software Foundation; either version 2.1 of the License, or (at your option)
8 # any later version.
9 #
10 # Paramiko is distrubuted in the hope that it will be useful, but WITHOUT ANY
11 # WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
12 # A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
13 # details.
14 #
15 # You should have received a copy of the GNU Lesser General Public License
16 # along with Paramiko; if not, write to the Free Software Foundation, Inc.,
17 # 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
18 from __future__ import print_function
19
20 import socket
21 import sys
22
23 # windows does not have termios...
24 try:
25 import termios
26 import tty
27 has_termios = True
28 except ImportError:
29 has_termios = False
30
31
32 def interactive_shell(chan):
33 if has_termios:
34 posix_shell(chan)
35 else:
36 windows_shell(chan)
37
38
39 def posix_shell(chan):
40 import select
41
42 oldtty = termios.tcgetattr(sys.stdin)
43 try:
44 tty.setraw(sys.stdin.fileno())
45 tty.setcbreak(sys.stdin.fileno())
46 chan.settimeout(0.0)
47
48 while True:
49 r, w, e = select.select([chan, sys.stdin], [], [])
50 if chan in r:
51 try:
52 x = chan.recv(1024)
53 if len(x) == 0:
54 print('\r\n*** EOF\r\n', end=' ')
55 break
56 sys.stdout.write(x)
57 sys.stdout.flush()
58 except socket.timeout:
59 pass
60 if sys.stdin in r:
61 x = sys.stdin.read(1)
62 if len(x) == 0:
63 break
64 chan.send(x)
65
66 finally:
67 termios.tcsetattr(sys.stdin, termios.TCSADRAIN, oldtty)
68
69
70 # thanks to Mike Looijmans for this code
71 def windows_shell(chan):
72 import threading
73
74 sys.stdout.write("Line-buffered terminal emulation. Press F6 or ^Z to send EOF.\r\n\r\n")
75
76 def writeall(sock):
77 while True:
78 data = sock.recv(256)
79 if not data:
80 sys.stdout.write('\r\n*** EOF ***\r\n\r\n')
81 sys.stdout.flush()
82 break
83 sys.stdout.write(data)
84 sys.stdout.flush()
85
86 writer = threading.Thread(target=writeall, args=(chan,))
87 writer.start()
88
89 try:
90 while True:
91 d = sys.stdin.read(1)
92 if not d:
93 break
94 chan.send(d)
95 except EOFError:
96 # user hit ^Z or F6
97 pass