summaryrefslogtreecommitdiff
path: root/mp_tool/serial.py
blob: c1838f2429954d07cebcacaeab6c38b9cef95902 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
"""
Implementation of commands that run against the serial interface of micropython
"""
from . import Constants

try:
    import serial
except ImportError:
    print("Could not find pyserial library")
    raise

import os


def eval(port: str, code: str):
    with serial.Serial(port=port, baudrate=115200) as fh:
        fh.write(Constants.INTERRUPT)
        fh.write(Constants.ENTER_REPL_MODE)
        _, _ = fh.readline(), fh.readline()
        print(fh.readline().decode('utf-8').strip())
        print(fh.readline().decode('utf-8').strip())

        fh.write(code.encode('utf-8') + b"\r\n")
        fh.flush()

        buf = fh.read(1)
        i = 0
        while not buf.endswith(b"\r\n>>> "):
            buf += fh.read(1)
            i += 1
            if i > 300:
                raise Exception("Exceed number of bytes while seeking for end of output")
        print(buf.decode('utf-8')[:-6])


def ls(port: str, directory: str):
    with serial.Serial(port=port, baudrate=115200) as fh:
        fh.write(Constants.INTERRUPT)  # ctrl-c interrupt
        fh.write(Constants.ENTER_RAW_MODE)  # ctrl-a raw-mode
        fh.write(b"import os\r\n")
        fh.write(b"print()\r\n")
        fh.write(b"print('" + Constants.MARKER_BEGIN + b"')\r\n")
        fh.write(b"try:")
        fh.write(b"    print('\\n'.join(os.listdir(" + repr(directory).encode('utf-8') + b")))\r\n")
        fh.write(b"except OSError as e:\r\n")
        fh.write(b"    print(str(e))\r\n")
        fh.write(b"print('" + Constants.MARKER_END + b"')\r\n")
        fh.write(b"print()\r\n")
        fh.write(Constants.CTRL_D)
        fh.flush()
        fh.reset_input_buffer()

        _line_ok = fh.readline()
        if fh.readline().strip() != Constants.MARKER_BEGIN:
            raise Exception('Failed to find begin marker')

        line = fh.readline()
        while line.strip() != Constants.MARKER_END:
            print(line.strip().decode('utf-8'))
            line = fh.readline()

        fh.write(Constants.ENTER_REPL_MODE)


def get(port: str, remote_filename: str, target: str):
    if target:
        if os.path.isdir(target):
            local_filename = os.path.join(target, os.path.basename(remote_filename))
        else:
            local_filename = target
    else:
        local_filename = os.path.basename(remote_filename)

    with serial.Serial(port=port, baudrate=115200) as fh:
        fh.write(Constants.INTERRUPT)  # ctrl-c interrupt
        fh.write(Constants.ENTER_RAW_MODE)  # ctrl-a raw-mode
        fh.write(b"import sys\r\n")
        fh.write(b"import os\r\n")
        fh.write(b"print()\r\n")
        fh.write(b"print('" + Constants.MARKER_BEGIN + b"')\r\n")
        fh.write(b"try:\r\n")
        fh.write("   print(os.stat({})[6])\r\n".format(repr(remote_filename)).encode('utf-8'))
        fh.write(b"except OSError:\r\n")
        fh.write(b"  print('-1')\r\n")
        fh.write(b"print('" + Constants.MARKER_END + b"')\r\n")
        fh.write("with open({}, 'rb') as fh:\r\n".format(repr(remote_filename)).encode('utf-8'))
        # use sys.stdout.buffer to avoid cr to crlf conversion
        fh.write(b"    sys.stdout.buffer.write(fh.read())\r\n")
        fh.write(b"print()\r\n")
        fh.write(Constants.CTRL_D)
        fh.flush()
        fh.reset_input_buffer()

        _line_ok = fh.readline()

        if fh.readline().strip() != Constants.MARKER_BEGIN:
            raise Exception('Failed to find begin marker')

        length = int(fh.readline().strip().decode('utf-8'))
        if fh.readline().strip() != Constants.MARKER_END:
            raise Exception("Failed to read end marker value")

        if length == -1:
            raise Exception("Failed to read file {}".format(remote_filename))

        print("File length: {}".format(length))

        with open(local_filename, 'wb') as fh_out:
            bytes_processed = fh_out.write(fh.read(length))
            print("{} bytes written to {}".format(bytes_processed, local_filename))

        fh.write(Constants.ENTER_REPL_MODE)


def put(port: str, local_filename: str, target: str):
    if target:
        remote_filename = os.path.join(target, local_filename)
    else:
        remote_filename = os.path.basename(local_filename)

    with open(local_filename, 'br') as file_fh:
        data = file_fh.read()

    with serial.Serial(port=port, baudrate=115200) as fh:
        fh.write(Constants.INTERRUPT)  # ctrl-c interrupt
        fh.write(Constants.ENTER_RAW_MODE)  # ctrl-a raw-mode
        fh.write(b"import sys\r\n")
        fh.write("with open({}, 'wb') as fh:\r\n".format(repr(remote_filename)).encode('utf-8'))
        if len(data) == 0:
            fh.write(b"    pass\r\n")
        else:
            fh.write("    fh.write(sys.stdin.buffer.read({}))\r\n".format(len(data)).encode('utf-8'))
        fh.write(Constants.CTRL_D)
        fh.write(data)
        fh.write(Constants.ENTER_REPL_MODE)