blob: 481ba3929ce6284cf0bf63e80d66b8d575800194 (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
|
import websocket
import tty
import termios
from threading import Thread
from sys import stdout, stdin
from copy import copy
def connect_and_auth(url, password) -> websocket.WebSocket:
    """Open a WebREPL websocket and complete the password handshake.

    The server is expected to send a ``Password: `` prompt first. The prompt
    is echoed to stdout, *password* plus a newline is sent back, and the next
    frame must be the ``WebREPL connected`` banner followed by ``>>>``.

    Args:
        url: Websocket URL handed to ``websocket.create_connection``.
        password: Password text sent in reply to the prompt.

    Returns:
        The connected websocket, created with a 0.5 second timeout.

    Raises:
        Exception: If either server reply differs from the expected text.
            Any error raised by the websocket calls also propagates. In all
            failure cases the socket is closed first.
    """
    ws = websocket.create_connection(url, timeout=0.5)
    try:
        frame = ws.recv_frame()
        if frame.data != b"Password: ":
            raise Exception("Unexpected response: {}".format(frame.data))
        stdout.write(frame.data.decode('utf-8'))
        ws.send(password + "\n")
        frame = ws.recv_frame()
        # strip() removes the trailing space after the ">>>" prompt.
        if frame.data.strip() != b"WebREPL connected\r\n>>>":
            raise Exception("Unexpected response: {}".format(frame.data))
    except BaseException:
        # Do not leak the connection when the handshake fails part-way.
        ws.close()
        raise
    return ws
def do_eval(args):
    """Send one line of code to the device and print what it outputs.

    Args:
        args: Parsed command-line arguments. Reads ``args.WEBSOCKET[0]``
            (the URL), ``args.password`` and ``args.CODE[0]`` (the code).

    Raises:
        Exception: Whatever ``connect_and_auth`` or the websocket calls
            raise. The connection is closed before the error propagates.
    """
    prompt = "\r\n>>> "
    ws = connect_and_auth(args.WEBSOCKET[0], args.password)
    try:
        # 0x02 is CTRL-B; presumably this puts the device into its normal
        # (non-raw) REPL - confirm against the WebREPL documentation.
        ws.send("\x02")
        stdout.write(read_until_eval_or_timeout(ws))
        ws.send(args.CODE[0] + "\r\n")
        result = read_until_eval_or_timeout(ws)
        # The reply always ends with the prompt; print everything before it.
        stdout.write(result[:-len(prompt)])
        print("")
    finally:
        # Close the socket even when sending or receiving fails.
        ws.close()
def read_until_eval_or_timeout(ws: websocket.WebSocket):
    """Collect received text until it ends with the ``>>> `` prompt.

    Args:
        ws: Object whose ``recv()`` returns the next message payload.

    Returns:
        Everything received so far as a ``str``, ending with ``"\\r\\n>>> "``.

    Raises:
        Exception: Nothing is caught here, so any error raised by
            ``ws.recv()`` (for example a receive timeout) reaches the caller.
    """
    prompt = "\r\n>>> "
    buf = ""
    while not buf.endswith(prompt):
        chunk = ws.recv()
        # Binary frames arrive as bytes; decode them so they can be appended
        # to the text buffer instead of raising TypeError.
        if isinstance(chunk, (bytes, bytearray)):
            chunk = chunk.decode("utf-8")
        buf += chunk
    return buf
class Reader(Thread):
    """Thread that copies incoming websocket frames to stdout.

    The owner ends the loop by setting ``stop`` to True; the loop also ends
    by itself once the websocket reports that it is no longer connected.
    """

    def __init__(self, ws):
        """Remember the websocket *ws* to read from; the thread is not started."""
        super().__init__()
        self.ws = ws
        # Shutdown request flag, polled by run().
        self.stop = False

    def run(self):
        """Write every received frame to stdout until told to stop.

        Receive errors are tolerated so that periodic timeouts keep the loop
        polling the ``stop`` flag. The loop exits when ``stop`` is set or
        when the websocket's ``connected`` attribute is false.
        """
        while not self.stop:
            try:
                frame = self.ws.recv_frame()
                # errors='replace' keeps the rest of a frame visible when it
                # contains bytes that are not valid UTF-8.
                stdout.write(frame.data.decode('utf-8', errors='replace'))
                stdout.flush()
            except Exception:
                # A dead connection would fail on every call; leave instead
                # of spinning. getattr() tolerates socket objects that do
                # not expose a `connected` attribute.
                if self.stop or not getattr(self.ws, "connected", True):
                    break
def set_tty_raw_mode(fd):
    """Turn off echo on *fd* and make reads return after a single byte.

    Args:
        fd: File descriptor of the terminal to reconfigure.

    Returns:
        The attribute list reported by ``termios.tcgetattr`` before the
        change, suitable for passing to ``set_tty_mode`` to restore it.
    """
    saved_mode = termios.tcgetattr(fd)
    new_mode = copy(saved_mode)
    # copy() is shallow, so the control-character list would be shared with
    # saved_mode. Give new_mode its own list, otherwise the VMIN/VTIME writes
    # below would also alter the snapshot that is returned for restoring.
    new_mode[tty.CC] = list(saved_mode[tty.CC])
    new_mode[tty.LFLAG] &= ~termios.ECHO
    # Block until at least one byte is available, with no read timer.
    new_mode[tty.CC][tty.VMIN] = 1
    new_mode[tty.CC][tty.VTIME] = 0
    set_tty_mode(fd, new_mode)
    return saved_mode
def set_tty_mode(fd, mode):
    """Apply the terminal attribute list *mode* to file descriptor *fd*.

    The change is made with ``termios.TCSAFLUSH``.
    """
    when = termios.TCSAFLUSH
    termios.tcsetattr(fd, when, mode)
def do_repl(args):
    """Run an interactive REPL session between the terminal and the device.

    Characters typed on stdin are forwarded to the websocket one at a time
    while a ``Reader`` thread prints whatever the device sends back. The
    session ends on CTRL-], CTRL-D, end of input or KeyboardInterrupt.

    Args:
        args: Parsed command-line arguments. Reads ``args.WEBSOCKET[0]``
            (the URL) and ``args.password``.

    Raises:
        Exception: Errors from connecting or from preparing the terminal
            propagate, after the reader has been told to stop, the socket
            has been closed and any saved terminal mode has been restored.
            Errors during the interactive loop are reported on stdout
            instead of being raised.
    """
    print("Type ^] (CTRL-]) or CTRL-D to quit")
    ws = connect_and_auth(args.WEBSOCKET[0], args.password)
    reader = Reader(ws)
    fd = None
    saved_tty_mode = None
    failure = None
    try:
        ws.send("\x02")
        reader.start()
        fd = stdin.fileno()
        saved_tty_mode = set_tty_raw_mode(fd)
        try:
            tty.setraw(fd)
            while True:
                in_char = stdin.read(1)
                # "" means stdin reached end of input; "\x1d" is CTRL-] and
                # "\x04" is CTRL-D.
                if in_char in ("", "\x1d", "\x04"):
                    break
                ws.send(in_char)
        except KeyboardInterrupt:
            pass
        except Exception as exc:
            # Report it after the terminal has been restored.
            failure = exc
    finally:
        # Always release the reader thread, the socket and the terminal,
        # including when setup fails before the loop starts.
        reader.stop = True
        ws.close()
        if saved_tty_mode is not None:
            set_tty_mode(fd, saved_tty_mode)
        print("")
    if failure is not None:
        print("REPL session ended: {}".format(failure))
def do_put(args):
    """Placeholder for the ``put`` command; always raises NotImplementedError."""
    raise NotImplementedError
def do_get(args):
    """Placeholder for the ``get`` command; always raises NotImplementedError."""
    raise NotImplementedError
|