From 4e07f8306c855be768d435baac68f14a6bca547b Mon Sep 17 00:00:00 2001 From: Yves Fischer Date: Wed, 28 Dec 2016 23:27:59 +0100 Subject: Add serial communication --- mp_tool/__init__.py | 117 ++++------------------------------------------------ 1 file changed, 7 insertions(+), 110 deletions(-) (limited to 'mp_tool/__init__.py') diff --git a/mp_tool/__init__.py b/mp_tool/__init__.py index 481ba39..77b8820 100644 --- a/mp_tool/__init__.py +++ b/mp_tool/__init__.py @@ -1,112 +1,9 @@ -import websocket -import tty -import termios -from threading import Thread -from sys import stdout, stdin -from copy import copy +class Constants: + ENTER_RAW_MODE = b'\x01' # CTRL-A + ENTER_REPL_MODE = b'\x02' # CTRL-B + INTERRUPT = b'\x03' # CTRL-C + CTRL_D = b'\x04' # CTRL-D + MARKER_BEGIN = b'>>>>>>>>>>' + MARKER_END = b'<<<<<<<<<<' - -def connect_and_auth(url, password) -> websocket.WebSocket: - ws = websocket.create_connection(url, timeout=0.5) - frame = ws.recv_frame() - - if frame.data != b"Password: ": - raise Exception("Unexpected response: {}".format(frame.data)) - stdout.write(frame.data.decode('utf-8')) - ws.send(password + "\n") - - frame = ws.recv_frame() - if frame.data.strip() != b"WebREPL connected\r\n>>>": - raise Exception("Unexpected response: {}".format(frame.data)) - return ws - - -def do_eval(args): - ws = connect_and_auth(args.WEBSOCKET[0], args.password) - ws.send("\x02") - stdout.write(read_until_eval_or_timeout(ws)) - ws.send(args.CODE[0] + "\r\n") - - result = read_until_eval_or_timeout(ws) - stdout.write(result[:-6]) - print("") - ws.close() - - -def read_until_eval_or_timeout(ws: websocket.WebSocket): - buf = "" - while not buf.endswith("\r\n>>> "): - buf += ws.recv() - return buf - - -class Reader(Thread): - def __init__(self, ws): - Thread.__init__(self) - self.ws = ws - self.stop = False - - def run(self): - while True: - try: - frame = self.ws.recv_frame() - stdout.write(frame.data.decode('utf-8')) - stdout.flush() - except Exception as e: - if self.stop: - break - - -def set_tty_raw_mode(fd): - saved_mode = termios.tcgetattr(fd) - - new_mode = copy(saved_mode) - new_mode[tty.LFLAG] = new_mode[tty.LFLAG] & ~termios.ECHO - new_mode[tty.CC][tty.VMIN] = 1 - new_mode[tty.CC][tty.VTIME] = 0 - set_tty_mode(fd, new_mode) - - return saved_mode - - -def set_tty_mode(fd, mode): - termios.tcsetattr(fd, termios.TCSAFLUSH, mode) - - -def do_repl(args): - print("Type ^[ CTRL-] or CTRL-D to quit") - ws = connect_and_auth(args.WEBSOCKET[0], args.password) - ws.send("\x02") - - reader = Reader(ws) - reader.start() - - saved_tty_mode = set_tty_raw_mode(stdin.fileno()) - try: - tty.setraw(stdin.fileno()) - while True: - try: - in_char = stdin.read(1) - if in_char == "\x1d" or in_char == "\x04": # escape char 'Ctrl-]' or CTRL-C - break - else: - ws.send(in_char) - except KeyboardInterrupt: - break - except Exception as _: - pass - - reader.stop = True - ws.close() - - set_tty_mode(stdin.fileno(), saved_tty_mode) - print("") - - -def do_put(args): - raise NotImplementedError() - - -def do_get(args): - raise NotImplementedError() -- cgit v1.2.1