diff options
author | Yves Fischer <yvesf-git@xapek.org> | 2016-12-28 23:27:59 +0100 |
---|---|---|
committer | Yves Fischer <yvesf-git@xapek.org> | 2016-12-29 03:19:09 +0100 |
commit | 4e07f8306c855be768d435baac68f14a6bca547b (patch) | |
tree | ce7b13e8dd6bfeeece1593d9de5cb89bbb845b08 /mp_tool | |
parent | ba2960846e52e9862a7d0aca69c33d29610276bb (diff) | |
download | mp-tool-4e07f8306c855be768d435baac68f14a6bca547b.tar.gz mp-tool-4e07f8306c855be768d435baac68f14a6bca547b.zip |
Add serial communication
Diffstat (limited to 'mp_tool')
-rw-r--r-- | mp_tool/__init__.py | 117 | ||||
-rw-r--r-- | mp_tool/serial.py | 135 | ||||
-rw-r--r-- | mp_tool/util.py | 65 | ||||
-rw-r--r-- | mp_tool/web.py | 117 |
4 files changed, 324 insertions, 110 deletions
diff --git a/mp_tool/__init__.py b/mp_tool/__init__.py index 481ba39..77b8820 100644 --- a/mp_tool/__init__.py +++ b/mp_tool/__init__.py @@ -1,112 +1,9 @@ -import websocket -import tty -import termios -from threading import Thread -from sys import stdout, stdin -from copy import copy +class Constants: + ENTER_RAW_MODE = b'\x01' # CTRL-A + ENTER_REPL_MODE = b'\x02' # CTRL-B + INTERRUPT = b'\x03' # CTRL-C + CTRL_D = b'\x04' # CTRL-D + MARKER_BEGIN = b'>>>>>>>>>>' + MARKER_END = b'<<<<<<<<<<' - -def connect_and_auth(url, password) -> websocket.WebSocket: - ws = websocket.create_connection(url, timeout=0.5) - frame = ws.recv_frame() - - if frame.data != b"Password: ": - raise Exception("Unexpected response: {}".format(frame.data)) - stdout.write(frame.data.decode('utf-8')) - ws.send(password + "\n") - - frame = ws.recv_frame() - if frame.data.strip() != b"WebREPL connected\r\n>>>": - raise Exception("Unexpected response: {}".format(frame.data)) - return ws - - -def do_eval(args): - ws = connect_and_auth(args.WEBSOCKET[0], args.password) - ws.send("\x02") - stdout.write(read_until_eval_or_timeout(ws)) - ws.send(args.CODE[0] + "\r\n") - - result = read_until_eval_or_timeout(ws) - stdout.write(result[:-6]) - print("") - ws.close() - - -def read_until_eval_or_timeout(ws: websocket.WebSocket): - buf = "" - while not buf.endswith("\r\n>>> "): - buf += ws.recv() - return buf - - -class Reader(Thread): - def __init__(self, ws): - Thread.__init__(self) - self.ws = ws - self.stop = False - - def run(self): - while True: - try: - frame = self.ws.recv_frame() - stdout.write(frame.data.decode('utf-8')) - stdout.flush() - except Exception as e: - if self.stop: - break - - -def set_tty_raw_mode(fd): - saved_mode = termios.tcgetattr(fd) - - new_mode = copy(saved_mode) - new_mode[tty.LFLAG] = new_mode[tty.LFLAG] & ~termios.ECHO - new_mode[tty.CC][tty.VMIN] = 1 - new_mode[tty.CC][tty.VTIME] = 0 - set_tty_mode(fd, new_mode) - - return saved_mode - - -def set_tty_mode(fd, mode): - termios.tcsetattr(fd, termios.TCSAFLUSH, mode) - - -def do_repl(args): - print("Type ^[ CTRL-] or CTRL-D to quit") - ws = connect_and_auth(args.WEBSOCKET[0], args.password) - ws.send("\x02") - - reader = Reader(ws) - reader.start() - - saved_tty_mode = set_tty_raw_mode(stdin.fileno()) - try: - tty.setraw(stdin.fileno()) - while True: - try: - in_char = stdin.read(1) - if in_char == "\x1d" or in_char == "\x04": # escape char 'Ctrl-]' or CTRL-C - break - else: - ws.send(in_char) - except KeyboardInterrupt: - break - except Exception as _: - pass - - reader.stop = True - ws.close() - - set_tty_mode(stdin.fileno(), saved_tty_mode) - print("") - - -def do_put(args): - raise NotImplementedError() - - -def do_get(args): - raise NotImplementedError() diff --git a/mp_tool/serial.py b/mp_tool/serial.py new file mode 100644 index 0000000..c1838f2 --- /dev/null +++ b/mp_tool/serial.py @@ -0,0 +1,135 @@ +""" +Implementation of commands that run against the serial interface of micropython +""" +from . import Constants + +try: + import serial +except ImportError: + print("Could not find pyserial library") + raise + +import os + + +def eval(port: str, code: str): + with serial.Serial(port=port, baudrate=115200) as fh: + fh.write(Constants.INTERRUPT) + fh.write(Constants.ENTER_REPL_MODE) + _, _ = fh.readline(), fh.readline() + print(fh.readline().decode('utf-8').strip()) + print(fh.readline().decode('utf-8').strip()) + + fh.write(code.encode('utf-8') + b"\r\n") + fh.flush() + + buf = fh.read(1) + i = 0 + while not buf.endswith(b"\r\n>>> "): + buf += fh.read(1) + i += 1 + if i > 300: + raise Exception("Exceed number of bytes while seeking for end of output") + print(buf.decode('utf-8')[:-6]) + + +def ls(port: str, directory: str): + with serial.Serial(port=port, baudrate=115200) as fh: + fh.write(Constants.INTERRUPT) # ctrl-c interrupt + fh.write(Constants.ENTER_RAW_MODE) # ctrl-a raw-mode + fh.write(b"import os\r\n") + fh.write(b"print()\r\n") + fh.write(b"print('" + Constants.MARKER_BEGIN + b"')\r\n") + fh.write(b"try:") + fh.write(b" print('\\n'.join(os.listdir(" + repr(directory).encode('utf-8') + b")))\r\n") + fh.write(b"except OSError as e:\r\n") + fh.write(b" print(str(e))\r\n") + fh.write(b"print('" + Constants.MARKER_END + b"')\r\n") + fh.write(b"print()\r\n") + fh.write(Constants.CTRL_D) + fh.flush() + fh.reset_input_buffer() + + _line_ok = fh.readline() + if fh.readline().strip() != Constants.MARKER_BEGIN: + raise Exception('Failed to find begin marker') + + line = fh.readline() + while line.strip() != Constants.MARKER_END: + print(line.strip().decode('utf-8')) + line = fh.readline() + + fh.write(Constants.ENTER_REPL_MODE) + + +def get(port: str, remote_filename: str, target: str): + if target: + if os.path.isdir(target): + local_filename = os.path.join(target, os.path.basename(remote_filename)) + else: + local_filename = target + else: + local_filename = os.path.basename(remote_filename) + + with serial.Serial(port=port, baudrate=115200) as fh: + fh.write(Constants.INTERRUPT) # ctrl-c interrupt + fh.write(Constants.ENTER_RAW_MODE) # ctrl-a raw-mode + fh.write(b"import sys\r\n") + fh.write(b"import os\r\n") + fh.write(b"print()\r\n") + fh.write(b"print('" + Constants.MARKER_BEGIN + b"')\r\n") + fh.write(b"try:\r\n") + fh.write(" print(os.stat({})[6])\r\n".format(repr(remote_filename)).encode('utf-8')) + fh.write(b"except OSError:\r\n") + fh.write(b" print('-1')\r\n") + fh.write(b"print('" + Constants.MARKER_END + b"')\r\n") + fh.write("with open({}, 'rb') as fh:\r\n".format(repr(remote_filename)).encode('utf-8')) + # use sys.stdout.buffer to avoid cr to crlf conversion + fh.write(b" sys.stdout.buffer.write(fh.read())\r\n") + fh.write(b"print()\r\n") + fh.write(Constants.CTRL_D) + fh.flush() + fh.reset_input_buffer() + + _line_ok = fh.readline() + + if fh.readline().strip() != Constants.MARKER_BEGIN: + raise Exception('Failed to find begin marker') + + length = int(fh.readline().strip().decode('utf-8')) + if fh.readline().strip() != Constants.MARKER_END: + raise Exception("Failed to read end marker value") + + if length == -1: + raise Exception("Failed to read file {}".format(remote_filename)) + + print("File length: {}".format(length)) + + with open(local_filename, 'wb') as fh_out: + bytes_processed = fh_out.write(fh.read(length)) + print("{} bytes written to {}".format(bytes_processed, local_filename)) + + fh.write(Constants.ENTER_REPL_MODE) + + +def put(port: str, local_filename: str, target: str): + if target: + remote_filename = os.path.join(target, local_filename) + else: + remote_filename = os.path.basename(local_filename) + + with open(local_filename, 'br') as file_fh: + data = file_fh.read() + + with serial.Serial(port=port, baudrate=115200) as fh: + fh.write(Constants.INTERRUPT) # ctrl-c interrupt + fh.write(Constants.ENTER_RAW_MODE) # ctrl-a raw-mode + fh.write(b"import sys\r\n") + fh.write("with open({}, 'wb') as fh:\r\n".format(repr(remote_filename)).encode('utf-8')) + if len(data) == 0: + fh.write(b" pass\r\n") + else: + fh.write(" fh.write(sys.stdin.buffer.read({}))\r\n".format(len(data)).encode('utf-8')) + fh.write(Constants.CTRL_D) + fh.write(data) + fh.write(Constants.ENTER_REPL_MODE) diff --git a/mp_tool/util.py b/mp_tool/util.py new file mode 100644 index 0000000..7e3f610 --- /dev/null +++ b/mp_tool/util.py @@ -0,0 +1,65 @@ +import argparse +import platform + +class HelpAction(argparse._HelpAction): + def __call__(self, parser, namespace, values, option_string=None): + formatter = parser._get_formatter() + formatter.add_usage(parser.usage, + parser._actions, + parser._mutually_exclusive_groups) + + formatter.start_section(parser._optionals.title) + formatter.add_text(parser._optionals.description) + formatter.add_arguments(parser._optionals._group_actions) + formatter.end_section() + + subparsers_actions = [ + action for action in parser._actions + if isinstance(action, argparse._SubParsersAction)] + + for subparsers_action in subparsers_actions: + # get all subparsers and print help + subparsers = subparsers_action.choices + for subaction in subparsers_action._get_subactions(): + subparser = subparsers[subaction.dest] + usage = formatter._format_actions_usage(subparser._actions, []) + usage_parent = formatter._format_actions_usage(filter( + lambda a: not (isinstance(a, HelpAction) or isinstance(a, argparse._SubParsersAction)), + parser._actions), []) + formatter.start_section("{} {} {} {}".format(formatter._prog, + usage_parent, + subaction.dest, + usage)) + formatter.add_text(subaction.help) + formatter.add_arguments(subparser._positionals._group_actions) + formatter.add_arguments(filter(lambda a: not isinstance(a, argparse._HelpAction), + subparser._optionals._group_actions)) + formatter.end_section() + + print(formatter.format_help()) + parser.exit(0) + + +def format_subcommands_help(parser: argparse.ArgumentParser): + subparsers_actions = [ + action for action in parser._actions + if isinstance(action, argparse._SubParsersAction)] + + formatter = parser._get_formatter() + formatter.add_usage(parser.usage, + parser._actions, + parser._mutually_exclusive_groups) + + formatter.start_section("Choose subcommand") + for subparsers_action in subparsers_actions: + formatter.add_argument(subparsers_action) + formatter.end_section() + + return formatter.format_help() + + +def get_default_serial_port() -> str: + if platform.system() == "Windows": + return 'COM3' + else: + return '/dev/ttyUSB0' diff --git a/mp_tool/web.py b/mp_tool/web.py new file mode 100644 index 0000000..6dcccfb --- /dev/null +++ b/mp_tool/web.py @@ -0,0 +1,117 @@ +""" +Implementation of commands that run against the websocket interface of micropython +""" +from . import Constants + +import websocket + +import tty +import termios +from threading import Thread +from sys import stdout, stdin +from copy import copy + + +def get(url: str, password: str): + raise NotImplementedError() + + +def put(url: str, password: str): + raise NotImplementedError() + + +def connect_and_auth(url, password) -> websocket.WebSocket: + ws = websocket.create_connection(url, timeout=0.5) + frame = ws.recv_frame() + + if frame.data != b"Password: ": + raise Exception("Unexpected response: {}".format(frame.data)) + stdout.write(frame.data.decode('utf-8')) + ws.send(password + "\n") + + frame = ws.recv_frame() + if frame.data.strip() != b"WebREPL connected\r\n>>>": + raise Exception("Unexpected response: {}".format(frame.data)) + return ws + + +def eval(url: str, password: str, code: str): + ws = connect_and_auth(url, password) + ws.send(Constants.ENTER_REPL_MODE) + stdout.write(read_until_eval_or_timeout(ws)) + ws.send(code + "\r\n") + + result = read_until_eval_or_timeout(ws) + stdout.write(result[:-6]) + print("") + ws.close() + + +def read_until_eval_or_timeout(ws: websocket.WebSocket): + buf = "" + while not buf.endswith("\r\n>>> "): + buf += ws.recv() + return buf + + +class Reader(Thread): + def __init__(self, ws): + Thread.__init__(self) + self.ws = ws + self.stop = False + + def run(self): + while True: + try: + frame = self.ws.recv_frame() + stdout.write(frame.data.decode('utf-8')) + stdout.flush() + except Exception as e: + if self.stop: + break + + +def set_tty_raw_mode(fd): + saved_mode = termios.tcgetattr(fd) + + new_mode = copy(saved_mode) + new_mode[tty.LFLAG] = new_mode[tty.LFLAG] & ~termios.ECHO + new_mode[tty.CC][tty.VMIN] = 1 + new_mode[tty.CC][tty.VTIME] = 0 + set_tty_mode(fd, new_mode) + + return saved_mode + + +def set_tty_mode(fd, mode): + termios.tcsetattr(fd, termios.TCSAFLUSH, mode) + + +def repl(url: str, password: str): + print("Type ^[ CTRL-] or CTRL-D to quit") + ws = connect_and_auth(url, password) + ws.send("\x02") + + reader = Reader(ws) + reader.start() + + saved_tty_mode = set_tty_raw_mode(stdin.fileno()) + try: + tty.setraw(stdin.fileno()) + while True: + try: + in_char = stdin.read(1) + if in_char == "\x1d" or in_char == "\x04": # escape char 'Ctrl-]' or CTRL-C + break + else: + ws.send(in_char) + except KeyboardInterrupt: + break + except Exception as _: + pass + + reader.stop = True + ws.close() + + set_tty_mode(stdin.fileno(), saved_tty_mode) + print("") |