mcumgr_toolkit

Example

from mcumgr_toolkit import MCUmgrClient

with MCUmgrClient.serial("/dev/ttyACM0") as client:
    client.use_auto_frame_size()

    print(client.os_echo("Hello world!"))
    # Hello world!
1from .mcumgr_toolkit import *
2
3__doc__ = mcumgr_toolkit.__doc__
4if hasattr(mcumgr_toolkit, "__all__"):
5    __all__ = mcumgr_toolkit.__all__
class MCUmgrClient:

A high-level client for Zephyr's MCUmgr SMP functionality

def serial( serial: str, baud_rate: int = 115200, timeout_ms: int = 500) -> MCUmgrClient:

Creates a new serial port based Zephyr MCUmgr SMP client.

Arguments

  • serial - The identifier of the serial device. (Windows: COMxx, Linux: /dev/ttyXX)
  • baud_rate - The baud rate of the serial port.
  • timeout_ms - The communication timeout, in ms.
def usb_serial( identifier: str, baud_rate: int = 115200, timeout_ms: int = 500) -> MCUmgrClient:

Creates a Zephyr MCUmgr SMP client based on a USB serial port identified by VID:PID.

Useful for programming many devices in rapid succession, as Windows usually gives each one a different COMxx identifier.

Arguments

  • identifier - A regex that identifies the device.
  • baud_rate - The baud rate the port should operate at.
  • timeout_ms - The communication timeout, in ms.

Identifier examples

  • 1234:89AB - Vendor ID 1234, Product ID 89AB. Will fail if product has multiple serial ports.
  • 1234:89AB:12 - Vendor ID 1234, Product ID 89AB, Interface 12.
  • 1234:.*:[2-3] - Vendor ID 1234, any Product Id, Interface 2 or 3.
def set_frame_size(self, smp_frame_size: int) -> None:

Configures the maximum SMP frame size that we can send to the device.

Must not exceed MCUMGR_TRANSPORT_NETBUF_SIZE, otherwise we might crash the device.

def use_auto_frame_size(self) -> None:

Configures the maximum SMP frame size that we can send to the device automatically by reading the value of MCUMGR_TRANSPORT_NETBUF_SIZE from the device.

def set_timeout_ms(self, timeout_ms: int) -> None:

Changes the communication timeout.

When the device does not respond to packets within the set duration, an error will be raised.

def set_retries(self, retries: int) -> None:

Changes the retry amount.

When the device encounters a transport error, it will retry this many times until giving up.

def check_connection(self) -> None:

Checks if the device is alive and responding.

Runs a simple echo with random data and checks if the response matches.

Raises an error if the device is not alive and responding.

def firmware_update( self, firmware: bytes, checksum: Union[str, bytes, NoneType] = None, bootloader_type: Optional[Literal['MCUboot']] = None, skip_reboot: bool = False, force_confirm: bool = False, upgrade_only: bool = False, progress: Optional[Callable[[str, Optional[tuple[int, int]]], None]] = None) -> None:

High-level firmware update routine.

Arguments

  • firmware - The firmware image data.
  • checksum - SHA256 of the firmware image. Optional.
  • bootloader_type - The type of bootloader. Auto-detect bootloader if missing.
  • skip_reboot - Do not reboot device after the update.
  • force_confirm - Skip test boot and confirm directly.
  • upgrade_only - Prevent firmware downgrades.
  • progress - A callback that receives progress updates.
def os_echo(self, msg: str) -> str:

Sends a message to the device and expects the same message back as response.

This can be used as a sanity check for whether the device is connected and responsive.

def os_task_statistics(self) -> dict[str, TaskStatistics]:

Queries live task statistics

Note

Converts stkuse and stksiz to bytes. Zephyr originally reports them as number of 4 byte words.

Return

A map of task names with their respective statistics

def os_set_datetime(self, datetime: datetime.datetime) -> None:

Sets the RTC of the device to the given datetime.

Uses the contained local time and discards timezone information.

def os_get_datetime(self) -> datetime.datetime:

Retrieves the device RTC's datetime.

Will not contain timezone information.

def os_system_reset(self, force: bool = False, boot_mode: Optional[int] = None) -> None:

Issues a system reset.

Arguments

  • force - Issues a force reset.
  • boot_mode - Overwrites the default boot mode.

Known boot_mode values:

  • 0 - Normal system boot
  • 1 - Bootloader recovery mode

Note that boot_mode only works if MCUMGR_GRP_OS_RESET_BOOT_MODE is enabled.

def os_mcumgr_parameters(self) -> MCUmgrParameters:

