Code your own modules to create modular resources
Viam provides built-in support for a variety of different components and services, but you can also add support for unsupported resources by creating a module. A module provides one or more modular resources, which add support for resource types or models that are not built into Viam.
You can browse existing modules in the Viam Registry to find one that supports your custom hardware or software, or you can write your own module.
Modules run alongside viam-server
as separate processes, communicating with viam-server
over UNIX sockets.
When a module initializes, it registers its model or models and the associated APIs with viam-server
, making the new model available for use.
In most cases, the modular resources you create should be a new model that implements an existing Viam API.
For example, if you have a custom base, you can create a new base model that implements the rdk:component:base
API.
Find your reference files
Before you start coding a new resource model, follow these steps to find the appropriate source code as a reference for the methods you need to implement:
Understand your model’s client interface:
To create a new resource model, you need to implement your model’s client interface in a file called
my_modular_resource.py
.This interface defines how your model’s server responds to API requests.
To ensure the client interface you create returns the expected results, use the appropriate client interface defined in
components/<resource-name>/client.py orservices/<resource-name>/client.py in the Viam Python SDK as a reference.For example, the
base
component client is defined in theclient.py file.See Valid APIs to implement in your model for more information.
To create a new resource model, you need to implement your model’s client interface in a file called
my_modular_resource.go
.This interface defines how your model’s server responds to API requests.
To ensure the client interface you create returns the expected results, use the appropriate client interface defined in
components/<resource-name>/client.go orservices/<resource-name>/client.go in the Viam RDK as a reference.For example, the
base
component client is defined in theclient.go file.See Valid APIs to implement in your model for more information.
Prepare to Import Your Custom Model and Subtype’s API:
To prepare to import your custom model and chosen resource subtype’s API into your main program and register them with your preferred SDK:
Find the subtype API as defined in the relevant
components/<resource-name>/<resource-name>.py orservices/<resource-name>/ file in the Python SDK..py For example, the
base
component subtype is defined inviam-python-sdk/src/viam/components/base/base.py .Find the subtype API as defined in the relevant
components/<resource-name>/<resource-name>.go orservices/<resource-name>/<resource-name>.go file in the RDK.For example, the
base
component subtype is defined inrdk/components/base/base.go .
Valid APIs to implement in your model
When implementing a custom model of an existing component, valid APIs always have the following parameters:
namespace
:rdk
type
:component
subtype
: any one of these component proto files, for examplemotor
When implementing a custom model of an existing service, valid APIs always have the following parameters:
namespace
:rdk
type
:service
subtype
: any one of these service proto files, for examplenavigation
Unique cases
If you are using unique hardware that does not already have an appropriate API defined to support it, you can use the generic API to add support for that unique hardware type to your machine.
Some use cases may require you to define a new API, or to deploy custom components using a server on a remote part. For more information, see Advanced Modular Resources.
Create a custom module
A custom module wraps one or more models. To create a custom module, follow these steps:
- Code a new resource model server.
- Code a main program.
- Compile or package the module into a single executable.
Code a new resource model
Naming your model
Use the naming schema: namespace:repo-name:name
.
For more information see Naming your model.
Your new resource model server must have all the methods that the Viam RDK requires, and should match the built-in API client subtype like rdk:component:base
.
Create a folder for your module and save your code as a file named
The following example module registers a modular resource implementing Viam’s built-in Base API (rdk:service:base
) as a new model, "mybase"
, using the model family acme:demo:mybase
.
Click to view sample code for my_base.py
from typing import ClassVar, Mapping, Sequence, Any, Dict, Optional, cast
from typing_extensions import Self
from viam.components.base import Base
from viam.components.motor import Motor
from viam.module.types import Reconfigurable
from viam.module.module import Module
from viam.proto.app.robot import ComponentConfig
from viam.proto.common import ResourceName, Vector3
from viam.resource.base import ResourceBase
from viam.resource.registry import Registry, ResourceCreatorRegistration
from viam.resource.types import Model, ModelFamily
from viam.utils import ValueTypes
from viam.logging import getLogger
LOGGER = getLogger(__name__)
class MyBase(Base, Reconfigurable):
"""
MyBase implements a base that only supports set_power
(basic forward/back/turn controls) is_moving (check if in motion), and stop
(stop all motion).
It inherits from the built-in resource subtype Base and conforms to the
``Reconfigurable`` protocol, which signifies that this component can be
reconfigured. Additionally, it specifies a constructor function
``MyBase.new_base`` which confirms to the
``resource.types.ResourceCreator`` type required for all models.
"""
# Here is where we define our new model's colon-delimited-triplet
# (acme:demo:mybase) acme = namespace, demo = repo-name,
# mybase = model name.
MODEL: ClassVar[Model] = Model(ModelFamily("acme", "demo"), "mybase")
def __init__(self, name: str, left: str, right: str):
super().__init__(name, left, right)
# Constructor
@classmethod
def new_base(cls,
config: ComponentConfig,
dependencies: Mapping[ResourceName, ResourceBase]) -> Self:
base = cls(config.name)
base.reconfigure(config, dependencies)
return base
# Validates JSON Configuration
@classmethod
def validate_config(cls, config: ComponentConfig) -> Sequence[str]:
left_name = config.attributes.fields["motorL"].string_value
if left_name == "":
raise Exception(
"A motorL attribute is required for a MyBase component.")
right_name = [config.attributes.fields["motorR"].string_value]
if right_name == "":
raise Exception(
"A motorR attribute is required for a MyBase component.")
return [left_name, right_name]
# Handles attribute reconfiguration
def reconfigure(self,
config: ComponentConfig,
dependencies: Mapping[ResourceName, ResourceBase]):
left_name = config.attributes.fields["motorL"].string_value
right_name = config.attributes.fields["motorR"].string_value
left_motor = dependencies[Motor.get_resource_name(left_name)]
right_motor = dependencies[Motor.get_resource_name(right_name)]
self.left = cast(Motor, left_motor)
self.right = cast(Motor, right_motor)
"""
Implement the methods the Viam RDK defines for the base API
(rdk:component:base)
"""
# move_straight: unimplemented
async def move_straight(self,
distance: int,
velocity: float,
*,
extra: Optional[Dict[str, Any]] = None,
timeout: Optional[float] = None,
**kwargs):
raise NotImplementedError
# spin: unimplemented
async def spin(self,
angle: float,
velocity: float,
*,
extra: Optional[Dict[str, Any]] = None,
timeout: Optional[float] = None,
**kwargs):
raise NotImplementedError
# set_power: set the linear and angular velocity of the left and right
# motors on the base
async def set_power(self,
linear: Vector3,
angular: Vector3,
*,
extra: Optional[Dict[str, Any]] = None,
timeout: Optional[float] = None,
**kwargs):
# stop the base if absolute value of linear and angular velocity is
# less than .01
if abs(linear.y) < 0.01 and abs(angular.z) < 0.01:
return self.stop(extra=extra, timeout=timeout)
# use linear and angular velocity to calculate percentage of max power
# to pass to SetPower for left & right motors
sum = abs(linear.y) + abs(angular.z)
self.left.set_power(power=((linear.y - angular.z) / sum),
extra=extra,
timeout=timeout)
self.right.set_power(power=((linear.y + angular.z) / sum),
extra=extra,
timeout=timeout)
# set_velocity: unimplemented
async def set_velocity(self,
linear: Vector3,
angular: Vector3,
*,
extra: Optional[Dict[str, Any]] = None,
timeout: Optional[float] = None,
**kwargs):
raise NotImplementedError
# get_properties: unimplemented
async def get_properties(self,
extra: Optional[Dict[str, Any]] = None,
timeout: Optional[float] = None,
**kwargs):
raise NotImplementedError
# stop: stop the base from moving by stopping both motors
async def stop(self,
*,
extra: Optional[Dict[str, Any]] = None,
timeout: Optional[float] = None,
**kwargs):
self.left.stop(extra=extra, timeout=timeout)
self.right.stop(extra=extra, timeout=timeout)
# is_moving: check if either motor on the base is moving with motors'
# is_powered
async def is_moving(self,
*,
extra: Optional[Dict[str, Any]] = None,
timeout: Optional[float] = None,
**kwargs) -> bool:
return self.left.is_powered(extra=extra, timeout=timeout)[0] or \
self.right.is_powered(extra=extra, timeout=timeout)[0]
Additional example modules are available in the Python SDK GitHub repository.
Your new resource model server must have all the methods that the Viam RDK requires, and should match the built-in API client subtype like rdk:component:base
.
Create a folder for your module and save your code as a file named
The following example module registers a modular resource implementing Viam’s built-in Base API (rdk:service:base
) as a new model, "mybase"
, using the model family acme:demo:mybase
.
Click to view sample code for mybase.go
// Package mybase implements a base that only supports SetPower (basic forward/back/turn controls), IsMoving (check if in motion), and Stop (stop all motion).
// It extends the built-in resource subtype Base and implements methods to handle resource construction, attribute configuration, and reconfiguration.
package mybase
import (
"context"
"fmt"
"math"
"github.com/golang/geo/r3"
"github.com/pkg/errors"
"go.uber.org/multierr"
"go.viam.com/rdk/components/base"
"go.viam.com/rdk/components/base/kinematicbase"
"go.viam.com/rdk/components/motor"
"go.viam.com/rdk/logging"
"go.viam.com/rdk/resource"
"go.viam.com/rdk/spatialmath"
)
// Here is where we define your new model's colon-delimited-triplet (acme:demo:mybase)
// acme = namespace, demo = repo-name, mybase = model name.
var (
Model = resource.NewModel("acme", "demo", "mybase")
errUnimplemented = errors.New("unimplemented")
)
const (
myBaseWidthMm = 500.0 // Base has a wheel tread of 500 millimeters
myBaseTurningRadiusM = 0.3 // Base turns around a circle of radius .3 meters
)
func init() {
resource.RegisterComponent(base.API, Model, resource.Registration[base.Base, *Config]{
Constructor: newBase,
})
}
func newBase(ctx context.Context, deps resource.Dependencies, conf resource.Config, logger logging.Logger) (base.Base, error) {
b := &myBase{
Named: conf.ResourceName().AsNamed(),
logger: logger,
}
if err := b.Reconfigure(ctx, deps, conf); err != nil {
return nil, err
}
return b, nil
}
// Reconfigure reconfigures with new settings.
func (b *myBase) Reconfigure(ctx context.Context, deps resource.Dependencies, conf resource.Config) error {
b.left = nil
b.right = nil
// This takes the generic resource.Config passed down from the parent and converts it to the
// model-specific (aka "native") Config structure defined, above making it easier to directly access attributes.
baseConfig, err := resource.NativeConfig[*Config](conf)
if err != nil {
return err
}
b.left, err = motor.FromDependencies(deps, baseConfig.LeftMotor)
if err != nil {
return errors.Wrapf(err, "unable to get motor %v for mybase", baseConfig.LeftMotor)
}
b.right, err = motor.FromDependencies(deps, baseConfig.RightMotor)
if err != nil {
return errors.Wrapf(err, "unable to get motor %v for mybase", baseConfig.RightMotor)
}
geometries, err := kinematicbase.CollisionGeometry(conf.Frame)
if err != nil {
b.logger.Warnf("base %v %s", b.Name(), err.Error())
}
b.geometries = geometries
// Stop motors when reconfiguring.
return multierr.Combine(b.left.Stop(context.Background(), nil), b.right.Stop(context.Background(), nil))
}
// DoCommand simply echos whatever was sent.
func (b *myBase) DoCommand(ctx context.Context, cmd map[string]interface{}) (map[string]interface{}, error) {
return cmd, nil
}
// Config contains two component (motor) names.
type Config struct {
LeftMotor string `json:"motorL"`
RightMotor string `json:"motorR"`
}
// Validate validates the config and returns implicit dependencies,
// this Validate checks if the left and right motors exist for the module's base model.
func (cfg *Config) Validate(path string) ([]string, error) {
// check if the attribute fields for the right and left motors are non-empty
// this makes them reuqired for the model to successfully build
if cfg.LeftMotor == "" {
return nil, fmt.Errorf(`expected "motorL" attribute for mybase %q`, path)
}
if cfg.RightMotor == "" {
return nil, fmt.Errorf(`expected "motorR" attribute for mybase %q`, path)
}
// Return the left and right motor names so that `newBase` can access them as dependencies.
return []string{cfg.LeftMotor, cfg.RightMotor}, nil
}
type myBase struct {
resource.Named
left motor.Motor
right motor.Motor
logger logging.Logger
geometries []spatialmath.Geometry
}
// MoveStraight does nothing.
func (b *myBase) MoveStraight(ctx context.Context, distanceMm int, mmPerSec float64, extra map[string]interface{}) error {
return errUnimplemented
}
// Spin does nothing.
func (b *myBase) Spin(ctx context.Context, angleDeg, degsPerSec float64, extra map[string]interface{}) error {
return errUnimplemented
}
// SetVelocity does nothing.
func (b *myBase) SetVelocity(ctx context.Context, linear, angular r3.Vector, extra map[string]interface{}) error {
return errUnimplemented
}
// SetPower computes relative power between the wheels and sets power for both motors.
func (b *myBase) SetPower(ctx context.Context, linear, angular r3.Vector, extra map[string]interface{}) error {
b.logger.Debugf("SetPower Linear: %.2f Angular: %.2f", linear.Y, angular.Z)
if math.Abs(linear.Y) < 0.01 && math.Abs(angular.Z) < 0.01 {
return b.Stop(ctx, extra)
}
sum := math.Abs(linear.Y) + math.Abs(angular.Z)
err1 := b.left.SetPower(ctx, (linear.Y-angular.Z)/sum, extra)
err2 := b.right.SetPower(ctx, (linear.Y+angular.Z)/sum, extra)
return multierr.Combine(err1, err2)
}
// Stop halts motion.
func (b *myBase) Stop(ctx context.Context, extra map[string]interface{}) error {
b.logger.Debug("Stop")
err1 := b.left.Stop(ctx, extra)
err2 := b.right.Stop(ctx, extra)
return multierr.Combine(err1, err2)
}
// IsMoving returns true if either motor is active.
func (b *myBase) IsMoving(ctx context.Context) (bool, error) {
for _, m := range []motor.Motor{b.left, b.right} {
isMoving, _, err := m.IsPowered(ctx, nil)
if err != nil {
return false, err
}
if isMoving {
return true, err
}
}
return false, nil
}
// Properties returns details about the physics of the base.
func (b *myBase) Properties(ctx context.Context, extra map[string]interface{}) (base.Properties, error) {
return base.Properties{
TurningRadiusMeters: myBaseTurningRadiusM,
WidthMeters: myBaseWidthMm * 0.001, // converting millimeters to meters
}, nil
}
// Geometries returns physical dimensions.
func (b *myBase) Geometries(ctx context.Context, extra map[string]interface{}) ([]spatialmath.Geometry, error) {
return b.geometries, nil
}
// Close stops motion during shutdown.
func (b *myBase) Close(ctx context.Context) error {
return b.Stop(ctx, nil)
}
The code for the custom model (
Code a main entry point program
The main program starts the module.
Import your custom model and API into the main program and register them with the Python SDK.
When executed, the main program registers the mybase
custom model and API helper functions with the Python SDK and creates and starts the new module.
Click to view sample code for main.py
import asyncio
from viam.components.base import Base
from viam.module.module import Module
from viam.resource.registry import Registry, ResourceCreatorRegistration
from my_base import MyBase
async def main():
"""
This function creates and starts a new module, after adding all desired
resource models. Resource creators must be registered to the resource
registry before the module adds the resource model.
"""
Registry.register_resource_creator(
Base.SUBTYPE,
MyBase.MODEL,
ResourceCreatorRegistration(MyBase.new_base, MyBase.validate_config))
module = Module.from_args()
module.add_model_from_registry(Base.SUBTYPE, MyBase.MODEL)
await module.start()
if __name__ == "__main__":
asyncio.run(main())
Important
You must define all functions belonging to a built-in resource subtype’s API if defining a new model. Otherwise, the class will not instantiate.
If you do not wish to implement all methods, raise anNotImplementedError()
in the body of functions you do not want to implement or put pass
.
Additionally, return any values designated in the function’s return signature, typed correctly.
The main program starts the module.
Import your custom model and API into the main program and register them with the RDK.
When executed, the main program registers the mybase
custom model and API helper functions with the Python SDK and creates and starts the new module.
Click to view sample code for main.go
// Package main is a module which serves the mybase custom model.
package main
import (
"context"
"go.viam.com/rdk/components/base"
"go.viam.com/rdk/logging"
"go.viam.com/rdk/module"
"go.viam.com/utils"
// NOTE: You must update the following line to import your local package "mybase"
"go.viam.com/rdk/examples/customresources/models/mybase"
)
func main() {
// NewLoggerFromArgs will create a logging.Logger at "DebugLevel" if
// "--log-level=debug" is an argument in os.Args and at "InfoLevel" otherwise.
utils.ContextualMain(mainWithArgs, module.NewLoggerFromArgs("yourmodule"))
}
func mainWithArgs(ctx context.Context, args []string, logger logging.Logger) (err error) {
myMod, err := module.NewModuleFromArgs(ctx, logger)
if err != nil {
return err
}
// Models and APIs add helpers to the registry during their init().
// They can then be added to the module here.
err = myMod.AddModelFromRegistry(ctx, base.API, mybase.Model)
if err != nil {
return err
}
err = myMod.Start(ctx)
defer myMod.Close(ctx)
if err != nil {
return err
}
<-ctx.Done()
return nil
}
Important
You must define all functions belonging to a built-in resource subtype’s API if defining a new model. Otherwise, the class will not instantiate.
If you do not wish to implement all methods, raise errUnimplemented
for the functions you do not want to implement.
Additionally, return any values designated in the function’s return signature, typed correctly.
Prepare the module for execution
To add a module to the configuration of your robot, you need to have an executable that:
- runs your module when executed,
- takes a local socket as a command line argument, and
- exits cleanly when sent a termination signal.
Your options for completing this step are flexible, as this file does not need to be in raw binary format.
Create and save a new shell script (
Make sure to prepare a Python virtual environment in the directory your module is in to ensure your module has access to any required libraries:
Create a
requirements.txt
file containing a list of all the dependencies your module relies on. For example, arequirements.txt
file with the following contents ensures that the Viam Python SDK (viam-sdk
) is installed:viam-sdk
Add additional dependencies as needed. See the pip
requirements.txt
file documentation for more information.Add a shell script that creates a new virtual environment, installs the dependencies listed in
requirements.txt
, and runs the module entry point filemain.py
:#!/bin/sh cd `dirname $0` # Create a virtual environment to run our code VENV_NAME="venv" PYTHON="$VENV_NAME/bin/python" python3 -m venv $VENV_NAME $PYTHON -m pip install -r requirements.txt -U # remove -U if viam-sdk should not be upgraded whenever possible # Be sure to use `exec` so that termination signals reach the python process, # or handle forwarding termination signals manually exec $PYTHON <your-src-dir-if-inside>/main.py $@
To make your shell script executable, run the following command in your terminal:
sudo chmod +x <your-file-path-to>/<run.sh>
Install a supported C compiler on your machine. Make sure to prepare a Python virtual environment in the directory your module is in to ensure your module has access to any required libraries. Compile your module as follows:
Create a
requirements.txt
file containing a list of all the dependencies your module relies on. For example, arequirements.txt
file with the following contents ensures that the Viam Python SDK (viam-sdk
) and Nuitka (nuitka
) are installed:viam-sdk nuitka
Add additional dependencies as needed.
After installing dependencies in your virtual environment, compile your module with the following command:
python -m nuitka --onefile src/main.py
Any data files you want to include you must specify through a CLI option as follows:
python -m nuitka --onefile --include-data-files=src/arm/my_arm_kinematics.json=src/arm/my_arm_kinematics.json src/main.py
No relative imports (imports starting with
.
) will work with this option. In addition, no cross compiling is allowed. You have to compile on your target architecture.
Make sure to prepare a Python virtual environment in the directory your module is so your module has access to any required libraries. Compile your module as follows:
Create a
requirements.txt
file containing a list of all the dependencies your module relies on. For example, arequirements.txt
file with the following contents ensures that the Viam Python SDK (viam-sdk
), PyInstaller (pyinstaller
), and the Google API Python client (google-api-python-client
) are installed:viam-sdk pyinstaller google-api-python-client
Add additional dependencies as needed.
Add the Google API python client as a hidden import when compiling your module. After installing the required dependencies in your virtual environment, compile your module as follows:
python -m PyInstaller --onefile --hidden-import="googleapiclient" src/main.py
Any data files you want to include you must specify through a CLI option as follows:
python -m PyInstaller --onefile --hidden-import="googleapiclient" --add-data src/arm/my_arm_kinematics.json:src/arm/ src/main.py
No relative imports (imports starting with
.
) will work with this option. In addition, no cross compiling is allowed. You have to compile on your target architecture.
Use Go to compile and obtain a single executable for your module:
- Navigate to your module directory in your terminal.
- Run
go build
to compile your entry point (main program) filemain.go and all other.go files in the directory, building your module and all dependencies into a single executable file. - Run
ls
in your module directory to find the executable, which should have the same name as the module directory.
Configure logging
To enable your module to write logs to the Viam app, ensure that you have added the following lines of code to your respective module code. Log messages are sent to the Viam app and appear under the Logs tab for your machine.
To enable your Python module to write log messages to the Viam app, add the following lines to your code:
# In your import block, import viam.logging getLogger:
from viam.logging import getLogger
# Before your first class or function, define the LOGGER variable:
LOGGER = getLogger(__name__)
To enable your Go module to write log messages to the Viam app, add the following lines to your code:
// In your import() block, import the logger package:
import(
...
"go.viam.com/rdk/logging"
)
// Alter your component to hold a logger
type component struct {
...
logger logging.Logger
}
// Then, alter your component's constructor to save the logger:
func init() {
registration := resource.Registration[resource.Resource, *Config]{
Constructor: func(ctx context.Context, deps resource.Dependencies, conf resource.Config, logger logging.Logger) (resource.Resource, error) {
...
return &component {
...
logger: logger
}, nil
},
}
resource.RegisterComponent(...)
}
// Finally, when you need to log, use the functions on your component's logger:
fn (c *component) someFunction(a int) {
// Log with severity info:
c.logger.Infof("performing some function with a=%v",a)
// Log with severity debug (using value wrapping):
c.logger.Debugw("performing some function","a",a)
// Log with severity error without a parameter:
c.logger.Errorln("performing some function")
}
The examples from Code a new resource model include this logging setup.
Next steps
Once you have created your module, you can use the Viam CLI to upload your module to the Viam registry to share it with other Viam users or just to other users in your organization. For added convenience, you can configure automated uploads for new module versions through a continuous integration (CI) workflow, using a GitHub Action.
You can also add your module to your robot as a local module, without uploading it to the Viam registry.
You can also check out these tutorials that create modules:
Was this page helpful?
Glad to hear it! If you have any other feedback please let us know:
We're sorry about that. To help us improve, please tell us what we can do better:
Thank you!