# Copyright (c) 2021 Jeff Irion and contributors
#
# This file is part of the adb-shell package. It incorporates work
# covered by the following license notice:
#
#
# Copyright 2014 Google Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""A class for creating a USB connection with the device and sending and receiving data.

.. warning::

   USB support is an experimental feature.

* :func:`get_interface`
* :func:`interface_matcher`
* :class:`UsbTransport`
* :meth:`UsbTransport._find`
* :meth:`UsbTransport._find_and_open`
* :meth:`UsbTransport._find_devices`
* :meth:`UsbTransport._find_first`
* :meth:`UsbTransport._flush_buffers`
* :meth:`UsbTransport._open`
* :meth:`UsbTransport._port_path_matcher`
* :meth:`UsbTransport._serial_matcher`
* :meth:`UsbTransport._timeout_ms`
* :meth:`UsbTransport.bulk_read`
* :meth:`UsbTransport.bulk_write`
* :meth:`UsbTransport.close`
* :meth:`UsbTransport.connect`
* :meth:`UsbTransport.find_adb`
* :meth:`UsbTransport.find_all_adb_devices`
* :attr:`UsbTransport.port_path`
* :attr:`UsbTransport.serial_number`
* :attr:`UsbTransport.usb_info`
"""
import logging
import platform
import re
import threading
import warnings
import weakref
import usb1
from .base_transport import BaseTransport
from .. import exceptions
#: Default timeout, in seconds, used by :class:`UsbTransport` when no
#: ``default_transport_timeout_s`` is supplied
DEFAULT_TIMEOUT_S = 10

# Separators accepted when a port path is given as a string; the pieces
# between them are converted to integers by ``UsbTransport._port_path_matcher``
SYSFS_PORT_SPLIT_RE = re.compile("[,/:.-]")

# Module-level logger
_LOGGER = logging.getLogger(__name__)

# The (class, subclass, protocol) triple that ``UsbTransport.find_adb`` and
# ``UsbTransport.find_all_adb_devices`` pass to ``interface_matcher`` in order
# to pick out ADB interfaces
CLASS = usb1.CLASS_VENDOR_SPEC  # pylint: disable=no-member
SUBCLASS = 0x42
PROTOCOL = 0x01
[docs]
def get_interface(setting):  # pragma: no cover
    """Return the class, subclass, and protocol of a USB interface setting.

    Parameters
    ----------
    setting : object
        An interface setting exposing ``getClass()``, ``getSubClass()`` and
        ``getProtocol()`` (presumably a ``usb1.USBInterfaceSetting``)

    Returns
    -------
    tuple
        ``(class, subclass, protocol)`` exactly as reported by ``setting``

    """
    usb_class = setting.getClass()
    usb_subclass = setting.getSubClass()
    usb_protocol = setting.getProtocol()
    return usb_class, usb_subclass, usb_protocol
[docs]
def interface_matcher(clazz, subclass, protocol):  # pragma: no cover
    """Build a function that picks the setting with the given interface triple.

    Parameters
    ----------
    clazz : int
        The USB interface class to look for
    subclass : int
        The USB interface subclass to look for
    protocol : int
        The USB interface protocol to look for

    Returns
    -------
    matcher : function
        A function that takes a device and returns its first setting whose
        ``get_interface`` result equals ``(clazz, subclass, protocol)``, or
        ``None`` when the device has no such setting

    """
    wanted = (clazz, subclass, protocol)

    def matcher(device):
        """Return the first setting of ``device`` that matches, or ``None``.

        Parameters
        ----------
        device : object
            A device exposing ``iterSettings()`` (presumably a ``usb1.USBDevice``)

        Returns
        -------
        object, None
            The first matching setting, or ``None`` if nothing matches

        """
        candidates = (setting for setting in device.iterSettings() if get_interface(setting) == wanted)
        return next(candidates, None)

    return matcher
[docs]
class UsbTransport(BaseTransport):  # pragma: no cover
    """USB communication object.  Not thread-safe.

    Handles reading and writing over USB with the proper endpoints, exceptions,
    and interface claiming.

    Parameters
    ----------
    device : usb1.USBDevice
        libusb_device to connect to.
    setting : usb1.USBInterfaceSetting
        libusb setting with the correct endpoints to communicate with.
    usb_info : str, None
        String describing the usb path/serial/device, for debugging.
    default_transport_timeout_s : float, None
        Timeout in seconds for all I/O; ``None`` selects ``DEFAULT_TIMEOUT_S``.

    Attributes
    ----------
    _default_transport_timeout_s : float
        Timeout in seconds used when an I/O call does not supply its own.
    _device : usb1.USBDevice
        libusb_device to connect to.
    _transport : object, None
        The handle returned by ``self._device.open()``, or ``None`` while the
        transport is not open.
    _interface_number : int, None
        The number reported by ``self._setting.getNumber()`` for the interface
        that was claimed; ``None`` until the transport has been opened.
    _max_read_packet_len : int
        Maximum packet size of the read endpoint; only populated by
        :meth:`_open` and used by :meth:`_flush_buffers`.
    _read_endpoint : int, None
        Address of the endpoint whose direction bit is set (read endpoint).
    _setting : usb1.USBInterfaceSetting
        libusb setting with the correct endpoints to communicate with.
    _usb_info : str
        String describing the usb path/serial/device, for debugging.
    _write_endpoint : int, None
        Address of the endpoint whose direction bit is clear (write endpoint).

    """

    # We maintain an idempotent `usb1` context object to ensure that device
    # objects we hand back to callers can be used while this class exists
    USB1_CTX = usb1.USBContext()
    USB1_CTX.open()

    # Maps a port path (as a tuple) to the transport that most recently ran
    # `_open` for it, so that a later `_open` can close the earlier transport.
    # Values are held weakly, so the cache never keeps a transport alive.
    _HANDLE_CACHE = weakref.WeakValueDictionary()
    _HANDLE_CACHE_LOCK = threading.Lock()

    def __init__(self, device, setting, usb_info=None, default_transport_timeout_s=None):
        self._setting = setting
        self._device = device
        self._transport = None
        self._interface_number = None
        self._read_endpoint = None
        self._write_endpoint = None
        self._usb_info = usb_info or ''
        self._default_transport_timeout_s = default_transport_timeout_s if default_transport_timeout_s is not None else DEFAULT_TIMEOUT_S
        self._max_read_packet_len = 0

    def close(self):
        """Close the USB connection.

        Does nothing if the transport is not open.  ``usb1.USBError`` raised
        while releasing the interface or closing the handle is logged rather
        than propagated, and the handle is closed even if releasing the
        interface fails.

        """
        transport = self._transport
        if transport is None:
            return

        self._transport = None
        try:
            transport.releaseInterface(self._interface_number)
        except usb1.USBError:
            _LOGGER.info('USBError while closing transport %s: ', self.usb_info, exc_info=True)
        finally:
            # Always attempt to close the handle; a failed release must not leak it
            try:
                transport.close()
            except usb1.USBError:
                _LOGGER.info('USBError while closing transport %s: ', self.usb_info, exc_info=True)

    def connect(self, transport_timeout_s=None):
        """Create a USB connection to the device.

        Opens the device, detaches an active kernel driver (except on Windows),
        and claims the interface.

        Parameters
        ----------
        transport_timeout_s : float, None
            Accepted for interface compatibility; this implementation does not use it

        """
        read_endpoint, write_endpoint, _ = self._endpoint_addresses()

        transport = self._device.open()
        iface_number = self._setting.getNumber()
        self._detach_kernel_driver(transport, iface_number)

        # Record the handle before claiming the interface so that `close()` can
        # clean up if the claim fails
        self._transport = transport
        self._read_endpoint = read_endpoint
        self._write_endpoint = write_endpoint
        self._interface_number = iface_number

        self._transport.claimInterface(self._interface_number)

    def bulk_read(self, numbytes, transport_timeout_s=None):
        """Receive data from the USB device.

        Parameters
        ----------
        numbytes : int
            The maximum amount of data to be received
        transport_timeout_s : float, None
            Timeout in seconds for this read; ``None`` means use the transport's default timeout

        Returns
        -------
        bytes
            The received data

        Raises
        ------
        adb_shell.exceptions.UsbReadFailedError
            This transport has been closed, probably due to another being opened
        adb_shell.exceptions.UsbReadFailedError
            Could not receive data

        """
        if self._transport is None:
            raise exceptions.UsbReadFailedError('This transport has been closed, probably due to another being opened.', None)

        try:
            # python-libusb1 > 1.6 exposes bytearray()s now instead of bytes/str.
            # To support older and newer versions, we always convert the result to bytes.
            return bytes(self._transport.bulkRead(self._read_endpoint, numbytes, timeout=self._timeout_ms(transport_timeout_s)))
        except usb1.USBError as e:
            raise exceptions.UsbReadFailedError('Could not receive data from %s (timeout %sms)' % (self.usb_info, self._timeout_ms(transport_timeout_s)), e)

    def bulk_write(self, data, transport_timeout_s=None):
        """Send data to the USB device.

        Parameters
        ----------
        data : bytes
            The data to be sent
        transport_timeout_s : float, None
            Timeout in seconds for this write; ``None`` means use the transport's default timeout

        Returns
        -------
        int
            The number of bytes sent

        Raises
        ------
        adb_shell.exceptions.UsbWriteFailedError
            This transport has been closed, probably due to another being opened
        adb_shell.exceptions.UsbWriteFailedError
            Could not send data

        """
        if self._transport is None:
            raise exceptions.UsbWriteFailedError('This transport has been closed, probably due to another being opened.', None)

        try:
            return self._transport.bulkWrite(self._write_endpoint, data, timeout=self._timeout_ms(transport_timeout_s))
        except usb1.USBError as e:
            raise exceptions.UsbWriteFailedError('Could not send data to %s (timeout %sms)' % (self.usb_info, self._timeout_ms(transport_timeout_s)), e)

    def _endpoint_addresses(self):
        """Find the read and write endpoints of ``self._setting``.

        An endpoint whose address has the direction bit set is the read
        endpoint; any other endpoint is the write endpoint.  If several
        endpoints share a direction, the last one wins.

        Returns
        -------
        read_endpoint : int
            Address of the read endpoint
        write_endpoint : int
            Address of the write endpoint
        max_read_packet_len : int
            Maximum packet size reported by the read endpoint

        Raises
        ------
        AssertionError
            The setting lacks a read endpoint or a write endpoint

        """
        read_endpoint = None
        write_endpoint = None
        max_read_packet_len = self._max_read_packet_len

        for endpoint in self._setting.iterEndpoints():
            address = endpoint.getAddress()
            if address & usb1.ENDPOINT_DIR_MASK:  # pylint: disable=no-member
                read_endpoint = address
                max_read_packet_len = endpoint.getMaxPacketSize()
            else:
                write_endpoint = address

        assert read_endpoint is not None
        assert write_endpoint is not None

        return read_endpoint, write_endpoint, max_read_packet_len

    @staticmethod
    def _detach_kernel_driver(transport, iface_number):
        """Detach the kernel driver from an interface if one is active.

        Skipped entirely on Windows.  A ``usb1.USBErrorNotFound`` is reported
        as a warning instead of being raised.

        Parameters
        ----------
        transport : object
            The open device handle
        iface_number : int
            The interface number to detach the kernel driver from

        """
        try:
            if platform.system() != 'Windows' and transport.kernelDriverActive(iface_number):
                transport.detachKernelDriver(iface_number)
        except usb1.USBErrorNotFound:  # pylint: disable=no-member
            # The second positional argument of `warnings.warn` is the warning
            # category, so the interface number must be formatted into the message
            warnings.warn('Kernel driver not found for interface: %s.' % iface_number)

    def _open(self):
        """Opens the USB device for this setting, and claims the interface.

        Any transport previously opened through this method for the same port
        path is closed first.

        """
        # Make sure we close any previous transport open to this usb device.
        port_path = tuple(self.port_path)
        with self._HANDLE_CACHE_LOCK:
            old_transport = self._HANDLE_CACHE.get(port_path)
            if old_transport is not None:
                old_transport.close()

        self._read_endpoint = None
        self._write_endpoint = None
        read_endpoint, write_endpoint, max_read_packet_len = self._endpoint_addresses()
        self._read_endpoint = read_endpoint
        self._write_endpoint = write_endpoint
        self._max_read_packet_len = max_read_packet_len

        transport = self._device.open()
        iface_number = self._setting.getNumber()
        self._detach_kernel_driver(transport, iface_number)

        try:
            transport.claimInterface(iface_number)
        except usb1.USBError:
            # Nothing references the handle yet, so close it here to avoid a leak
            transport.close()
            raise

        self._transport = transport
        self._interface_number = iface_number

        with self._HANDLE_CACHE_LOCK:
            self._HANDLE_CACHE[port_path] = self

        # NOTE: the transport is not closed automatically when this object is
        # garbage collected; callers are responsible for calling `close()`.

    def _timeout_ms(self, transport_timeout_s):
        """Convert a timeout in seconds to an integer number of milliseconds.

        Parameters
        ----------
        transport_timeout_s : float, None
            Timeout in seconds; ``None`` means use ``self._default_transport_timeout_s``

        Returns
        -------
        int
            The timeout in milliseconds, truncated to an integer

        """
        if transport_timeout_s is None:
            transport_timeout_s = self._default_transport_timeout_s

        return int(transport_timeout_s * 1000)

    def _flush_buffers(self):
        """Read and discard incoming data until a read times out.

        Raises
        ------
        adb_shell.exceptions.UsbReadFailedError
            A read failed for a reason other than a timeout

        """
        # NOTE(review): each read waits up to 10 seconds, so draining always
        # takes at least that long -- confirm this is intended (a value in
        # milliseconds may have been meant).
        while True:
            try:
                self.bulk_read(self._max_read_packet_len, transport_timeout_s=10)
            except exceptions.UsbReadFailedError as e:
                if isinstance(e.usb_error, usb1.USBErrorTimeout):  # pylint: disable=no-member
                    break
                raise

    # ======================================================================= #
    #                                                                         #
    #                               Properties                                #
    #                                                                         #
    # ======================================================================= #
    @property
    def port_path(self):
        """The bus number followed by the port numbers of the device.

        Returns
        -------
        list
            ``[bus_number] + port_number_list`` as reported by ``self._device``

        """
        return [self._device.getBusNumber()] + self._device.getPortNumberList()

    @property
    def serial_number(self):
        """The serial number of the device.

        Returns
        -------
        str
            The value of ``self._device.getSerialNumber()``

        """
        return self._device.getSerialNumber()

    @property
    def usb_info(self):
        """A description of the device, for debugging.

        Returns
        -------
        str
            ``self._usb_info``, followed by the serial number when the serial
            number could be read, is non-empty, and differs from ``self._usb_info``

        """
        try:
            sn = self.serial_number
        except usb1.USBError:
            sn = ''

        if sn and sn != self._usb_info:
            return '%s %s' % (self._usb_info, sn)

        return self._usb_info

    # ======================================================================= #
    #                                                                         #
    #                                Matchers                                 #
    #                                                                         #
    # ======================================================================= #
    @classmethod
    def _port_path_matcher(cls, port_path):
        """Returns a device matcher for the given port path.

        Parameters
        ----------
        port_path : str, list
            Either a sysfs-style string, which is split on any of ``,/:.-`` and
            converted to a list of integers, or a value that is compared
            directly against :attr:`port_path` (which is a list)

        Returns
        -------
        function
            A function that returns ``True`` when a transport's :attr:`port_path` equals ``port_path``

        """
        if isinstance(port_path, str):
            # Convert from sysfs path to port_path.
            port_path = [int(part) for part in SYSFS_PORT_SPLIT_RE.split(port_path)]

        return lambda device: device.port_path == port_path

    @classmethod
    def _serial_matcher(cls, serial):
        """Returns a device matcher for the given serial.

        Parameters
        ----------
        serial : str
            The serial number to look for

        Returns
        -------
        function
            A function that returns ``True`` when a transport's :attr:`serial_number` equals ``serial``

        """
        return lambda device: device.serial_number == serial

    # ======================================================================= #
    #                                                                         #
    #                                 Finders                                 #
    #                                                                         #
    # ======================================================================= #
    @classmethod
    def _find(cls, setting_matcher, port_path=None, serial=None, default_transport_timeout_s=None):
        """Gets the first device that matches according to the keyword args.

        ``port_path`` takes precedence over ``serial``; if neither is given,
        the first device with a matching setting is returned.

        Parameters
        ----------
        setting_matcher : function
            Function that returns the setting to use given a ``usb1.USBDevice``, or ``None``
            if the device doesn't have a valid setting.
        port_path : str, list, None
            The port path to match; see :meth:`_port_path_matcher`
        serial : str, None
            The serial number to match
        default_transport_timeout_s : float, None
            Default timeout of commands in seconds.

        Returns
        -------
        UsbTransport
            The first matching transport (not yet opened)

        Raises
        ------
        adb_shell.exceptions.UsbDeviceNotFoundError
            Raised if no matching device is available.

        """
        if port_path:
            device_matcher = cls._port_path_matcher(port_path)
            usb_info = port_path
        elif serial:
            device_matcher = cls._serial_matcher(serial)
            usb_info = serial
        else:
            device_matcher = None
            usb_info = 'first'

        return cls._find_first(setting_matcher, device_matcher, usb_info=usb_info, default_transport_timeout_s=default_transport_timeout_s)

    @classmethod
    def _find_and_open(cls, setting_matcher, port_path=None, serial=None, default_transport_timeout_s=None):
        """Find the first matching device, open it, and drain pending input.

        Parameters
        ----------
        setting_matcher : function
            Function that returns the setting to use given a ``usb1.USBDevice``, or ``None``
            if the device doesn't have a valid setting.
        port_path : str, list, None
            The port path to match; see :meth:`_port_path_matcher`
        serial : str, None
            The serial number to match
        default_transport_timeout_s : float, None
            Default timeout of commands in seconds.

        Returns
        -------
        dev : UsbTransport
            The opened transport

        """
        dev = cls._find(setting_matcher, port_path=port_path, serial=serial, default_transport_timeout_s=default_transport_timeout_s)
        dev._open()  # pylint: disable=protected-access
        dev._flush_buffers()  # pylint: disable=protected-access
        return dev

    @classmethod
    def _find_devices(cls, setting_matcher, device_matcher=None, usb_info='', default_transport_timeout_s=None):
        """Find and yield the devices that match.

        Parameters
        ----------
        setting_matcher : function
            Function that returns the setting to use given a ``usb1.USBDevice``, or ``None``
            if the device doesn't have a valid setting.
        device_matcher : function, None
            Function that returns ``True`` if the given ``UsbTransport`` is
            valid.  ``None`` to match any device.
        usb_info : str
            Info string describing device(s).
        default_transport_timeout_s : float, None
            Default timeout of commands in seconds.

        Yields
        ------
        UsbTransport
            UsbTransport instances (not yet opened)

        """
        for device in cls.USB1_CTX.getDeviceIterator(skip_on_error=True):
            setting = setting_matcher(device)
            if setting is None:
                continue

            transport = cls(device, setting, usb_info=usb_info, default_transport_timeout_s=default_transport_timeout_s)
            if device_matcher is None or device_matcher(transport):
                yield transport

    @classmethod
    def _find_first(cls, setting_matcher, device_matcher=None, usb_info='', default_transport_timeout_s=None):
        """Find and return the first matching device.

        Parameters
        ----------
        setting_matcher : function
            Function that returns the setting to use given a ``usb1.USBDevice``, or ``None``
            if the device doesn't have a valid setting.
        device_matcher : function, None
            Function that returns ``True`` if the given ``UsbTransport`` is
            valid.  ``None`` to match any device.
        usb_info : str
            Info string describing device(s).
        default_transport_timeout_s : float, None
            Default timeout of commands in seconds.

        Returns
        -------
        UsbTransport
            An instance of `UsbTransport`

        Raises
        ------
        adb_shell.exceptions.UsbDeviceNotFoundError
            Raised if the device is not available.

        """
        try:
            return next(cls._find_devices(setting_matcher, device_matcher=device_matcher, usb_info=usb_info, default_transport_timeout_s=default_transport_timeout_s))
        except StopIteration:
            raise exceptions.UsbDeviceNotFoundError('No device available, or it is in the wrong configuration.')

    @classmethod
    def find_adb(cls, serial=None, port_path=None, default_transport_timeout_s=None):
        """Find the first USB device that exposes the ADB interface.

        Parameters
        ----------
        serial : str, None
            The serial number to match
        port_path : str, list, None
            The port path to match; takes precedence over ``serial``
        default_transport_timeout_s : float, None
            Default timeout of commands in seconds.

        Returns
        -------
        UsbTransport
            The first matching transport (not yet connected)

        Raises
        ------
        adb_shell.exceptions.UsbDeviceNotFoundError
            Raised if no matching device is available.

        """
        return cls._find(
            interface_matcher(CLASS, SUBCLASS, PROTOCOL),
            serial=serial,
            port_path=port_path,
            default_transport_timeout_s=default_transport_timeout_s
        )

    @classmethod
    def find_all_adb_devices(cls, default_transport_timeout_s=None):
        """Find all ADB devices attached via USB.

        Parameters
        ----------
        default_transport_timeout_s : float, None
            Default timeout of commands in seconds.

        Returns
        -------
        generator
            A generator which yields each ADB device attached via USB.

        """
        for dev in cls._find_devices(interface_matcher(CLASS, SUBCLASS, PROTOCOL), default_transport_timeout_s=default_transport_timeout_s):
            yield dev