Fetch parameters from the MCUmgr library

def os_application_info(self, format: Optional[str] = None) -> str:

Fetch information on the running image

Similar to Linux's uname command.

Arguments

  • format - Format specifier for the returned response

For more information about the format specifier fields, see the SMP documentation.

def os_bootloader_info(self) -> Any:

Fetch information on the device's bootloader

def image_get_state(self) -> list[ImageState]:

Obtain a list of images with their current state.

def image_set_state( self, hash: Union[str, bytes, NoneType] = None, confirm: bool = False) -> list[ImageState]:

Modify the current image state and return the new state

Arguments

If confirm is false, perform a test boot with the given image and revert upon hard reset.

If confirm is true, boot to the given image and mark it as confirmed. If hash is omitted, confirm the currently running image.

Note that hash will not be the same as the SHA256 of the whole firmware image, it is the field in the MCUboot TLV section that contains a hash of the data which is used for signature verification purposes.

def image_upload( self, data: bytes, image: Optional[int] = None, checksum: Union[str, bytes, NoneType] = None, upgrade_only: bool = False, progress: Optional[Callable[[int, int], None]] = None) -> None:

Upload a firmware image to an image slot.

Note

This only uploads the image to a slot on the device, it has to be activated through image_set_state for an actual update to happen.

For a full firmware update algorithm in a single step, see firmware_update.

Arguments

  • data - The firmware image data
  • image - Selects target image on the device. Defaults to 0.
  • checksum - The SHA256 checksum of the image. If missing, will be computed from the image data.
  • upgrade_only - If true, allow firmware upgrades only and reject downgrades.
  • progress - A callable object that takes (transmitted, total) values as parameters. Any return value is ignored. Raising an exception aborts the operation.

Performance

Uploading files with Zephyr's default parameters is slow. You want to increase MCUMGR_TRANSPORT_NETBUF_SIZE to maybe 4096 and then enable larger chunking through either set_frame_size or use_auto_frame_size.

def image_erase(self, slot: Optional[int] = None) -> None:

Erase image slot on target device.

Arguments

  • slot - The slot ID of the image to erase. Slot 1 if omitted.
def image_slot_info(self) -> list[SlotInfoImage]:

Obtain a list of available image slots.

def stats_get_group_data(self, name: str) -> dict[str, int]:

Query the current values of a given stats group

Arguments

def stats_list_groups(self) -> list[str]:

Query the list of available stats groups

def fs_file_download( self, name: str, progress: Optional[Callable[[int, int], None]] = None) -> bytes:

Load a file from the device.

Arguments

  • name - The full path of the file on the device.
  • progress - A callable object that takes (transmitted, total) values as parameters. Any return value is ignored. Raising an exception aborts the operation.

Return

The file content

Performance

Downloading files with Zephyr's default parameters is slow. You want to increase MCUMGR_TRANSPORT_NETBUF_SIZE to maybe 4096 or larger.

def fs_file_upload( self, name: str, data: bytes, progress: Optional[Callable[[int, int], None]] = None) -> None:

Write a file to the device.

Arguments

  • name - The full path of the file on the device.
  • data - The file content.
  • progress - A callable object that takes (transmitted, total) values as parameters. Any return value is ignored. Raising an exception aborts the operation.

Performance

Uploading files with Zephyr's default parameters is slow. You want to increase MCUMGR_TRANSPORT_NETBUF_SIZE to maybe 4096 and then enable larger chunking through either set_frame_size or use_auto_frame_size.

def fs_file_status(self, name: str) -> FileStatus:

Queries the file status

def fs_file_checksum( self, name: str, algorithm: Optional[str] = None, offset: int = 0, length: Optional[int] = None) -> FileChecksum:

Computes the hash/checksum of a file

For available algorithms, see fs_supported_checksum_types.

Arguments

  • name - The absolute path of the file on the device
  • algorithm - The hash/checksum algorithm to use, or default if None
  • offset - How many bytes of the file to skip
  • length - How many bytes to read after offset. None for the entire file.
def fs_supported_checksum_types(self) -> dict[str, FileChecksumProperties]:

Queries which hash/checksum algorithms are available on the target

def fs_file_close(self) -> None:

Close all device files MCUmgr has currently open

def shell_execute(self, argv: Sequence[str], use_retries: bool = True) -> str:

Run a shell command.

Arguments

  • argv - The shell command to be executed.
  • use_retries - Retry request a certain amount of times if a transport error occurs. Be aware that this might cause the command to be executed multiple times.

Return

The command output

def enum_get_group_count(self) -> int:

Query how many MCUmgr groups are supported by the device.

