diff options
-rwxr-xr-x | mp-tool | 135 | ||||
-rw-r--r-- | mp_tool/__init__.py | 117 | ||||
-rw-r--r-- | mp_tool/serial.py | 135 | ||||
-rw-r--r-- | mp_tool/util.py | 65 | ||||
-rw-r--r-- | mp_tool/web.py | 117 | ||||
-rw-r--r-- | setup.py | 2 |
6 files changed, 383 insertions, 188 deletions
@@ -1,88 +1,69 @@ #!/usr/bin/env python3 -import mp_tool - -import argparse -import sys - - -class HelpAction(argparse._HelpAction): - def __call__(self, parser, namespace, values, option_string=None): - formatter = parser._get_formatter() - formatter.add_usage(parser.usage, - parser._actions, - parser._mutually_exclusive_groups) - - formatter.start_section(parser._optionals.title) - formatter.add_text(parser._optionals.description) - formatter.add_arguments(parser._optionals._group_actions) - formatter.end_section() - - subparsers_actions = [ - action for action in parser._actions - if isinstance(action, argparse._SubParsersAction)] - - for subparsers_action in subparsers_actions: - # get all subparsers and print help - subparsers = subparsers_action.choices - for subaction in subparsers_action._get_subactions(): - subparser = subparsers[subaction.dest] - usage = formatter._format_actions_usage(subparser._actions, []) - usage_parent = formatter._format_actions_usage(filter( - lambda a: not (isinstance(a, HelpAction) or isinstance(a, argparse._SubParsersAction)), - parser._actions), []) - formatter.start_section("{} {} {} {}".format(formatter._prog, - usage_parent, - subaction.dest, - usage)) - formatter.add_text(subaction.help) - formatter.add_arguments(subparser._positionals._group_actions) - formatter.add_arguments(filter(lambda a: not isinstance(a, argparse._HelpAction), - subparser._optionals._group_actions)) - formatter.end_section() - - print(formatter.format_help()) - parser.exit(0) +from mp_tool.util import HelpAction, format_subcommands_help, get_default_serial_port +from argparse import ArgumentParser +from importlib import import_module +from sys import argv +import traceback if __name__ == '__main__': - parser = argparse.ArgumentParser(add_help=False) + default_port = get_default_serial_port() + + parser = ArgumentParser(add_help=False) parser.add_argument("--help", action=HelpAction, help="Display full help") - parser.add_argument("--password", action='store', nargs='?') - parser.add_argument("WEBSOCKET", action='store', nargs=1, - help="Websocket address (e.g. ws://ESP_E1278E:8266)") subparsers = parser.add_subparsers() - parser_eval = subparsers.add_parser("eval", help="Eval python code remotely") - parser_eval.set_defaults(func=mp_tool.do_eval) - parser_eval.add_argument("CODE", action='store', nargs=1, help="Code to execute") - - parser_repl = subparsers.add_parser("repl", help="Start interactive REPL") - parser_repl.set_defaults(func=mp_tool.do_repl) - - parser_put = subparsers.add_parser("put", help="Send file to remote") - parser_put.set_defaults(func=mp_tool.do_put) - - parser_get = subparsers.add_parser("get", help="Load file from remote") - parser_put.set_defaults(func=mp_tool.do_get) - - args = parser.parse_args(sys.argv[1:]) + parser_eval = subparsers.add_parser("eval", help="Eval python code over websocket") + parser_eval.set_defaults(func=lambda a: import_module('mp_tool.web').eval(a.WEBSOCKET, a.password, a.CODE)) + parser_eval.add_argument("WEBSOCKET", help="Websocket address (e.g. ws://ESP_E1278E:8266)") + parser_eval.add_argument("CODE", help="Code to execute") + parser_eval.add_argument("--password") + + parser_eval_serial = subparsers.add_parser("eval-serial", help="Eval python code over serial") + parser_eval_serial.set_defaults(func=lambda a: import_module('mp_tool.serial').eval(a.port, a.CODE)) + parser_eval_serial.add_argument("--port", default=default_port, help="Serial port, default " + default_port) + parser_eval_serial.add_argument("CODE", help="Code to execute") + + parser_repl = subparsers.add_parser("repl", help="Interactive REPL over websocket") + parser_repl.set_defaults(func=lambda a: import_module('mp_tool.web').repl(a.WEBSOCKET, a.password)) + parser_repl.add_argument("WEBSOCKET", help="Websocket address (e.g. ws://ESP_E1278E:8266)") + parser_repl.add_argument("--password") + + parser_put = subparsers.add_parser("put", help="Send file over websocket") + parser_put.set_defaults(func=lambda a: import_module('mp_tool.web').put(a.WEBSOCKET, a.password, a.CODE)) + parser_put.add_argument("WEBSOCKET", help="Websocket address (e.g. ws://ESP_E1278E:8266)") + parser_put.add_argument("--password") + + parser_put_serial = subparsers.add_parser("put-serial", help="Send file over serial") + parser_put_serial.set_defaults(func=lambda a: import_module('mp_tool.serial').put(a.port, a.FILE, a.TARGET)) + parser_put_serial.add_argument("--port", default=default_port, help="Serial port, default " + default_port) + parser_put_serial.add_argument("FILE", help="Filename") + parser_put_serial.add_argument("TARGET", nargs='?', help="remote target path/filename") + + parser_get = subparsers.add_parser("get", help="Load file over websocket") + parser_get.set_defaults(func=lambda a: import_module('mp_tool.web').get(a.WEBSOCKET, a.password)) + parser_get.add_argument("WEBSOCKET", help="Websocket address (e.g. ws://ESP_E1278E:8266)") + parser_get.add_argument("--password") + + parser_get_serial = subparsers.add_parser("get-serial", help="Get file over serial") + parser_get_serial.set_defaults(func=lambda a: import_module('mp_tool.serial').get(a.port, a.FILE, a.TARGET)) + parser_get_serial.add_argument("--port", default=default_port, help="Serial port, default " + default_port) + parser_get_serial.add_argument("FILE", help="Filename") + parser_get_serial.add_argument("TARGET", nargs='?', help="local target path/filename") + + parser_ls_serial = subparsers.add_parser("ls-serial", help="List files over serial") + parser_ls_serial.set_defaults(func=lambda a: import_module('mp_tool.serial').ls(a.port, a.DIR)) + parser_ls_serial.add_argument("--port", default=default_port, help="Serial port, default " + default_port) + parser_ls_serial.add_argument("DIR", default=".", nargs='?', help="List files in this directory (Default '.')") + + args = parser.parse_args(argv[1:]) if 'func' in args: - args.func(args) + try: + args.func(args) + except Exception as e: + print("Call failed with arguments: " + str(args)) + parser.exit(1, traceback.format_exc()) else: - subparsers_actions = [ - action for action in parser._actions - if isinstance(action, argparse._SubParsersAction)] - - formatter = parser._get_formatter() - formatter.add_usage(parser.usage, - parser._actions, - parser._mutually_exclusive_groups) - - formatter.start_section("Choose subcommand") - for subparsers_action in subparsers_actions: - formatter.add_argument(subparsers_action) - formatter.end_section() - - print(formatter.format_help()) + print(format_subcommands_help(parser)) parser.exit(0) diff --git a/mp_tool/__init__.py b/mp_tool/__init__.py index 481ba39..77b8820 100644 --- a/mp_tool/__init__.py +++ b/mp_tool/__init__.py @@ -1,112 +1,9 @@ -import websocket -import tty -import termios -from threading import Thread -from sys import stdout, stdin -from copy import copy +class Constants: + ENTER_RAW_MODE = b'\x01' # CTRL-A + ENTER_REPL_MODE = b'\x02' # CTRL-B + INTERRUPT = b'\x03' # CTRL-C + CTRL_D = b'\x04' # CTRL-D + MARKER_BEGIN = b'>>>>>>>>>>' + MARKER_END = b'<<<<<<<<<<' - -def connect_and_auth(url, password) -> websocket.WebSocket: - ws = websocket.create_connection(url, timeout=0.5) - frame = ws.recv_frame() - - if frame.data != b"Password: ": - raise Exception("Unexpected response: {}".format(frame.data)) - stdout.write(frame.data.decode('utf-8')) - ws.send(password + "\n") - - frame = ws.recv_frame() - if frame.data.strip() != b"WebREPL connected\r\n>>>": - raise Exception("Unexpected response: {}".format(frame.data)) - return ws - - -def do_eval(args): - ws = connect_and_auth(args.WEBSOCKET[0], args.password) - ws.send("\x02") - stdout.write(read_until_eval_or_timeout(ws)) - ws.send(args.CODE[0] + "\r\n") - - result = read_until_eval_or_timeout(ws) - stdout.write(result[:-6]) - print("") - ws.close() - - -def read_until_eval_or_timeout(ws: websocket.WebSocket): - buf = "" - while not buf.endswith("\r\n>>> "): - buf += ws.recv() - return buf - - -class Reader(Thread): - def __init__(self, ws): - Thread.__init__(self) - self.ws = ws - self.stop = False - - def run(self): - while True: - try: - frame = self.ws.recv_frame() - stdout.write(frame.data.decode('utf-8')) - stdout.flush() - except Exception as e: - if self.stop: - break - - -def set_tty_raw_mode(fd): - saved_mode = termios.tcgetattr(fd) - - new_mode = copy(saved_mode) - new_mode[tty.LFLAG] = new_mode[tty.LFLAG] & ~termios.ECHO - new_mode[tty.CC][tty.VMIN] = 1 - new_mode[tty.CC][tty.VTIME] = 0 - set_tty_mode(fd, new_mode) - - return saved_mode - - -def set_tty_mode(fd, mode): - termios.tcsetattr(fd, termios.TCSAFLUSH, mode) - - -def do_repl(args): - print("Type ^[ CTRL-] or CTRL-D to quit") - ws = connect_and_auth(args.WEBSOCKET[0], args.password) - ws.send("\x02") - - reader = Reader(ws) - reader.start() - - saved_tty_mode = set_tty_raw_mode(stdin.fileno()) - try: - tty.setraw(stdin.fileno()) - while True: - try: - in_char = stdin.read(1) - if in_char == "\x1d" or in_char == "\x04": # escape char 'Ctrl-]' or CTRL-C - break - else: - ws.send(in_char) - except KeyboardInterrupt: - break - except Exception as _: - pass - - reader.stop = True - ws.close() - - set_tty_mode(stdin.fileno(), saved_tty_mode) - print("") - - -def do_put(args): - raise NotImplementedError() - - -def do_get(args): - raise NotImplementedError() diff --git a/mp_tool/serial.py b/mp_tool/serial.py new file mode 100644 index 0000000..c1838f2 --- /dev/null +++ b/mp_tool/serial.py @@ -0,0 +1,135 @@ +""" +Implementation of commands that run against the serial interface of micropython +""" +from . import Constants + +try: + import serial +except ImportError: + print("Could not find pyserial library") + raise + +import os + + +def eval(port: str, code: str): + with serial.Serial(port=port, baudrate=115200) as fh: + fh.write(Constants.INTERRUPT) + fh.write(Constants.ENTER_REPL_MODE) + _, _ = fh.readline(), fh.readline() + print(fh.readline().decode('utf-8').strip()) + print(fh.readline().decode('utf-8').strip()) + + fh.write(code.encode('utf-8') + b"\r\n") + fh.flush() + + buf = fh.read(1) + i = 0 + while not buf.endswith(b"\r\n>>> "): + buf += fh.read(1) + i += 1 + if i > 300: + raise Exception("Exceed number of bytes while seeking for end of output") + print(buf.decode('utf-8')[:-6]) + + +def ls(port: str, directory: str): + with serial.Serial(port=port, baudrate=115200) as fh: + fh.write(Constants.INTERRUPT) # ctrl-c interrupt + fh.write(Constants.ENTER_RAW_MODE) # ctrl-a raw-mode + fh.write(b"import os\r\n") + fh.write(b"print()\r\n") + fh.write(b"print('" + Constants.MARKER_BEGIN + b"')\r\n") + fh.write(b"try:") + fh.write(b" print('\\n'.join(os.listdir(" + repr(directory).encode('utf-8') + b")))\r\n") + fh.write(b"except OSError as e:\r\n") + fh.write(b" print(str(e))\r\n") + fh.write(b"print('" + Constants.MARKER_END + b"')\r\n") + fh.write(b"print()\r\n") + fh.write(Constants.CTRL_D) + fh.flush() + fh.reset_input_buffer() + + _line_ok = fh.readline() + if fh.readline().strip() != Constants.MARKER_BEGIN: + raise Exception('Failed to find begin marker') + + line = fh.readline() + while line.strip() != Constants.MARKER_END: + print(line.strip().decode('utf-8')) + line = fh.readline() + + fh.write(Constants.ENTER_REPL_MODE) + + +def get(port: str, remote_filename: str, target: str): + if target: + if os.path.isdir(target): + local_filename = os.path.join(target, os.path.basename(remote_filename)) + else: + local_filename = target + else: + local_filename = os.path.basename(remote_filename) + + with serial.Serial(port=port, baudrate=115200) as fh: + fh.write(Constants.INTERRUPT) # ctrl-c interrupt + fh.write(Constants.ENTER_RAW_MODE) # ctrl-a raw-mode + fh.write(b"import sys\r\n") + fh.write(b"import os\r\n") + fh.write(b"print()\r\n") + fh.write(b"print('" + Constants.MARKER_BEGIN + b"')\r\n") + fh.write(b"try:\r\n") + fh.write(" print(os.stat({})[6])\r\n".format(repr(remote_filename)).encode('utf-8')) + fh.write(b"except OSError:\r\n") + fh.write(b" print('-1')\r\n") + fh.write(b"print('" + Constants.MARKER_END + b"')\r\n") + fh.write("with open({}, 'rb') as fh:\r\n".format(repr(remote_filename)).encode('utf-8')) + # use sys.stdout.buffer to avoid cr to crlf conversion + fh.write(b" sys.stdout.buffer.write(fh.read())\r\n") + fh.write(b"print()\r\n") + fh.write(Constants.CTRL_D) + fh.flush() + fh.reset_input_buffer() + + _line_ok = fh.readline() + + if fh.readline().strip() != Constants.MARKER_BEGIN: + raise Exception('Failed to find begin marker') + + length = int(fh.readline().strip().decode('utf-8')) + if fh.readline().strip() != Constants.MARKER_END: + raise Exception("Failed to read end marker value") + + if length == -1: + raise Exception("Failed to read file {}".format(remote_filename)) + + print("File length: {}".format(length)) + + with open(local_filename, 'wb') as fh_out: + bytes_processed = fh_out.write(fh.read(length)) + print("{} bytes written to {}".format(bytes_processed, local_filename)) + + fh.write(Constants.ENTER_REPL_MODE) + + +def put(port: str, local_filename: str, target: str): + if target: + remote_filename = os.path.join(target, local_filename) + else: + remote_filename = os.path.basename(local_filename) + + with open(local_filename, 'br') as file_fh: + data = file_fh.read() + + with serial.Serial(port=port, baudrate=115200) as fh: + fh.write(Constants.INTERRUPT) # ctrl-c interrupt + fh.write(Constants.ENTER_RAW_MODE) # ctrl-a raw-mode + fh.write(b"import sys\r\n") + fh.write("with open({}, 'wb') as fh:\r\n".format(repr(remote_filename)).encode('utf-8')) + if len(data) == 0: + fh.write(b" pass\r\n") + else: + fh.write(" fh.write(sys.stdin.buffer.read({}))\r\n".format(len(data)).encode('utf-8')) + fh.write(Constants.CTRL_D) + fh.write(data) + fh.write(Constants.ENTER_REPL_MODE) diff --git a/mp_tool/util.py b/mp_tool/util.py new file mode 100644 index 0000000..7e3f610 --- /dev/null +++ b/mp_tool/util.py @@ -0,0 +1,65 @@ +import argparse +import platform + +class HelpAction(argparse._HelpAction): + def __call__(self, parser, namespace, values, option_string=None): + formatter = parser._get_formatter() + formatter.add_usage(parser.usage, + parser._actions, + parser._mutually_exclusive_groups) + + formatter.start_section(parser._optionals.title) + formatter.add_text(parser._optionals.description) + formatter.add_arguments(parser._optionals._group_actions) + formatter.end_section() + + subparsers_actions = [ + action for action in parser._actions + if isinstance(action, argparse._SubParsersAction)] + + for subparsers_action in subparsers_actions: + # get all subparsers and print help + subparsers = subparsers_action.choices + for subaction in subparsers_action._get_subactions(): + subparser = subparsers[subaction.dest] + usage = formatter._format_actions_usage(subparser._actions, []) + usage_parent = formatter._format_actions_usage(filter( + lambda a: not (isinstance(a, HelpAction) or isinstance(a, argparse._SubParsersAction)), + parser._actions), []) + formatter.start_section("{} {} {} {}".format(formatter._prog, + usage_parent, + subaction.dest, + usage)) + formatter.add_text(subaction.help) + formatter.add_arguments(subparser._positionals._group_actions) + formatter.add_arguments(filter(lambda a: not isinstance(a, argparse._HelpAction), + subparser._optionals._group_actions)) + formatter.end_section() + + print(formatter.format_help()) + parser.exit(0) + + +def format_subcommands_help(parser: argparse.ArgumentParser): + subparsers_actions = [ + action for action in parser._actions + if isinstance(action, argparse._SubParsersAction)] + + formatter = parser._get_formatter() + formatter.add_usage(parser.usage, + parser._actions, + parser._mutually_exclusive_groups) + + formatter.start_section("Choose subcommand") + for subparsers_action in subparsers_actions: + formatter.add_argument(subparsers_action) + formatter.end_section() + + return formatter.format_help() + + +def get_default_serial_port() -> str: + if platform.system() == "Windows": + return 'COM3' + else: + return '/dev/ttyUSB0' diff --git a/mp_tool/web.py b/mp_tool/web.py new file mode 100644 index 0000000..6dcccfb --- /dev/null +++ b/mp_tool/web.py @@ -0,0 +1,117 @@ +""" +Implementation of commands that run against the websocket interface of micropython +""" +from . import Constants + +import websocket + +import tty +import termios +from threading import Thread +from sys import stdout, stdin +from copy import copy + + +def get(url: str, password: str): + raise NotImplementedError() + + +def put(url: str, password: str): + raise NotImplementedError() + + +def connect_and_auth(url, password) -> websocket.WebSocket: + ws = websocket.create_connection(url, timeout=0.5) + frame = ws.recv_frame() + + if frame.data != b"Password: ": + raise Exception("Unexpected response: {}".format(frame.data)) + stdout.write(frame.data.decode('utf-8')) + ws.send(password + "\n") + + frame = ws.recv_frame() + if frame.data.strip() != b"WebREPL connected\r\n>>>": + raise Exception("Unexpected response: {}".format(frame.data)) + return ws + + +def eval(url: str, password: str, code: str): + ws = connect_and_auth(url, password) + ws.send(Constants.ENTER_REPL_MODE) + stdout.write(read_until_eval_or_timeout(ws)) + ws.send(code + "\r\n") + + result = read_until_eval_or_timeout(ws) + stdout.write(result[:-6]) + print("") + ws.close() + + +def read_until_eval_or_timeout(ws: websocket.WebSocket): + buf = "" + while not buf.endswith("\r\n>>> "): + buf += ws.recv() + return buf + + +class Reader(Thread): + def __init__(self, ws): + Thread.__init__(self) + self.ws = ws + self.stop = False + + def run(self): + while True: + try: + frame = self.ws.recv_frame() + stdout.write(frame.data.decode('utf-8')) + stdout.flush() + except Exception as e: + if self.stop: + break + + +def set_tty_raw_mode(fd): + saved_mode = termios.tcgetattr(fd) + + new_mode = copy(saved_mode) + new_mode[tty.LFLAG] = new_mode[tty.LFLAG] & ~termios.ECHO + new_mode[tty.CC][tty.VMIN] = 1 + new_mode[tty.CC][tty.VTIME] = 0 + set_tty_mode(fd, new_mode) + + return saved_mode + + +def set_tty_mode(fd, mode): + termios.tcsetattr(fd, termios.TCSAFLUSH, mode) + + +def repl(url: str, password: str): + print("Type ^[ CTRL-] or CTRL-D to quit") + ws = connect_and_auth(url, password) + ws.send("\x02") + + reader = Reader(ws) + reader.start() + + saved_tty_mode = set_tty_raw_mode(stdin.fileno()) + try: + tty.setraw(stdin.fileno()) + while True: + try: + in_char = stdin.read(1) + if in_char == "\x1d" or in_char == "\x04": # escape char 'Ctrl-]' or CTRL-C + break + else: + ws.send(in_char) + except KeyboardInterrupt: + break + except Exception as _: + pass + + reader.stop = True + ws.close() + + set_tty_mode(stdin.fileno(), saved_tty_mode) + print("") @@ -10,7 +10,7 @@ setup(name='mp-tool', packages=['mp_tool'], scripts=['mp-tool'], url='https://example.com/', - install_requires=['websocket_client==0.40.0'], + install_requires=['websocket_client==0.40.0','pyserial==3.2.1'], tests_require=[], classifiers=[ "Programming Language :: Python", |