Initial commit
This commit is contained in:
0
deps/serial/urlhandler/__init__.py
vendored
Normal file
0
deps/serial/urlhandler/__init__.py
vendored
Normal file
57
deps/serial/urlhandler/protocol_alt.py
vendored
Normal file
57
deps/serial/urlhandler/protocol_alt.py
vendored
Normal file
@@ -0,0 +1,57 @@
|
||||
#! python
|
||||
#
|
||||
# This module implements a special URL handler that allows selecting an
|
||||
# alternate implementation provided by some backends.
|
||||
#
|
||||
# This file is part of pySerial. https://github.com/pyserial/pyserial
|
||||
# (C) 2015 Chris Liechti <cliechti@gmx.net>
|
||||
#
|
||||
# SPDX-License-Identifier: BSD-3-Clause
|
||||
#
|
||||
# URL format: alt://port[?option[=value][&option[=value]]]
|
||||
# options:
|
||||
# - class=X used class named X instead of Serial
|
||||
#
|
||||
# example:
|
||||
# use poll based implementation on Posix (Linux):
|
||||
# python -m serial.tools.miniterm alt:///dev/ttyUSB0?class=PosixPollSerial
|
||||
|
||||
from __future__ import absolute_import
|
||||
|
||||
try:
|
||||
import urlparse
|
||||
except ImportError:
|
||||
import urllib.parse as urlparse
|
||||
|
||||
import serial
|
||||
|
||||
|
||||
def serial_class_for_url(url):
|
||||
"""extract host and port from an URL string"""
|
||||
parts = urlparse.urlsplit(url)
|
||||
if parts.scheme != 'alt':
|
||||
raise serial.SerialException(
|
||||
'expected a string in the form "alt://port[?option[=value][&option[=value]]]": '
|
||||
'not starting with alt:// ({!r})'.format(parts.scheme))
|
||||
class_name = 'Serial'
|
||||
try:
|
||||
for option, values in urlparse.parse_qs(parts.query, True).items():
|
||||
if option == 'class':
|
||||
class_name = values[0]
|
||||
else:
|
||||
raise ValueError('unknown option: {!r}'.format(option))
|
||||
except ValueError as e:
|
||||
raise serial.SerialException(
|
||||
'expected a string in the form '
|
||||
'"alt://port[?option[=value][&option[=value]]]": {!r}'.format(e))
|
||||
if not hasattr(serial, class_name):
|
||||
raise ValueError('unknown class: {!r}'.format(class_name))
|
||||
cls = getattr(serial, class_name)
|
||||
if not issubclass(cls, serial.Serial):
|
||||
raise ValueError('class {!r} is not an instance of Serial'.format(class_name))
|
||||
return (''.join([parts.netloc, parts.path]), cls)
|
||||
|
||||
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
|
||||
if __name__ == '__main__':
|
||||
s = serial.serial_for_url('alt:///dev/ttyS0?class=PosixPollSerial')
|
||||
print(s)
|
||||
258
deps/serial/urlhandler/protocol_cp2110.py
vendored
Normal file
258
deps/serial/urlhandler/protocol_cp2110.py
vendored
Normal file
@@ -0,0 +1,258 @@
|
||||
#! python
|
||||
#
|
||||
# Backend for Silicon Labs CP2110/4 HID-to-UART devices.
|
||||
#
|
||||
# This file is part of pySerial. https://github.com/pyserial/pyserial
|
||||
# (C) 2001-2015 Chris Liechti <cliechti@gmx.net>
|
||||
# (C) 2019 Google LLC
|
||||
#
|
||||
# SPDX-License-Identifier: BSD-3-Clause
|
||||
|
||||
# This backend implements support for HID-to-UART devices manufactured
|
||||
# by Silicon Labs and marketed as CP2110 and CP2114. The
|
||||
# implementation is (mostly) OS-independent and in userland. It relies
|
||||
# on cython-hidapi (https://github.com/trezor/cython-hidapi).
|
||||
|
||||
# The HID-to-UART protocol implemented by CP2110/4 is described in the
|
||||
# AN434 document from Silicon Labs:
|
||||
# https://www.silabs.com/documents/public/application-notes/AN434-CP2110-4-Interface-Specification.pdf
|
||||
|
||||
# TODO items:
|
||||
|
||||
# - rtscts support is configured for hardware flow control, but the
|
||||
# signaling is missing (AN434 suggests this is done through GPIO).
|
||||
# - Cancelling reads and writes is not supported.
|
||||
# - Baudrate validation is not implemented, as it depends on model and configuration.
|
||||
|
||||
import struct
|
||||
import threading
|
||||
|
||||
try:
|
||||
import urlparse
|
||||
except ImportError:
|
||||
import urllib.parse as urlparse
|
||||
|
||||
try:
|
||||
import Queue
|
||||
except ImportError:
|
||||
import queue as Queue
|
||||
|
||||
import hid # hidapi
|
||||
|
||||
import serial
|
||||
from serial.serialutil import SerialBase, SerialException, PortNotOpenError, to_bytes, Timeout
|
||||
|
||||
|
||||
# Report IDs and related constant
|
||||
_REPORT_GETSET_UART_ENABLE = 0x41
|
||||
_DISABLE_UART = 0x00
|
||||
_ENABLE_UART = 0x01
|
||||
|
||||
_REPORT_SET_PURGE_FIFOS = 0x43
|
||||
_PURGE_TX_FIFO = 0x01
|
||||
_PURGE_RX_FIFO = 0x02
|
||||
|
||||
_REPORT_GETSET_UART_CONFIG = 0x50
|
||||
|
||||
_REPORT_SET_TRANSMIT_LINE_BREAK = 0x51
|
||||
_REPORT_SET_STOP_LINE_BREAK = 0x52
|
||||
|
||||
|
||||
class Serial(SerialBase):
|
||||
# This is not quite correct. AN343 specifies that the minimum
|
||||
# baudrate is different between CP2110 and CP2114, and it's halved
|
||||
# when using non-8-bit symbols.
|
||||
BAUDRATES = (300, 375, 600, 1200, 1800, 2400, 4800, 9600, 19200,
|
||||
38400, 57600, 115200, 230400, 460800, 500000, 576000,
|
||||
921600, 1000000)
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
self._hid_handle = None
|
||||
self._read_buffer = None
|
||||
self._thread = None
|
||||
super(Serial, self).__init__(*args, **kwargs)
|
||||
|
||||
def open(self):
|
||||
if self._port is None:
|
||||
raise SerialException("Port must be configured before it can be used.")
|
||||
if self.is_open:
|
||||
raise SerialException("Port is already open.")
|
||||
|
||||
self._read_buffer = Queue.Queue()
|
||||
|
||||
self._hid_handle = hid.device()
|
||||
try:
|
||||
portpath = self.from_url(self.portstr)
|
||||
self._hid_handle.open_path(portpath)
|
||||
except OSError as msg:
|
||||
raise SerialException(msg.errno, "could not open port {}: {}".format(self._port, msg))
|
||||
|
||||
try:
|
||||
self._reconfigure_port()
|
||||
except:
|
||||
try:
|
||||
self._hid_handle.close()
|
||||
except:
|
||||
pass
|
||||
self._hid_handle = None
|
||||
raise
|
||||
else:
|
||||
self.is_open = True
|
||||
self._thread = threading.Thread(target=self._hid_read_loop)
|
||||
self._thread.setDaemon(True)
|
||||
self._thread.setName('pySerial CP2110 reader thread for {}'.format(self._port))
|
||||
self._thread.start()
|
||||
|
||||
def from_url(self, url):
|
||||
parts = urlparse.urlsplit(url)
|
||||
if parts.scheme != "cp2110":
|
||||
raise SerialException(
|
||||
'expected a string in the forms '
|
||||
'"cp2110:///dev/hidraw9" or "cp2110://0001:0023:00": '
|
||||
'not starting with cp2110:// {{!r}}'.format(parts.scheme))
|
||||
if parts.netloc: # cp2100://BUS:DEVICE:ENDPOINT, for libusb
|
||||
return parts.netloc.encode('utf-8')
|
||||
return parts.path.encode('utf-8')
|
||||
|
||||
def close(self):
|
||||
self.is_open = False
|
||||
if self._thread:
|
||||
self._thread.join(1) # read timeout is 0.1
|
||||
self._thread = None
|
||||
self._hid_handle.close()
|
||||
self._hid_handle = None
|
||||
|
||||
def _reconfigure_port(self):
|
||||
parity_value = None
|
||||
if self._parity == serial.PARITY_NONE:
|
||||
parity_value = 0x00
|
||||
elif self._parity == serial.PARITY_ODD:
|
||||
parity_value = 0x01
|
||||
elif self._parity == serial.PARITY_EVEN:
|
||||
parity_value = 0x02
|
||||
elif self._parity == serial.PARITY_MARK:
|
||||
parity_value = 0x03
|
||||
elif self._parity == serial.PARITY_SPACE:
|
||||
parity_value = 0x04
|
||||
else:
|
||||
raise ValueError('Invalid parity: {!r}'.format(self._parity))
|
||||
|
||||
if self.rtscts:
|
||||
flow_control_value = 0x01
|
||||
else:
|
||||
flow_control_value = 0x00
|
||||
|
||||
data_bits_value = None
|
||||
if self._bytesize == 5:
|
||||
data_bits_value = 0x00
|
||||
elif self._bytesize == 6:
|
||||
data_bits_value = 0x01
|
||||
elif self._bytesize == 7:
|
||||
data_bits_value = 0x02
|
||||
elif self._bytesize == 8:
|
||||
data_bits_value = 0x03
|
||||
else:
|
||||
raise ValueError('Invalid char len: {!r}'.format(self._bytesize))
|
||||
|
||||
stop_bits_value = None
|
||||
if self._stopbits == serial.STOPBITS_ONE:
|
||||
stop_bits_value = 0x00
|
||||
elif self._stopbits == serial.STOPBITS_ONE_POINT_FIVE:
|
||||
stop_bits_value = 0x01
|
||||
elif self._stopbits == serial.STOPBITS_TWO:
|
||||
stop_bits_value = 0x01
|
||||
else:
|
||||
raise ValueError('Invalid stop bit specification: {!r}'.format(self._stopbits))
|
||||
|
||||
configuration_report = struct.pack(
|
||||
'>BLBBBB',
|
||||
_REPORT_GETSET_UART_CONFIG,
|
||||
self._baudrate,
|
||||
parity_value,
|
||||
flow_control_value,
|
||||
data_bits_value,
|
||||
stop_bits_value)
|
||||
|
||||
self._hid_handle.send_feature_report(configuration_report)
|
||||
|
||||
self._hid_handle.send_feature_report(
|
||||
bytes((_REPORT_GETSET_UART_ENABLE, _ENABLE_UART)))
|
||||
self._update_break_state()
|
||||
|
||||
@property
|
||||
def in_waiting(self):
|
||||
return self._read_buffer.qsize()
|
||||
|
||||
def reset_input_buffer(self):
|
||||
if not self.is_open:
|
||||
raise PortNotOpenError()
|
||||
self._hid_handle.send_feature_report(
|
||||
bytes((_REPORT_SET_PURGE_FIFOS, _PURGE_RX_FIFO)))
|
||||
# empty read buffer
|
||||
while self._read_buffer.qsize():
|
||||
self._read_buffer.get(False)
|
||||
|
||||
def reset_output_buffer(self):
|
||||
if not self.is_open:
|
||||
raise PortNotOpenError()
|
||||
self._hid_handle.send_feature_report(
|
||||
bytes((_REPORT_SET_PURGE_FIFOS, _PURGE_TX_FIFO)))
|
||||
|
||||
def _update_break_state(self):
|
||||
if not self._hid_handle:
|
||||
raise PortNotOpenError()
|
||||
|
||||
if self._break_state:
|
||||
self._hid_handle.send_feature_report(
|
||||
bytes((_REPORT_SET_TRANSMIT_LINE_BREAK, 0)))
|
||||
else:
|
||||
# Note that while AN434 states "There are no data bytes in
|
||||
# the payload other than the Report ID", either hidapi or
|
||||
# Linux does not seem to send the report otherwise.
|
||||
self._hid_handle.send_feature_report(
|
||||
bytes((_REPORT_SET_STOP_LINE_BREAK, 0)))
|
||||
|
||||
def read(self, size=1):
|
||||
if not self.is_open:
|
||||
raise PortNotOpenError()
|
||||
|
||||
data = bytearray()
|
||||
try:
|
||||
timeout = Timeout(self._timeout)
|
||||
while len(data) < size:
|
||||
if self._thread is None:
|
||||
raise SerialException('connection failed (reader thread died)')
|
||||
buf = self._read_buffer.get(True, timeout.time_left())
|
||||
if buf is None:
|
||||
return bytes(data)
|
||||
data += buf
|
||||
if timeout.expired():
|
||||
break
|
||||
except Queue.Empty: # -> timeout
|
||||
pass
|
||||
return bytes(data)
|
||||
|
||||
def write(self, data):
|
||||
if not self.is_open:
|
||||
raise PortNotOpenError()
|
||||
data = to_bytes(data)
|
||||
tx_len = len(data)
|
||||
while tx_len > 0:
|
||||
to_be_sent = min(tx_len, 0x3F)
|
||||
report = to_bytes([to_be_sent]) + data[:to_be_sent]
|
||||
self._hid_handle.write(report)
|
||||
|
||||
data = data[to_be_sent:]
|
||||
tx_len = len(data)
|
||||
|
||||
def _hid_read_loop(self):
|
||||
try:
|
||||
while self.is_open:
|
||||
data = self._hid_handle.read(64, timeout_ms=100)
|
||||
if not data:
|
||||
continue
|
||||
data_len = data.pop(0)
|
||||
assert data_len == len(data)
|
||||
self._read_buffer.put(bytearray(data))
|
||||
finally:
|
||||
self._thread = None
|
||||
91
deps/serial/urlhandler/protocol_hwgrep.py
vendored
Normal file
91
deps/serial/urlhandler/protocol_hwgrep.py
vendored
Normal file
@@ -0,0 +1,91 @@
|
||||
#! python
|
||||
#
|
||||
# This module implements a special URL handler that uses the port listing to
|
||||
# find ports by searching the string descriptions.
|
||||
#
|
||||
# This file is part of pySerial. https://github.com/pyserial/pyserial
|
||||
# (C) 2011-2015 Chris Liechti <cliechti@gmx.net>
|
||||
#
|
||||
# SPDX-License-Identifier: BSD-3-Clause
|
||||
#
|
||||
# URL format: hwgrep://<regexp>&<option>
|
||||
#
|
||||
# where <regexp> is a Python regexp according to the re module
|
||||
#
|
||||
# violating the normal definition for URLs, the charachter `&` is used to
|
||||
# separate parameters from the arguments (instead of `?`, but the question mark
|
||||
# is heavily used in regexp'es)
|
||||
#
|
||||
# options:
|
||||
# n=<N> pick the N'th entry instead of the first one (numbering starts at 1)
|
||||
# skip_busy tries to open port to check if it is busy, fails on posix as ports are not locked!
|
||||
|
||||
from __future__ import absolute_import
|
||||
|
||||
import serial
|
||||
import serial.tools.list_ports
|
||||
|
||||
try:
|
||||
basestring
|
||||
except NameError:
|
||||
basestring = str # python 3 pylint: disable=redefined-builtin
|
||||
|
||||
|
||||
class Serial(serial.Serial):
|
||||
"""Just inherit the native Serial port implementation and patch the port property."""
|
||||
# pylint: disable=no-member
|
||||
|
||||
@serial.Serial.port.setter
|
||||
def port(self, value):
|
||||
"""translate port name before storing it"""
|
||||
if isinstance(value, basestring) and value.startswith('hwgrep://'):
|
||||
serial.Serial.port.__set__(self, self.from_url(value))
|
||||
else:
|
||||
serial.Serial.port.__set__(self, value)
|
||||
|
||||
def from_url(self, url):
|
||||
"""extract host and port from an URL string"""
|
||||
if url.lower().startswith("hwgrep://"):
|
||||
url = url[9:]
|
||||
n = 0
|
||||
test_open = False
|
||||
args = url.split('&')
|
||||
regexp = args.pop(0)
|
||||
for arg in args:
|
||||
if '=' in arg:
|
||||
option, value = arg.split('=', 1)
|
||||
else:
|
||||
option = arg
|
||||
value = None
|
||||
if option == 'n':
|
||||
# pick n'th element
|
||||
n = int(value) - 1
|
||||
if n < 1:
|
||||
raise ValueError('option "n" expects a positive integer larger than 1: {!r}'.format(value))
|
||||
elif option == 'skip_busy':
|
||||
# open to test if port is available. not the nicest way..
|
||||
test_open = True
|
||||
else:
|
||||
raise ValueError('unknown option: {!r}'.format(option))
|
||||
# use a for loop to get the 1st element from the generator
|
||||
for port, desc, hwid in sorted(serial.tools.list_ports.grep(regexp)):
|
||||
if test_open:
|
||||
try:
|
||||
s = serial.Serial(port)
|
||||
except serial.SerialException:
|
||||
# it has some error, skip this one
|
||||
continue
|
||||
else:
|
||||
s.close()
|
||||
if n:
|
||||
n -= 1
|
||||
continue
|
||||
return port
|
||||
else:
|
||||
raise serial.SerialException('no ports found matching regexp {!r}'.format(url))
|
||||
|
||||
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
|
||||
if __name__ == '__main__':
|
||||
s = Serial(None)
|
||||
s.port = 'hwgrep://ttyS0'
|
||||
print(s)
|
||||
308
deps/serial/urlhandler/protocol_loop.py
vendored
Normal file
308
deps/serial/urlhandler/protocol_loop.py
vendored
Normal file
@@ -0,0 +1,308 @@
|
||||
#! python
|
||||
#
|
||||
# This module implements a loop back connection receiving itself what it sent.
|
||||
#
|
||||
# The purpose of this module is.. well... You can run the unit tests with it.
|
||||
# and it was so easy to implement ;-)
|
||||
#
|
||||
# This file is part of pySerial. https://github.com/pyserial/pyserial
|
||||
# (C) 2001-2020 Chris Liechti <cliechti@gmx.net>
|
||||
#
|
||||
# SPDX-License-Identifier: BSD-3-Clause
|
||||
#
|
||||
# URL format: loop://[option[/option...]]
|
||||
# options:
|
||||
# - "debug" print diagnostic messages
|
||||
from __future__ import absolute_import
|
||||
|
||||
import logging
|
||||
import numbers
|
||||
import time
|
||||
try:
|
||||
import urlparse
|
||||
except ImportError:
|
||||
import urllib.parse as urlparse
|
||||
try:
|
||||
import queue
|
||||
except ImportError:
|
||||
import Queue as queue
|
||||
|
||||
from serial.serialutil import SerialBase, SerialException, to_bytes, iterbytes, SerialTimeoutException, PortNotOpenError
|
||||
|
||||
# map log level names to constants. used in from_url()
|
||||
LOGGER_LEVELS = {
|
||||
'debug': logging.DEBUG,
|
||||
'info': logging.INFO,
|
||||
'warning': logging.WARNING,
|
||||
'error': logging.ERROR,
|
||||
}
|
||||
|
||||
|
||||
class Serial(SerialBase):
|
||||
"""Serial port implementation that simulates a loop back connection in plain software."""
|
||||
|
||||
BAUDRATES = (50, 75, 110, 134, 150, 200, 300, 600, 1200, 1800, 2400, 4800,
|
||||
9600, 19200, 38400, 57600, 115200)
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
self.buffer_size = 4096
|
||||
self.queue = None
|
||||
self.logger = None
|
||||
self._cancel_write = False
|
||||
super(Serial, self).__init__(*args, **kwargs)
|
||||
|
||||
def open(self):
|
||||
"""\
|
||||
Open port with current settings. This may throw a SerialException
|
||||
if the port cannot be opened.
|
||||
"""
|
||||
if self.is_open:
|
||||
raise SerialException("Port is already open.")
|
||||
self.logger = None
|
||||
self.queue = queue.Queue(self.buffer_size)
|
||||
|
||||
if self._port is None:
|
||||
raise SerialException("Port must be configured before it can be used.")
|
||||
# not that there is anything to open, but the function applies the
|
||||
# options found in the URL
|
||||
self.from_url(self.port)
|
||||
|
||||
# not that there anything to configure...
|
||||
self._reconfigure_port()
|
||||
# all things set up get, now a clean start
|
||||
self.is_open = True
|
||||
if not self._dsrdtr:
|
||||
self._update_dtr_state()
|
||||
if not self._rtscts:
|
||||
self._update_rts_state()
|
||||
self.reset_input_buffer()
|
||||
self.reset_output_buffer()
|
||||
|
||||
def close(self):
|
||||
if self.is_open:
|
||||
self.is_open = False
|
||||
try:
|
||||
self.queue.put_nowait(None)
|
||||
except queue.Full:
|
||||
pass
|
||||
super(Serial, self).close()
|
||||
|
||||
def _reconfigure_port(self):
|
||||
"""\
|
||||
Set communication parameters on opened port. For the loop://
|
||||
protocol all settings are ignored!
|
||||
"""
|
||||
# not that's it of any real use, but it helps in the unit tests
|
||||
if not isinstance(self._baudrate, numbers.Integral) or not 0 < self._baudrate < 2 ** 32:
|
||||
raise ValueError("invalid baudrate: {!r}".format(self._baudrate))
|
||||
if self.logger:
|
||||
self.logger.info('_reconfigure_port()')
|
||||
|
||||
def from_url(self, url):
|
||||
"""extract host and port from an URL string"""
|
||||
parts = urlparse.urlsplit(url)
|
||||
if parts.scheme != "loop":
|
||||
raise SerialException(
|
||||
'expected a string in the form '
|
||||
'"loop://[?logging={debug|info|warning|error}]": not starting '
|
||||
'with loop:// ({!r})'.format(parts.scheme))
|
||||
try:
|
||||
# process options now, directly altering self
|
||||
for option, values in urlparse.parse_qs(parts.query, True).items():
|
||||
if option == 'logging':
|
||||
logging.basicConfig() # XXX is that good to call it here?
|
||||
self.logger = logging.getLogger('pySerial.loop')
|
||||
self.logger.setLevel(LOGGER_LEVELS[values[0]])
|
||||
self.logger.debug('enabled logging')
|
||||
else:
|
||||
raise ValueError('unknown option: {!r}'.format(option))
|
||||
except ValueError as e:
|
||||
raise SerialException(
|
||||
'expected a string in the form '
|
||||
'"loop://[?logging={debug|info|warning|error}]": {}'.format(e))
|
||||
|
||||
# - - - - - - - - - - - - - - - - - - - - - - - -
|
||||
|
||||
@property
|
||||
def in_waiting(self):
|
||||
"""Return the number of bytes currently in the input buffer."""
|
||||
if not self.is_open:
|
||||
raise PortNotOpenError()
|
||||
if self.logger:
|
||||
# attention the logged value can differ from return value in
|
||||
# threaded environments...
|
||||
self.logger.debug('in_waiting -> {:d}'.format(self.queue.qsize()))
|
||||
return self.queue.qsize()
|
||||
|
||||
def read(self, size=1):
|
||||
"""\
|
||||
Read size bytes from the serial port. If a timeout is set it may
|
||||
return less characters as requested. With no timeout it will block
|
||||
until the requested number of bytes is read.
|
||||
"""
|
||||
if not self.is_open:
|
||||
raise PortNotOpenError()
|
||||
if self._timeout is not None and self._timeout != 0:
|
||||
timeout = time.time() + self._timeout
|
||||
else:
|
||||
timeout = None
|
||||
data = bytearray()
|
||||
while size > 0 and self.is_open:
|
||||
try:
|
||||
b = self.queue.get(timeout=self._timeout) # XXX inter char timeout
|
||||
except queue.Empty:
|
||||
if self._timeout == 0:
|
||||
break
|
||||
else:
|
||||
if b is not None:
|
||||
data += b
|
||||
size -= 1
|
||||
else:
|
||||
break
|
||||
# check for timeout now, after data has been read.
|
||||
# useful for timeout = 0 (non blocking) read
|
||||
if timeout and time.time() > timeout:
|
||||
if self.logger:
|
||||
self.logger.info('read timeout')
|
||||
break
|
||||
return bytes(data)
|
||||
|
||||
def cancel_read(self):
|
||||
self.queue.put_nowait(None)
|
||||
|
||||
def cancel_write(self):
|
||||
self._cancel_write = True
|
||||
|
||||
def write(self, data):
|
||||
"""\
|
||||
Output the given byte string over the serial port. Can block if the
|
||||
connection is blocked. May raise SerialException if the connection is
|
||||
closed.
|
||||
"""
|
||||
self._cancel_write = False
|
||||
if not self.is_open:
|
||||
raise PortNotOpenError()
|
||||
data = to_bytes(data)
|
||||
# calculate aprox time that would be used to send the data
|
||||
time_used_to_send = 10.0 * len(data) / self._baudrate
|
||||
# when a write timeout is configured check if we would be successful
|
||||
# (not sending anything, not even the part that would have time)
|
||||
if self._write_timeout is not None and time_used_to_send > self._write_timeout:
|
||||
# must wait so that unit test succeeds
|
||||
time_left = self._write_timeout
|
||||
while time_left > 0 and not self._cancel_write:
|
||||
time.sleep(min(time_left, 0.5))
|
||||
time_left -= 0.5
|
||||
if self._cancel_write:
|
||||
return 0 # XXX
|
||||
raise SerialTimeoutException('Write timeout')
|
||||
for byte in iterbytes(data):
|
||||
self.queue.put(byte, timeout=self._write_timeout)
|
||||
return len(data)
|
||||
|
||||
def reset_input_buffer(self):
|
||||
"""Clear input buffer, discarding all that is in the buffer."""
|
||||
if not self.is_open:
|
||||
raise PortNotOpenError()
|
||||
if self.logger:
|
||||
self.logger.info('reset_input_buffer()')
|
||||
try:
|
||||
while self.queue.qsize():
|
||||
self.queue.get_nowait()
|
||||
except queue.Empty:
|
||||
pass
|
||||
|
||||
def reset_output_buffer(self):
|
||||
"""\
|
||||
Clear output buffer, aborting the current output and
|
||||
discarding all that is in the buffer.
|
||||
"""
|
||||
if not self.is_open:
|
||||
raise PortNotOpenError()
|
||||
if self.logger:
|
||||
self.logger.info('reset_output_buffer()')
|
||||
try:
|
||||
while self.queue.qsize():
|
||||
self.queue.get_nowait()
|
||||
except queue.Empty:
|
||||
pass
|
||||
|
||||
@property
|
||||
def out_waiting(self):
|
||||
"""Return how many bytes the in the outgoing buffer"""
|
||||
if not self.is_open:
|
||||
raise PortNotOpenError()
|
||||
if self.logger:
|
||||
# attention the logged value can differ from return value in
|
||||
# threaded environments...
|
||||
self.logger.debug('out_waiting -> {:d}'.format(self.queue.qsize()))
|
||||
return self.queue.qsize()
|
||||
|
||||
def _update_break_state(self):
|
||||
"""\
|
||||
Set break: Controls TXD. When active, to transmitting is
|
||||
possible.
|
||||
"""
|
||||
if self.logger:
|
||||
self.logger.info('_update_break_state({!r})'.format(self._break_state))
|
||||
|
||||
def _update_rts_state(self):
|
||||
"""Set terminal status line: Request To Send"""
|
||||
if self.logger:
|
||||
self.logger.info('_update_rts_state({!r}) -> state of CTS'.format(self._rts_state))
|
||||
|
||||
def _update_dtr_state(self):
|
||||
"""Set terminal status line: Data Terminal Ready"""
|
||||
if self.logger:
|
||||
self.logger.info('_update_dtr_state({!r}) -> state of DSR'.format(self._dtr_state))
|
||||
|
||||
@property
|
||||
def cts(self):
|
||||
"""Read terminal status line: Clear To Send"""
|
||||
if not self.is_open:
|
||||
raise PortNotOpenError()
|
||||
if self.logger:
|
||||
self.logger.info('CTS -> state of RTS ({!r})'.format(self._rts_state))
|
||||
return self._rts_state
|
||||
|
||||
@property
|
||||
def dsr(self):
|
||||
"""Read terminal status line: Data Set Ready"""
|
||||
if self.logger:
|
||||
self.logger.info('DSR -> state of DTR ({!r})'.format(self._dtr_state))
|
||||
return self._dtr_state
|
||||
|
||||
@property
|
||||
def ri(self):
|
||||
"""Read terminal status line: Ring Indicator"""
|
||||
if not self.is_open:
|
||||
raise PortNotOpenError()
|
||||
if self.logger:
|
||||
self.logger.info('returning dummy for RI')
|
||||
return False
|
||||
|
||||
@property
|
||||
def cd(self):
|
||||
"""Read terminal status line: Carrier Detect"""
|
||||
if not self.is_open:
|
||||
raise PortNotOpenError()
|
||||
if self.logger:
|
||||
self.logger.info('returning dummy for CD')
|
||||
return True
|
||||
|
||||
# - - - platform specific - - -
|
||||
# None so far
|
||||
|
||||
|
||||
# simple client test
|
||||
if __name__ == '__main__':
|
||||
import sys
|
||||
s = Serial('loop://')
|
||||
sys.stdout.write('{}\n'.format(s))
|
||||
|
||||
sys.stdout.write("write...\n")
|
||||
s.write("hello\n")
|
||||
s.flush()
|
||||
sys.stdout.write("read: {!r}\n".format(s.read(5)))
|
||||
|
||||
s.close()
|
||||
12
deps/serial/urlhandler/protocol_rfc2217.py
vendored
Normal file
12
deps/serial/urlhandler/protocol_rfc2217.py
vendored
Normal file
@@ -0,0 +1,12 @@
|
||||
#! python
|
||||
#
|
||||
# This is a thin wrapper to load the rfc2217 implementation.
|
||||
#
|
||||
# This file is part of pySerial. https://github.com/pyserial/pyserial
|
||||
# (C) 2011 Chris Liechti <cliechti@gmx.net>
|
||||
#
|
||||
# SPDX-License-Identifier: BSD-3-Clause
|
||||
|
||||
from __future__ import absolute_import
|
||||
|
||||
from serial.rfc2217 import Serial # noqa
|
||||
359
deps/serial/urlhandler/protocol_socket.py
vendored
Normal file
359
deps/serial/urlhandler/protocol_socket.py
vendored
Normal file
@@ -0,0 +1,359 @@
|
||||
#! python
|
||||
#
|
||||
# This module implements a simple socket based client.
|
||||
# It does not support changing any port parameters and will silently ignore any
|
||||
# requests to do so.
|
||||
#
|
||||
# The purpose of this module is that applications using pySerial can connect to
|
||||
# TCP/IP to serial port converters that do not support RFC 2217.
|
||||
#
|
||||
# This file is part of pySerial. https://github.com/pyserial/pyserial
|
||||
# (C) 2001-2015 Chris Liechti <cliechti@gmx.net>
|
||||
#
|
||||
# SPDX-License-Identifier: BSD-3-Clause
|
||||
#
|
||||
# URL format: socket://<host>:<port>[/option[/option...]]
|
||||
# options:
|
||||
# - "debug" print diagnostic messages
|
||||
|
||||
from __future__ import absolute_import
|
||||
|
||||
import errno
|
||||
import logging
|
||||
import select
|
||||
import socket
|
||||
import time
|
||||
try:
|
||||
import urlparse
|
||||
except ImportError:
|
||||
import urllib.parse as urlparse
|
||||
|
||||
from serial.serialutil import SerialBase, SerialException, to_bytes, \
|
||||
PortNotOpenError, SerialTimeoutException, Timeout
|
||||
|
||||
# map log level names to constants. used in from_url()
|
||||
LOGGER_LEVELS = {
|
||||
'debug': logging.DEBUG,
|
||||
'info': logging.INFO,
|
||||
'warning': logging.WARNING,
|
||||
'error': logging.ERROR,
|
||||
}
|
||||
|
||||
POLL_TIMEOUT = 5
|
||||
|
||||
|
||||
class Serial(SerialBase):
|
||||
"""Serial port implementation for plain sockets."""
|
||||
|
||||
BAUDRATES = (50, 75, 110, 134, 150, 200, 300, 600, 1200, 1800, 2400, 4800,
|
||||
9600, 19200, 38400, 57600, 115200)
|
||||
|
||||
def open(self):
|
||||
"""\
|
||||
Open port with current settings. This may throw a SerialException
|
||||
if the port cannot be opened.
|
||||
"""
|
||||
self.logger = None
|
||||
if self._port is None:
|
||||
raise SerialException("Port must be configured before it can be used.")
|
||||
if self.is_open:
|
||||
raise SerialException("Port is already open.")
|
||||
try:
|
||||
# timeout is used for write timeout support :/ and to get an initial connection timeout
|
||||
self._socket = socket.create_connection(self.from_url(self.portstr), timeout=POLL_TIMEOUT)
|
||||
except Exception as msg:
|
||||
self._socket = None
|
||||
raise SerialException("Could not open port {}: {}".format(self.portstr, msg))
|
||||
# after connecting, switch to non-blocking, we're using select
|
||||
self._socket.setblocking(False)
|
||||
|
||||
# not that there is anything to configure...
|
||||
self._reconfigure_port()
|
||||
# all things set up get, now a clean start
|
||||
self.is_open = True
|
||||
if not self._dsrdtr:
|
||||
self._update_dtr_state()
|
||||
if not self._rtscts:
|
||||
self._update_rts_state()
|
||||
self.reset_input_buffer()
|
||||
self.reset_output_buffer()
|
||||
|
||||
def _reconfigure_port(self):
|
||||
"""\
|
||||
Set communication parameters on opened port. For the socket://
|
||||
protocol all settings are ignored!
|
||||
"""
|
||||
if self._socket is None:
|
||||
raise SerialException("Can only operate on open ports")
|
||||
if self.logger:
|
||||
self.logger.info('ignored port configuration change')
|
||||
|
||||
def close(self):
|
||||
"""Close port"""
|
||||
if self.is_open:
|
||||
if self._socket:
|
||||
try:
|
||||
self._socket.shutdown(socket.SHUT_RDWR)
|
||||
self._socket.close()
|
||||
except:
|
||||
# ignore errors.
|
||||
pass
|
||||
self._socket = None
|
||||
self.is_open = False
|
||||
# in case of quick reconnects, give the server some time
|
||||
time.sleep(0.3)
|
||||
|
||||
def from_url(self, url):
|
||||
"""extract host and port from an URL string"""
|
||||
parts = urlparse.urlsplit(url)
|
||||
if parts.scheme != "socket":
|
||||
raise SerialException(
|
||||
'expected a string in the form '
|
||||
'"socket://<host>:<port>[?logging={debug|info|warning|error}]": '
|
||||
'not starting with socket:// ({!r})'.format(parts.scheme))
|
||||
try:
|
||||
# process options now, directly altering self
|
||||
for option, values in urlparse.parse_qs(parts.query, True).items():
|
||||
if option == 'logging':
|
||||
logging.basicConfig() # XXX is that good to call it here?
|
||||
self.logger = logging.getLogger('pySerial.socket')
|
||||
self.logger.setLevel(LOGGER_LEVELS[values[0]])
|
||||
self.logger.debug('enabled logging')
|
||||
else:
|
||||
raise ValueError('unknown option: {!r}'.format(option))
|
||||
if not 0 <= parts.port < 65536:
|
||||
raise ValueError("port not in range 0...65535")
|
||||
except ValueError as e:
|
||||
raise SerialException(
|
||||
'expected a string in the form '
|
||||
'"socket://<host>:<port>[?logging={debug|info|warning|error}]": {}'.format(e))
|
||||
|
||||
return (parts.hostname, parts.port)
|
||||
|
||||
# - - - - - - - - - - - - - - - - - - - - - - - -
|
||||
|
||||
@property
|
||||
def in_waiting(self):
|
||||
"""Return the number of bytes currently in the input buffer."""
|
||||
if not self.is_open:
|
||||
raise PortNotOpenError()
|
||||
# Poll the socket to see if it is ready for reading.
|
||||
# If ready, at least one byte will be to read.
|
||||
lr, lw, lx = select.select([self._socket], [], [], 0)
|
||||
return len(lr)
|
||||
|
||||
# select based implementation, similar to posix, but only using socket API
|
||||
# to be portable, additionally handle socket timeout which is used to
|
||||
# emulate write timeouts
|
||||
def read(self, size=1):
|
||||
"""\
|
||||
Read size bytes from the serial port. If a timeout is set it may
|
||||
return less characters as requested. With no timeout it will block
|
||||
until the requested number of bytes is read.
|
||||
"""
|
||||
if not self.is_open:
|
||||
raise PortNotOpenError()
|
||||
read = bytearray()
|
||||
timeout = Timeout(self._timeout)
|
||||
while len(read) < size:
|
||||
try:
|
||||
ready, _, _ = select.select([self._socket], [], [], timeout.time_left())
|
||||
# If select was used with a timeout, and the timeout occurs, it
|
||||
# returns with empty lists -> thus abort read operation.
|
||||
# For timeout == 0 (non-blocking operation) also abort when
|
||||
# there is nothing to read.
|
||||
if not ready:
|
||||
break # timeout
|
||||
buf = self._socket.recv(size - len(read))
|
||||
# read should always return some data as select reported it was
|
||||
# ready to read when we get to this point, unless it is EOF
|
||||
if not buf:
|
||||
raise SerialException('socket disconnected')
|
||||
read.extend(buf)
|
||||
except OSError as e:
|
||||
# this is for Python 3.x where select.error is a subclass of
|
||||
# OSError ignore BlockingIOErrors and EINTR. other errors are shown
|
||||
# https://www.python.org/dev/peps/pep-0475.
|
||||
if e.errno not in (errno.EAGAIN, errno.EALREADY, errno.EWOULDBLOCK, errno.EINPROGRESS, errno.EINTR):
|
||||
raise SerialException('read failed: {}'.format(e))
|
||||
except (select.error, socket.error) as e:
|
||||
# this is for Python 2.x
|
||||
# ignore BlockingIOErrors and EINTR. all errors are shown
|
||||
# see also http://www.python.org/dev/peps/pep-3151/#select
|
||||
if e[0] not in (errno.EAGAIN, errno.EALREADY, errno.EWOULDBLOCK, errno.EINPROGRESS, errno.EINTR):
|
||||
raise SerialException('read failed: {}'.format(e))
|
||||
if timeout.expired():
|
||||
break
|
||||
return bytes(read)
|
||||
|
||||
def write(self, data):
|
||||
"""\
|
||||
Output the given byte string over the serial port. Can block if the
|
||||
connection is blocked. May raise SerialException if the connection is
|
||||
closed.
|
||||
"""
|
||||
if not self.is_open:
|
||||
raise PortNotOpenError()
|
||||
|
||||
d = to_bytes(data)
|
||||
tx_len = length = len(d)
|
||||
timeout = Timeout(self._write_timeout)
|
||||
while tx_len > 0:
|
||||
try:
|
||||
n = self._socket.send(d)
|
||||
if timeout.is_non_blocking:
|
||||
# Zero timeout indicates non-blocking - simply return the
|
||||
# number of bytes of data actually written
|
||||
return n
|
||||
elif not timeout.is_infinite:
|
||||
# when timeout is set, use select to wait for being ready
|
||||
# with the time left as timeout
|
||||
if timeout.expired():
|
||||
raise SerialTimeoutException('Write timeout')
|
||||
_, ready, _ = select.select([], [self._socket], [], timeout.time_left())
|
||||
if not ready:
|
||||
raise SerialTimeoutException('Write timeout')
|
||||
else:
|
||||
assert timeout.time_left() is None
|
||||
# wait for write operation
|
||||
_, ready, _ = select.select([], [self._socket], [], None)
|
||||
if not ready:
|
||||
raise SerialException('write failed (select)')
|
||||
d = d[n:]
|
||||
tx_len -= n
|
||||
except SerialException:
|
||||
raise
|
||||
except OSError as e:
|
||||
# this is for Python 3.x where select.error is a subclass of
|
||||
# OSError ignore BlockingIOErrors and EINTR. other errors are shown
|
||||
# https://www.python.org/dev/peps/pep-0475.
|
||||
if e.errno not in (errno.EAGAIN, errno.EALREADY, errno.EWOULDBLOCK, errno.EINPROGRESS, errno.EINTR):
|
||||
raise SerialException('write failed: {}'.format(e))
|
||||
except select.error as e:
|
||||
# this is for Python 2.x
|
||||
# ignore BlockingIOErrors and EINTR. all errors are shown
|
||||
# see also http://www.python.org/dev/peps/pep-3151/#select
|
||||
if e[0] not in (errno.EAGAIN, errno.EALREADY, errno.EWOULDBLOCK, errno.EINPROGRESS, errno.EINTR):
|
||||
raise SerialException('write failed: {}'.format(e))
|
||||
if not timeout.is_non_blocking and timeout.expired():
|
||||
raise SerialTimeoutException('Write timeout')
|
||||
return length - len(d)
|
||||
|
||||
def reset_input_buffer(self):
|
||||
"""Clear input buffer, discarding all that is in the buffer."""
|
||||
if not self.is_open:
|
||||
raise PortNotOpenError()
|
||||
|
||||
# just use recv to remove input, while there is some
|
||||
ready = True
|
||||
while ready:
|
||||
ready, _, _ = select.select([self._socket], [], [], 0)
|
||||
try:
|
||||
if ready:
|
||||
ready = self._socket.recv(4096)
|
||||
except OSError as e:
|
||||
# this is for Python 3.x where select.error is a subclass of
|
||||
# OSError ignore BlockingIOErrors and EINTR. other errors are shown
|
||||
# https://www.python.org/dev/peps/pep-0475.
|
||||
if e.errno not in (errno.EAGAIN, errno.EALREADY, errno.EWOULDBLOCK, errno.EINPROGRESS, errno.EINTR):
|
||||
raise SerialException('read failed: {}'.format(e))
|
||||
except (select.error, socket.error) as e:
|
||||
# this is for Python 2.x
|
||||
# ignore BlockingIOErrors and EINTR. all errors are shown
|
||||
# see also http://www.python.org/dev/peps/pep-3151/#select
|
||||
if e[0] not in (errno.EAGAIN, errno.EALREADY, errno.EWOULDBLOCK, errno.EINPROGRESS, errno.EINTR):
|
||||
raise SerialException('read failed: {}'.format(e))
|
||||
|
||||
def reset_output_buffer(self):
|
||||
"""\
|
||||
Clear output buffer, aborting the current output and
|
||||
discarding all that is in the buffer.
|
||||
"""
|
||||
if not self.is_open:
|
||||
raise PortNotOpenError()
|
||||
if self.logger:
|
||||
self.logger.info('ignored reset_output_buffer')
|
||||
|
||||
def send_break(self, duration=0.25):
|
||||
"""\
|
||||
Send break condition. Timed, returns to idle state after given
|
||||
duration.
|
||||
"""
|
||||
if not self.is_open:
|
||||
raise PortNotOpenError()
|
||||
if self.logger:
|
||||
self.logger.info('ignored send_break({!r})'.format(duration))
|
||||
|
||||
def _update_break_state(self):
|
||||
"""Set break: Controls TXD. When active, to transmitting is
|
||||
possible."""
|
||||
if self.logger:
|
||||
self.logger.info('ignored _update_break_state({!r})'.format(self._break_state))
|
||||
|
||||
def _update_rts_state(self):
|
||||
"""Set terminal status line: Request To Send"""
|
||||
if self.logger:
|
||||
self.logger.info('ignored _update_rts_state({!r})'.format(self._rts_state))
|
||||
|
||||
def _update_dtr_state(self):
|
||||
"""Set terminal status line: Data Terminal Ready"""
|
||||
if self.logger:
|
||||
self.logger.info('ignored _update_dtr_state({!r})'.format(self._dtr_state))
|
||||
|
||||
@property
|
||||
def cts(self):
|
||||
"""Read terminal status line: Clear To Send"""
|
||||
if not self.is_open:
|
||||
raise PortNotOpenError()
|
||||
if self.logger:
|
||||
self.logger.info('returning dummy for cts')
|
||||
return True
|
||||
|
||||
@property
|
||||
def dsr(self):
|
||||
"""Read terminal status line: Data Set Ready"""
|
||||
if not self.is_open:
|
||||
raise PortNotOpenError()
|
||||
if self.logger:
|
||||
self.logger.info('returning dummy for dsr')
|
||||
return True
|
||||
|
||||
@property
|
||||
def ri(self):
|
||||
"""Read terminal status line: Ring Indicator"""
|
||||
if not self.is_open:
|
||||
raise PortNotOpenError()
|
||||
if self.logger:
|
||||
self.logger.info('returning dummy for ri')
|
||||
return False
|
||||
|
||||
@property
|
||||
def cd(self):
|
||||
"""Read terminal status line: Carrier Detect"""
|
||||
if not self.is_open:
|
||||
raise PortNotOpenError()
|
||||
if self.logger:
|
||||
self.logger.info('returning dummy for cd)')
|
||||
return True
|
||||
|
||||
# - - - platform specific - - -
|
||||
|
||||
# works on Linux and probably all the other POSIX systems
|
||||
def fileno(self):
|
||||
"""Get the file handle of the underlying socket for use with select"""
|
||||
return self._socket.fileno()
|
||||
|
||||
|
||||
#
|
||||
# simple client test
|
||||
if __name__ == '__main__':
|
||||
import sys
|
||||
s = Serial('socket://localhost:7000')
|
||||
sys.stdout.write('{}\n'.format(s))
|
||||
|
||||
sys.stdout.write("write...\n")
|
||||
s.write(b"hello\n")
|
||||
s.flush()
|
||||
sys.stdout.write("read: {}\n".format(s.read(5)))
|
||||
|
||||
s.close()
|
||||
290
deps/serial/urlhandler/protocol_spy.py
vendored
Normal file
290
deps/serial/urlhandler/protocol_spy.py
vendored
Normal file
@@ -0,0 +1,290 @@
|
||||
#! python
|
||||
#
|
||||
# This module implements a special URL handler that wraps an other port,
|
||||
# print the traffic for debugging purposes. With this, it is possible
|
||||
# to debug the serial port traffic on every application that uses
|
||||
# serial_for_url.
|
||||
#
|
||||
# This file is part of pySerial. https://github.com/pyserial/pyserial
|
||||
# (C) 2015 Chris Liechti <cliechti@gmx.net>
|
||||
#
|
||||
# SPDX-License-Identifier: BSD-3-Clause
|
||||
#
|
||||
# URL format: spy://port[?option[=value][&option[=value]]]
|
||||
# options:
|
||||
# - dev=X a file or device to write to
|
||||
# - color use escape code to colorize output
|
||||
# - raw forward raw bytes instead of hexdump
|
||||
#
|
||||
# example:
|
||||
# redirect output to an other terminal window on Posix (Linux):
|
||||
# python -m serial.tools.miniterm spy:///dev/ttyUSB0?dev=/dev/pts/14\&color
|
||||
|
||||
from __future__ import absolute_import
|
||||
|
||||
import sys
|
||||
import time
|
||||
|
||||
import serial
|
||||
from serial.serialutil import to_bytes
|
||||
|
||||
try:
|
||||
import urlparse
|
||||
except ImportError:
|
||||
import urllib.parse as urlparse
|
||||
|
||||
|
||||
def sixteen(data):
|
||||
"""\
|
||||
yield tuples of hex and ASCII display in multiples of 16. Includes a
|
||||
space after 8 bytes and (None, None) after 16 bytes and at the end.
|
||||
"""
|
||||
n = 0
|
||||
for b in serial.iterbytes(data):
|
||||
yield ('{:02X} '.format(ord(b)), b.decode('ascii') if b' ' <= b < b'\x7f' else '.')
|
||||
n += 1
|
||||
if n == 8:
|
||||
yield (' ', '')
|
||||
elif n >= 16:
|
||||
yield (None, None)
|
||||
n = 0
|
||||
if n > 0:
|
||||
while n < 16:
|
||||
n += 1
|
||||
if n == 8:
|
||||
yield (' ', '')
|
||||
yield (' ', ' ')
|
||||
yield (None, None)
|
||||
|
||||
|
||||
def hexdump(data):
|
||||
"""yield lines with hexdump of data"""
|
||||
values = []
|
||||
ascii = []
|
||||
offset = 0
|
||||
for h, a in sixteen(data):
|
||||
if h is None:
|
||||
yield (offset, ' '.join([''.join(values), ''.join(ascii)]))
|
||||
del values[:]
|
||||
del ascii[:]
|
||||
offset += 0x10
|
||||
else:
|
||||
values.append(h)
|
||||
ascii.append(a)
|
||||
|
||||
|
||||
class FormatRaw(object):
|
||||
"""Forward only RX and TX data to output."""
|
||||
|
||||
def __init__(self, output, color):
|
||||
self.output = output
|
||||
self.color = color
|
||||
self.rx_color = '\x1b[32m'
|
||||
self.tx_color = '\x1b[31m'
|
||||
|
||||
def rx(self, data):
|
||||
"""show received data"""
|
||||
if self.color:
|
||||
self.output.write(self.rx_color)
|
||||
self.output.write(data)
|
||||
self.output.flush()
|
||||
|
||||
def tx(self, data):
|
||||
"""show transmitted data"""
|
||||
if self.color:
|
||||
self.output.write(self.tx_color)
|
||||
self.output.write(data)
|
||||
self.output.flush()
|
||||
|
||||
def control(self, name, value):
|
||||
"""(do not) show control calls"""
|
||||
pass
|
||||
|
||||
|
||||
class FormatHexdump(object):
|
||||
"""\
|
||||
Create a hex dump of RX ad TX data, show when control lines are read or
|
||||
written.
|
||||
|
||||
output example::
|
||||
|
||||
000000.000 Q-RX flushInput
|
||||
000002.469 RTS inactive
|
||||
000002.773 RTS active
|
||||
000003.001 TX 48 45 4C 4C 4F HELLO
|
||||
000003.102 RX 48 45 4C 4C 4F HELLO
|
||||
|
||||
"""
|
||||
|
||||
def __init__(self, output, color):
|
||||
self.start_time = time.time()
|
||||
self.output = output
|
||||
self.color = color
|
||||
self.rx_color = '\x1b[32m'
|
||||
self.tx_color = '\x1b[31m'
|
||||
self.control_color = '\x1b[37m'
|
||||
|
||||
def write_line(self, timestamp, label, value, value2=''):
|
||||
self.output.write('{:010.3f} {:4} {}{}\n'.format(timestamp, label, value, value2))
|
||||
self.output.flush()
|
||||
|
||||
def rx(self, data):
|
||||
"""show received data as hex dump"""
|
||||
if self.color:
|
||||
self.output.write(self.rx_color)
|
||||
if data:
|
||||
for offset, row in hexdump(data):
|
||||
self.write_line(time.time() - self.start_time, 'RX', '{:04X} '.format(offset), row)
|
||||
else:
|
||||
self.write_line(time.time() - self.start_time, 'RX', '<empty>')
|
||||
|
||||
def tx(self, data):
|
||||
"""show transmitted data as hex dump"""
|
||||
if self.color:
|
||||
self.output.write(self.tx_color)
|
||||
for offset, row in hexdump(data):
|
||||
self.write_line(time.time() - self.start_time, 'TX', '{:04X} '.format(offset), row)
|
||||
|
||||
def control(self, name, value):
|
||||
"""show control calls"""
|
||||
if self.color:
|
||||
self.output.write(self.control_color)
|
||||
self.write_line(time.time() - self.start_time, name, value)
|
||||
|
||||
|
||||
class Serial(serial.Serial):
|
||||
"""\
|
||||
Inherit the native Serial port implementation and wrap all the methods and
|
||||
attributes.
|
||||
"""
|
||||
# pylint: disable=no-member
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
super(Serial, self).__init__(*args, **kwargs)
|
||||
self.formatter = None
|
||||
self.show_all = False
|
||||
|
||||
@serial.Serial.port.setter
|
||||
def port(self, value):
|
||||
if value is not None:
|
||||
serial.Serial.port.__set__(self, self.from_url(value))
|
||||
|
||||
def from_url(self, url):
|
||||
"""extract host and port from an URL string"""
|
||||
parts = urlparse.urlsplit(url)
|
||||
if parts.scheme != 'spy':
|
||||
raise serial.SerialException(
|
||||
'expected a string in the form '
|
||||
'"spy://port[?option[=value][&option[=value]]]": '
|
||||
'not starting with spy:// ({!r})'.format(parts.scheme))
|
||||
# process options now, directly altering self
|
||||
formatter = FormatHexdump
|
||||
color = False
|
||||
output = sys.stderr
|
||||
try:
|
||||
for option, values in urlparse.parse_qs(parts.query, True).items():
|
||||
if option == 'file':
|
||||
output = open(values[0], 'w')
|
||||
elif option == 'color':
|
||||
color = True
|
||||
elif option == 'raw':
|
||||
formatter = FormatRaw
|
||||
elif option == 'all':
|
||||
self.show_all = True
|
||||
else:
|
||||
raise ValueError('unknown option: {!r}'.format(option))
|
||||
except ValueError as e:
|
||||
raise serial.SerialException(
|
||||
'expected a string in the form '
|
||||
'"spy://port[?option[=value][&option[=value]]]": {}'.format(e))
|
||||
self.formatter = formatter(output, color)
|
||||
return ''.join([parts.netloc, parts.path])
|
||||
|
||||
def write(self, tx):
|
||||
tx = to_bytes(tx)
|
||||
self.formatter.tx(tx)
|
||||
return super(Serial, self).write(tx)
|
||||
|
||||
def read(self, size=1):
|
||||
rx = super(Serial, self).read(size)
|
||||
if rx or self.show_all:
|
||||
self.formatter.rx(rx)
|
||||
return rx
|
||||
|
||||
if hasattr(serial.Serial, 'cancel_read'):
|
||||
def cancel_read(self):
|
||||
self.formatter.control('Q-RX', 'cancel_read')
|
||||
super(Serial, self).cancel_read()
|
||||
|
||||
if hasattr(serial.Serial, 'cancel_write'):
|
||||
def cancel_write(self):
|
||||
self.formatter.control('Q-TX', 'cancel_write')
|
||||
super(Serial, self).cancel_write()
|
||||
|
||||
@property
|
||||
def in_waiting(self):
|
||||
n = super(Serial, self).in_waiting
|
||||
if self.show_all:
|
||||
self.formatter.control('Q-RX', 'in_waiting -> {}'.format(n))
|
||||
return n
|
||||
|
||||
def flush(self):
|
||||
self.formatter.control('Q-TX', 'flush')
|
||||
super(Serial, self).flush()
|
||||
|
||||
def reset_input_buffer(self):
|
||||
self.formatter.control('Q-RX', 'reset_input_buffer')
|
||||
super(Serial, self).reset_input_buffer()
|
||||
|
||||
def reset_output_buffer(self):
|
||||
self.formatter.control('Q-TX', 'reset_output_buffer')
|
||||
super(Serial, self).reset_output_buffer()
|
||||
|
||||
def send_break(self, duration=0.25):
|
||||
self.formatter.control('BRK', 'send_break {}s'.format(duration))
|
||||
super(Serial, self).send_break(duration)
|
||||
|
||||
@serial.Serial.break_condition.setter
|
||||
def break_condition(self, level):
|
||||
self.formatter.control('BRK', 'active' if level else 'inactive')
|
||||
serial.Serial.break_condition.__set__(self, level)
|
||||
|
||||
@serial.Serial.rts.setter
|
||||
def rts(self, level):
|
||||
self.formatter.control('RTS', 'active' if level else 'inactive')
|
||||
serial.Serial.rts.__set__(self, level)
|
||||
|
||||
@serial.Serial.dtr.setter
|
||||
def dtr(self, level):
|
||||
self.formatter.control('DTR', 'active' if level else 'inactive')
|
||||
serial.Serial.dtr.__set__(self, level)
|
||||
|
||||
@serial.Serial.cts.getter
|
||||
def cts(self):
|
||||
level = super(Serial, self).cts
|
||||
self.formatter.control('CTS', 'active' if level else 'inactive')
|
||||
return level
|
||||
|
||||
@serial.Serial.dsr.getter
|
||||
def dsr(self):
|
||||
level = super(Serial, self).dsr
|
||||
self.formatter.control('DSR', 'active' if level else 'inactive')
|
||||
return level
|
||||
|
||||
@serial.Serial.ri.getter
|
||||
def ri(self):
|
||||
level = super(Serial, self).ri
|
||||
self.formatter.control('RI', 'active' if level else 'inactive')
|
||||
return level
|
||||
|
||||
@serial.Serial.cd.getter
|
||||
def cd(self):
|
||||
level = super(Serial, self).cd
|
||||
self.formatter.control('CD', 'active' if level else 'inactive')
|
||||
return level
|
||||
|
||||
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
|
||||
if __name__ == '__main__':
|
||||
ser = Serial(None)
|
||||
ser.port = 'spy:///dev/ttyS0'
|
||||
print(ser)
|
||||
Reference in New Issue
Block a user