Return

The number of MCUmgr groups the device supports.

def enum_get_group_ids(self) -> list[int]:

Query all available group IDs in a single command.

Note that this might fail if the amount of groups is too large for the SMP frame. But given that the Zephyr implementation contains less than 10 groups, this is currently highly unlikely.

If it does fail, use enum_iter_group_ids to iterate through the available group IDs one by one.

Return

A list of all MCUmgr group IDs the device supports.

def enum_get_group_id(self, index: int) -> int:

Query a single group ID from the device.

Arguments

Return

The group ID of the group with the given index

def enum_iter_group_ids(self) -> Iterator[int]:

Iterate through all supported MCUmgr Groups.

Same as enum_get_group_ids, but does not require large message sizes if the number of groups is large. The tradeoff is that this function is much slower.

def enum_get_group_details( self, groups: Optional[Sequence[int]] = None) -> list[GroupDetails]:

Query details from all available groups.

Arguments

  • groups - The group IDs to fetch details for. If omitted, fetch all groups.

Return

A list of details about all MCUmgr group IDs the device supports.

def zephyr_erase_storage(self) -> None:

Erase the storage_partition flash partition.

def raw_command( self, write_operation: bool, group_id: int, command_id: int, data: Any) -> Any:

Execute a raw MCUmgrCommand.

Only returns if no error happened, so the user does not need to check for an rc or err field in the response.

Read Zephyr's SMP Protocol Specification for more information.

Arguments

  • write_operation - Whether the command is a read or write operation.
  • group_id - The group ID of the command
  • command_id - The command ID
  • data - Anything that can be serialized as a proper packet payload.

Example

client.raw_command(True, 0, 0, {"d": "Hello!"})
# Returns: {'r': 'Hello!'}
class FileChecksum:
output: bytes

output hash/checksum

offset: int

offset that hash/checksum calculation started at

type: str

type of hash/checksum that was performed

length: int

length of input data used for hash/checksum generation (in bytes)

class FileChecksumDataFormat:

Data format of the hash/checksum type

Data is a number

Data is a bytes array

class FileChecksumProperties:

Properties of a hash/checksum algorithm

format that the hash/checksum returns

size: int

size (in bytes) of output hash/checksum response

class FileStatus:

Return value of MCUmgrClient.fs_file_status.

length: int

length of file (in bytes)

class GroupDetails:

Details about an MCUmgr group

group: int

the group ID of the MCUmgr command group

name: Optional[str]

the name of the MCUmgr command group

handlers: Optional[int]

the number of handlers that the MCUmgr command group supports

class ImageState:

The state of an image slot

active: bool

true if image is currently active application

image: int

image number

slot: int

slot number within “image”

pending: bool

true if image is set for next swap

version: str

string representing image version, as set with imgtool

hash: Optional[bytes]

Hash of the image header and body

Note that this will not be the same as the SHA256 of the whole file, it is the field in the MCUboot TLV section that contains a hash of the data which is used for signature verification purposes.

bootable: bool

true if image has bootable flag set

permanent: bool

true if image is to stay in primary slot after the next boot

confirmed: bool

true if image has been confirmed

class MCUmgrParameters:
buf_size: int

Single SMP buffer size, this includes SMP header and CBOR payload

buf_count: int

Number of SMP buffers supported

class SlotInfoImage:

Information about a firmware image type returned by MCUmgrClient.image_slot_info

slots: list[SlotInfoImageSlot]

slots available for the image

image: int

number of the image

max_image_size: Optional[int]

maximum size of an application that can be uploaded to that image number

class SlotInfoImageSlot:

Information about a slot that can hold a firmware image

slot: int

slot inside the image being enumerated

size: int

size of the slot

upload_image_id: Optional[int]

specifies the image ID that can be used by external tools to upload an image to that slot

class TaskStatistics:

Statistics of an MCU task/thread

runtime: Optional[int]

task’s/thread’s runtime in “ticks”

tid: int

numeric task ID

prio: int

task priority

stkuse: Optional[int]

task’s/thread’s stack usage

stksiz: Optional[int]

task’s/thread’s stack size

cswcnt: Optional[int]

task’s/thread’s context switches

state: int

numeric task state

class McubootImageInfo:

Information about an MCUboot firmware image

hash: bytes

The identifying hash for the firmware

Note that this will not be the same as the SHA256 of the whole file, it is the field in the MCUboot TLV section that contains a hash of the data which is used for signature verification purposes.

version: str

Firmware version

def mcuboot_get_image_info(image_data: bytes) -> McubootImageInfo:

Extract information from an MCUboot image file