summaryrefslogtreecommitdiff
path: root/mp_tool
diff options
context:
space:
mode:
authorYves Fischer <yvesf-git@xapek.org>2016-12-28 23:27:59 +0100
committerYves Fischer <yvesf-git@xapek.org>2016-12-29 03:19:09 +0100
commit4e07f8306c855be768d435baac68f14a6bca547b (patch)
treece7b13e8dd6bfeeece1593d9de5cb89bbb845b08 /mp_tool
parentba2960846e52e9862a7d0aca69c33d29610276bb (diff)
downloadmp-tool-4e07f8306c855be768d435baac68f14a6bca547b.tar.gz
mp-tool-4e07f8306c855be768d435baac68f14a6bca547b.zip
Add serial communication
Diffstat (limited to 'mp_tool')
-rw-r--r--mp_tool/__init__.py117
-rw-r--r--mp_tool/serial.py135
-rw-r--r--mp_tool/util.py65
-rw-r--r--mp_tool/web.py117
4 files changed, 324 insertions, 110 deletions
diff --git a/mp_tool/__init__.py b/mp_tool/__init__.py
index 481ba39..77b8820 100644
--- a/mp_tool/__init__.py
+++ b/mp_tool/__init__.py
@@ -1,112 +1,9 @@
-import websocket
-import tty
-import termios
-from threading import Thread
-from sys import stdout, stdin
-from copy import copy
+class Constants:
+ ENTER_RAW_MODE = b'\x01' # CTRL-A
+ ENTER_REPL_MODE = b'\x02' # CTRL-B
+ INTERRUPT = b'\x03' # CTRL-C
+ CTRL_D = b'\x04' # CTRL-D
+ MARKER_BEGIN = b'>>>>>>>>>>'
+ MARKER_END = b'<<<<<<<<<<'
-
-def connect_and_auth(url, password) -> websocket.WebSocket:
- ws = websocket.create_connection(url, timeout=0.5)
- frame = ws.recv_frame()
-
- if frame.data != b"Password: ":
- raise Exception("Unexpected response: {}".format(frame.data))
- stdout.write(frame.data.decode('utf-8'))
- ws.send(password + "\n")
-
- frame = ws.recv_frame()
- if frame.data.strip() != b"WebREPL connected\r\n>>>":
- raise Exception("Unexpected response: {}".format(frame.data))
- return ws
-
-
-def do_eval(args):
- ws = connect_and_auth(args.WEBSOCKET[0], args.password)
- ws.send("\x02")
- stdout.write(read_until_eval_or_timeout(ws))
- ws.send(args.CODE[0] + "\r\n")
-
- result = read_until_eval_or_timeout(ws)
- stdout.write(result[:-6])
- print("")
- ws.close()
-
-
-def read_until_eval_or_timeout(ws: websocket.WebSocket):
- buf = ""
- while not buf.endswith("\r\n>>> "):
- buf += ws.recv()
- return buf
-
-
-class Reader(Thread):
- def __init__(self, ws):
- Thread.__init__(self)
- self.ws = ws
- self.stop = False
-
- def run(self):
- while True:
- try:
- frame = self.ws.recv_frame()
- stdout.write(frame.data.decode('utf-8'))
- stdout.flush()
- except Exception as e:
- if self.stop:
- break
-
-
-def set_tty_raw_mode(fd):
- saved_mode = termios.tcgetattr(fd)
-
- new_mode = copy(saved_mode)
- new_mode[tty.LFLAG] = new_mode[tty.LFLAG] & ~termios.ECHO
- new_mode[tty.CC][tty.VMIN] = 1
- new_mode[tty.CC][tty.VTIME] = 0
- set_tty_mode(fd, new_mode)
-
- return saved_mode
-
-
-def set_tty_mode(fd, mode):
- termios.tcsetattr(fd, termios.TCSAFLUSH, mode)
-
-
-def do_repl(args):
- print("Type ^[ CTRL-] or CTRL-D to quit")
- ws = connect_and_auth(args.WEBSOCKET[0], args.password)
- ws.send("\x02")
-
- reader = Reader(ws)
- reader.start()
-
- saved_tty_mode = set_tty_raw_mode(stdin.fileno())
- try:
- tty.setraw(stdin.fileno())
- while True:
- try:
- in_char = stdin.read(1)
- if in_char == "\x1d" or in_char == "\x04": # escape char 'Ctrl-]' or CTRL-C
- break
- else:
- ws.send(in_char)
- except KeyboardInterrupt:
- break
- except Exception as _:
- pass
-
- reader.stop = True
- ws.close()
-
- set_tty_mode(stdin.fileno(), saved_tty_mode)
- print("")
-
-
-def do_put(args):
- raise NotImplementedError()
-
-
-def do_get(args):
- raise NotImplementedError()
diff --git a/mp_tool/serial.py b/mp_tool/serial.py
new file mode 100644
index 0000000..c1838f2
--- /dev/null
+++ b/mp_tool/serial.py
@@ -0,0 +1,135 @@
+"""
+Implementation of commands that run against the serial interface of micropython
+"""
+from . import Constants
+
+try:
+ import serial
+except ImportError:
+ print("Could not find pyserial library")
+ raise
+
+import os
+
+
+def eval(port: str, code: str):
+ with serial.Serial(port=port, baudrate=115200) as fh:
+ fh.write(Constants.INTERRUPT)
+ fh.write(Constants.ENTER_REPL_MODE)
+ _, _ = fh.readline(), fh.readline()
+ print(fh.readline().decode('utf-8').strip())
+ print(fh.readline().decode('utf-8').strip())
+
+ fh.write(code.encode('utf-8') + b"\r\n")
+ fh.flush()
+
+ buf = fh.read(1)
+ i = 0
+ while not buf.endswith(b"\r\n>>> "):
+ buf += fh.read(1)
+ i += 1
+ if i > 300:
+ raise Exception("Exceed number of bytes while seeking for end of output")
+ print(buf.decode('utf-8')[:-6])
+
+
+def ls(port: str, directory: str):
+ with serial.Serial(port=port, baudrate=115200) as fh:
+ fh.write(Constants.INTERRUPT) # ctrl-c interrupt
+ fh.write(Constants.ENTER_RAW_MODE) # ctrl-a raw-mode
+ fh.write(b"import os\r\n")
+ fh.write(b"print()\r\n")
+ fh.write(b"print('" + Constants.MARKER_BEGIN + b"')\r\n")
+ fh.write(b"try:")
+ fh.write(b" print('\\n'.join(os.listdir(" + repr(directory).encode('utf-8') + b")))\r\n")
+ fh.write(b"except OSError as e:\r\n")
+ fh.write(b" print(str(e))\r\n")
+ fh.write(b"print('" + Constants.MARKER_END + b"')\r\n")
+ fh.write(b"print()\r\n")
+ fh.write(Constants.CTRL_D)
+ fh.flush()
+ fh.reset_input_buffer()
+
+ _line_ok = fh.readline()
+ if fh.readline().strip() != Constants.MARKER_BEGIN:
+ raise Exception('Failed to find begin marker')
+
+ line = fh.readline()
+ while line.strip() != Constants.MARKER_END:
+ print(line.strip().decode('utf-8'))
+ line = fh.readline()
+
+ fh.write(Constants.ENTER_REPL_MODE)
+
+
+def get(port: str, remote_filename: str, target: str):
+ if target:
+ if os.path.isdir(target):
+ local_filename = os.path.join(target, os.path.basename(remote_filename))
+ else:
+ local_filename = target
+ else:
+ local_filename = os.path.basename(remote_filename)
+
+ with serial.Serial(port=port, baudrate=115200) as fh:
+ fh.write(Constants.INTERRUPT) # ctrl-c interrupt
+ fh.write(Constants.ENTER_RAW_MODE) # ctrl-a raw-mode
+ fh.write(b"import sys\r\n")
+ fh.write(b"import os\r\n")
+ fh.write(b"print()\r\n")
+ fh.write(b"print('" + Constants.MARKER_BEGIN + b"')\r\n")
+ fh.write(b"try:\r\n")
+ fh.write(" print(os.stat({})[6])\r\n".format(repr(remote_filename)).encode('utf-8'))
+ fh.write(b"except OSError:\r\n")
+ fh.write(b" print('-1')\r\n")
+ fh.write(b"print('" + Constants.MARKER_END + b"')\r\n")
+ fh.write("with open({}, 'rb') as fh:\r\n".format(repr(remote_filename)).encode('utf-8'))
+ # use sys.stdout.buffer to avoid cr to crlf conversion
+ fh.write(b" sys.stdout.buffer.write(fh.read())\r\n")
+ fh.write(b"print()\r\n")
+ fh.write(Constants.CTRL_D)
+ fh.flush()
+ fh.reset_input_buffer()
+
+ _line_ok = fh.readline()
+
+ if fh.readline().strip() != Constants.MARKER_BEGIN:
+ raise Exception('Failed to find begin marker')
+
+ length = int(fh.readline().strip().decode('utf-8'))
+ if fh.readline().strip() != Constants.MARKER_END:
+ raise Exception("Failed to read end marker value")
+
+ if length == -1:
+ raise Exception("Failed to read file {}".format(remote_filename))
+
+ print("File length: {}".format(length))
+
+ with open(local_filename, 'wb') as fh_out:
+ bytes_processed = fh_out.write(fh.read(length))
+ print("{} bytes written to {}".format(bytes_processed, local_filename))
+
+ fh.write(Constants.ENTER_REPL_MODE)
+
+
+def put(port: str, local_filename: str, target: str):
+ if target:
+ remote_filename = os.path.join(target, local_filename)
+ else:
+ remote_filename = os.path.basename(local_filename)
+
+ with open(local_filename, 'br') as file_fh:
+ data = file_fh.read()
+
+ with serial.Serial(port=port, baudrate=115200) as fh:
+ fh.write(Constants.INTERRUPT) # ctrl-c interrupt
+ fh.write(Constants.ENTER_RAW_MODE) # ctrl-a raw-mode
+ fh.write(b"import sys\r\n")
+ fh.write("with open({}, 'wb') as fh:\r\n".format(repr(remote_filename)).encode('utf-8'))
+ if len(data) == 0:
+ fh.write(b" pass\r\n")
+ else:
+ fh.write(" fh.write(sys.stdin.buffer.read({}))\r\n".format(len(data)).encode('utf-8'))
+ fh.write(Constants.CTRL_D)
+ fh.write(data)
+ fh.write(Constants.ENTER_REPL_MODE)
diff --git a/mp_tool/util.py b/mp_tool/util.py
new file mode 100644
index 0000000..7e3f610
--- /dev/null
+++ b/mp_tool/util.py
@@ -0,0 +1,65 @@
+import argparse
+import platform
+
+class HelpAction(argparse._HelpAction):
+ def __call__(self, parser, namespace, values, option_string=None):
+ formatter = parser._get_formatter()
+ formatter.add_usage(parser.usage,
+ parser._actions,
+ parser._mutually_exclusive_groups)
+
+ formatter.start_section(parser._optionals.title)
+ formatter.add_text(parser._optionals.description)
+ formatter.add_arguments(parser._optionals._group_actions)
+ formatter.end_section()
+
+ subparsers_actions = [
+ action for action in parser._actions
+ if isinstance(action, argparse._SubParsersAction)]
+
+ for subparsers_action in subparsers_actions:
+ # get all subparsers and print help
+ subparsers = subparsers_action.choices
+ for subaction in subparsers_action._get_subactions():
+ subparser = subparsers[subaction.dest]
+ usage = formatter._format_actions_usage(subparser._actions, [])
+ usage_parent = formatter._format_actions_usage(filter(
+ lambda a: not (isinstance(a, HelpAction) or isinstance(a, argparse._SubParsersAction)),
+ parser._actions), [])
+ formatter.start_section("{} {} {} {}".format(formatter._prog,
+ usage_parent,
+ subaction.dest,
+ usage))
+ formatter.add_text(subaction.help)
+ formatter.add_arguments(subparser._positionals._group_actions)
+ formatter.add_arguments(filter(lambda a: not isinstance(a, argparse._HelpAction),
+ subparser._optionals._group_actions))
+ formatter.end_section()
+
+ print(formatter.format_help())
+ parser.exit(0)
+
+
+def format_subcommands_help(parser: argparse.ArgumentParser):
+ subparsers_actions = [
+ action for action in parser._actions
+ if isinstance(action, argparse._SubParsersAction)]
+
+ formatter = parser._get_formatter()
+ formatter.add_usage(parser.usage,
+ parser._actions,
+ parser._mutually_exclusive_groups)
+
+ formatter.start_section("Choose subcommand")
+ for subparsers_action in subparsers_actions:
+ formatter.add_argument(subparsers_action)
+ formatter.end_section()
+
+ return formatter.format_help()
+
+
+def get_default_serial_port() -> str:
+ if platform.system() == "Windows":
+ return 'COM3'
+ else:
+ return '/dev/ttyUSB0'
diff --git a/mp_tool/web.py b/mp_tool/web.py
new file mode 100644
index 0000000..6dcccfb
--- /dev/null
+++ b/mp_tool/web.py
@@ -0,0 +1,117 @@
+"""
+Implementation of commands that run against the websocket interface of micropython
+"""
+from . import Constants
+
+import websocket
+
+import tty
+import termios
+from threading import Thread
+from sys import stdout, stdin
+from copy import copy
+
+
+def get(url: str, password: str):
+ raise NotImplementedError()
+
+
+def put(url: str, password: str):
+ raise NotImplementedError()
+
+
+def connect_and_auth(url, password) -> websocket.WebSocket:
+ ws = websocket.create_connection(url, timeout=0.5)
+ frame = ws.recv_frame()
+
+ if frame.data != b"Password: ":
+ raise Exception("Unexpected response: {}".format(frame.data))
+ stdout.write(frame.data.decode('utf-8'))
+ ws.send(password + "\n")
+
+ frame = ws.recv_frame()
+ if frame.data.strip() != b"WebREPL connected\r\n>>>":
+ raise Exception("Unexpected response: {}".format(frame.data))
+ return ws
+
+
+def eval(url: str, password: str, code: str):
+ ws = connect_and_auth(url, password)
+ ws.send(Constants.ENTER_REPL_MODE)
+ stdout.write(read_until_eval_or_timeout(ws))
+ ws.send(code + "\r\n")
+
+ result = read_until_eval_or_timeout(ws)
+ stdout.write(result[:-6])
+ print("")
+ ws.close()
+
+
+def read_until_eval_or_timeout(ws: websocket.WebSocket):
+ buf = ""
+ while not buf.endswith("\r\n>>> "):
+ buf += ws.recv()
+ return buf
+
+
+class Reader(Thread):
+ def __init__(self, ws):
+ Thread.__init__(self)
+ self.ws = ws
+ self.stop = False
+
+ def run(self):
+ while True:
+ try:
+ frame = self.ws.recv_frame()
+ stdout.write(frame.data.decode('utf-8'))
+ stdout.flush()
+ except Exception as e:
+ if self.stop:
+ break
+
+
+def set_tty_raw_mode(fd):
+ saved_mode = termios.tcgetattr(fd)
+
+ new_mode = copy(saved_mode)
+ new_mode[tty.LFLAG] = new_mode[tty.LFLAG] & ~termios.ECHO
+ new_mode[tty.CC][tty.VMIN] = 1
+ new_mode[tty.CC][tty.VTIME] = 0
+ set_tty_mode(fd, new_mode)
+
+ return saved_mode
+
+
+def set_tty_mode(fd, mode):
+ termios.tcsetattr(fd, termios.TCSAFLUSH, mode)
+
+
+def repl(url: str, password: str):
+ print("Type ^[ CTRL-] or CTRL-D to quit")
+ ws = connect_and_auth(url, password)
+ ws.send("\x02")
+
+ reader = Reader(ws)
+ reader.start()
+
+ saved_tty_mode = set_tty_raw_mode(stdin.fileno())
+ try:
+ tty.setraw(stdin.fileno())
+ while True:
+ try:
+ in_char = stdin.read(1)
+ if in_char == "\x1d" or in_char == "\x04": # escape char 'Ctrl-]' or CTRL-C
+ break
+ else:
+ ws.send(in_char)
+ except KeyboardInterrupt:
+ break
+ except Exception as _:
+ pass
+
+ reader.stop = True
+ ws.close()
+
+ set_tty_mode(stdin.fileno(), saved_tty_mode)
+ print("")