Write a Logic Module
Your machine has resources – sensors, motors, cameras – that work individually.
A logic module makes them work together. It runs as a service alongside
viam-server, declares dependencies on the resources it needs, and implements
whatever control, monitoring, or coordination logic your application requires.
Use a logic module when you need your machine to make decisions based on what it senses: trigger actions when readings cross a threshold, coordinate multiple components to accomplish a task, aggregate data from several sources, or run any continuous process that reads from some resources and acts on others.
A driver module wraps hardware – it implements
a component API like sensor or motor so that viam-server can talk to a
specific piece of hardware.
A logic module (this page) orchestrates existing resources – it reads from sensors, commands motors, and makes decisions. It typically implements a service API.
Both are modules. The difference is what they do, not how they’re built. The lifecycle, config validation, dependency declaration, and deployment patterns are the same.
Every module goes through a defined lifecycle:
1. viam-server launches the module as a separate process.
2. viam-server calls a validation method that checks config attributes and declares dependencies on other resources.
3. viam-server calls the constructor with the validated config and resolved dependencies.
4. When the configuration changes, viam-server calls the validation method again, then the reconfiguration method.
5. On shutdown, viam-server calls the close method. Clean up resources (including background tasks) here.

For more detail, see Module Lifecycle.
A logic module typically depends on other resources on the machine – the
sensors it monitors, the motors it controls. You declare dependencies in your
validation method by returning their names. viam-server ensures those
resources are ready before calling your constructor, and passes them in as a
map you can look up by name.
The pattern has three steps:

1. Declare: return the names of the resources you need from your validation method.
2. Resolve: look up each declared resource in the dependencies map passed to your constructor.
3. Use: call methods on the resolved resources from your module’s logic.

The code examples below mark each step with numbered comments.
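Stripped of SDK types, the declare/resolve/use pattern looks like this plain-Python sketch (the function and sensor names are illustrative):

```python
# Sketch of the dependency pattern (illustrative, not the Viam SDK).

def validate_config(config: dict) -> list[str]:
    # 1. Declare: return the names of resources this module needs.
    if not config.get("sensor_names"):
        raise ValueError("sensor_names is required")
    return config["sensor_names"]


def construct(config: dict, dependencies: dict) -> dict:
    # 2. Resolve: look up each declared dependency by name.
    sensors = {name: dependencies[name] for name in config["sensor_names"]}
    # 3. Use: call methods on the resolved resources.
    return {name: s.get_reading() for name, s in sensors.items()}


class FakeSensor:
    """Stand-in for a real resource supplied by viam-server."""
    def __init__(self, value):
        self.value = value

    def get_reading(self):
        return self.value


deps = {"temp-1": FakeSensor(21.5)}
required = validate_config({"sensor_names": ["temp-1"]})
readings = construct({"sensor_names": ["temp-1"]}, deps)
```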
The generic service API has a single method: DoCommand. It accepts an
arbitrary key-value map and returns one. This makes it a flexible interface for
custom logic – you define your own command vocabulary.
// Request
{"command": "get_alerts", "severity": "critical"}
// Response
{"alerts": [{"sensor": "temp-1", "value": 42.5, "threshold": 40.0}]}
Use generic when your module’s interface doesn’t map to an existing service
API (like vision or mlmodel). For the full list of service APIs, see
Resource APIs.
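Because DoCommand is just a map in and a map out, the core of a generic service is a dispatcher over your command vocabulary. A plain-Python sketch (illustrative names, not SDK code):

```python
# Sketch: DoCommand as a dict-in/dict-out dispatcher (illustrative).
def do_command(state: dict, command: dict) -> dict:
    cmd = command.get("command", "")
    if cmd == "get_alerts":
        alerts = state["alerts"]
        # Optional filter argument, part of our own command vocabulary.
        severity = command.get("severity")
        if severity is not None:
            alerts = [a for a in alerts if a.get("severity") == severity]
        return {"alerts": alerts}
    if cmd == "acknowledge":
        state["alerts"].clear()
        return {"status": "ok"}
    raise ValueError(f"unknown command: {cmd}")


state = {"alerts": [{"sensor": "temp-1", "value": 42.5, "severity": "critical"}]}
resp = do_command(state, {"command": "get_alerts", "severity": "critical"})
```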
Logic modules often need to run continuously – polling sensors, checking
thresholds, updating state. You can spawn background tasks in Go or C++. It is not recommended to spawn background tasks in Python because they will conflict with the Viam SDK’s background tasks. If you need to run background logic in a Python module, instead place the logic in DoCommand and create a scheduled job to run the task on a schedule.
The key requirement: your background task must stop cleanly when the module shuts down or reconfigures. Use a cancellation signal (a channel in Go) to coordinate this.
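The cancellation-signal pattern can be sketched with an asyncio.Event (the stand-in here for Go's cancellation channel; the timings and names are illustrative):

```python
import asyncio

# Sketch: a polling loop that stops cleanly via a cancellation signal.
async def monitor_loop(stop: asyncio.Event, readings: list):
    while not stop.is_set():
        readings.append("poll")              # stand-in for reading sensors
        try:
            # Sleep for the poll interval, but wake immediately if stopped.
            await asyncio.wait_for(stop.wait(), timeout=0.01)
            return                           # stop was set
        except asyncio.TimeoutError:
            pass                             # interval elapsed; poll again


async def main():
    stop = asyncio.Event()
    readings = []
    task = asyncio.create_task(monitor_loop(stop, readings))
    await asyncio.sleep(0.05)                # let it poll a few times
    stop.set()                               # signal shutdown
    await task                               # loop exits promptly, no leak
    return readings


polls = asyncio.run(main())
```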
When writing a logic module, follow the steps outlined below. To illustrate each step we’ll use a temperature alert monitor as a worked example. It watches one or more sensors, compares their readings against configurable thresholds, and maintains a list of active alerts that your application code can query.
Run viam module generate and answer the prompts:
| Prompt | What to enter | Why |
|---|---|---|
| Module name | alert-monitor | A short, descriptive name |
| Language | python or go | Your implementation language |
| Visibility | private | Keep it private while developing |
| Namespace | Your organization namespace | Scopes the module to your org |
| Resource subtype | generic (under services) | Flexible service API |
| Model name | temp-alert | The model name for your service |
| Register | yes | Registers the module with Viam |
The generator creates a complete project. The key files you will edit:
| File | Purpose |
|---|---|
| src/models/temp_alert.py | Service class skeleton – you will edit this |
| src/main.py | Entry point – starts the module server (no changes needed) |
| meta.json | Module metadata for the registry |
| File | Purpose |
|---|---|
| alert_monitor.go | Service implementation skeleton – you will edit this |
| cmd/module/main.go | Entry point – starts the module server (no changes needed) |
| meta.json | Module metadata for the registry |
Open the generated resource file. Define config attributes for the sensors to monitor and the alert thresholds.
In src/models/temp_alert.py, add config attributes to your class:
class TempAlert(Generic, EasyResource):
MODEL: ClassVar[Model] = Model(
ModelFamily("my-org", "alert-monitor"), "temp-alert"
)
sensor_names: list[str]
max_temp: float
poll_interval: float
alerts: list[dict]
_monitor_task: Optional[asyncio.Task]
_stop_event: asyncio.Event
In the generated .go file, add fields to the Config struct. Each field
needs a json tag matching the attribute name users set in their config JSON.
Then update the Validate method. It returns three values: a list of required
dependency names, a list of optional dependency names, and an error.
type Config struct {
SensorNames []string `json:"sensor_names"`
MaxTemp float64 `json:"max_temp"`
PollInterval float64 `json:"poll_interval_secs"`
}
func (cfg *Config) Validate(path string) ([]string, []string, error) {
if len(cfg.SensorNames) == 0 {
return nil, nil, fmt.Errorf("sensor_names is required")
}
if cfg.MaxTemp == 0 {
return nil, nil, fmt.Errorf("max_temp is required")
}
// 1. Declare: return all sensor names as required dependencies
return cfg.SensorNames, nil, nil
}
The constructor receives the validated config and a dependencies map
containing the resources you declared in the validation method. Look up each
dependency by name, store it on your struct/instance, and start the background
monitoring loop.
Update validate_config, new, and reconfigure:
@classmethod
def validate_config(
cls, config: ComponentConfig
) -> Tuple[Sequence[str], Sequence[str]]:
fields = config.attributes.fields
if "sensor_names" not in fields:
raise Exception("sensor_names is required")
if "max_temp" not in fields:
raise Exception("max_temp is required")
sensor_names = [
v.string_value
for v in fields["sensor_names"].list_value.values
]
# 1. Declare: return sensor names as required dependencies
return sensor_names, []
@classmethod
def new(cls, config: ComponentConfig,
dependencies: Mapping[ResourceName, ResourceBase]) -> Self:
instance = cls(config.name)
instance.alerts = []
instance._monitor_task = None
instance._stop_event = asyncio.Event()
instance.reconfigure(config, dependencies)
return instance
def reconfigure(self, config: ComponentConfig,
dependencies: Mapping[ResourceName, ResourceBase]) -> None:
# Stop any existing monitor loop
if self._monitor_task is not None:
self._stop_event.set()
self._monitor_task = None
fields = config.attributes.fields
self.sensor_names = [
v.string_value
for v in fields["sensor_names"].list_value.values
]
self.max_temp = fields["max_temp"].number_value
self.poll_interval = (
fields["poll_interval_secs"].number_value
if "poll_interval_secs" in fields
else 10.0
)
# 2. Resolve: find each sensor in the dependencies map
self.sensors = {}
for name, dep in dependencies.items():
if name.name in self.sensor_names:
self.sensors[name.name] = dep
# Start the monitor loop
self._stop_event = asyncio.Event()
self._monitor_task = asyncio.create_task(self._monitor_loop())
Update the struct and constructor. resource.Named provides the Name()
method that viam-server requires. resource.NativeConfig converts the raw
config into your typed struct. sensor.FromDependencies looks up a sensor
dependency by name from the dependencies map.
type TempAlert struct {
resource.Named
logger logging.Logger
cfg *Config
sensors map[string]sensor.Sensor
mu sync.Mutex
alerts []Alert
cancelFn func()
}
type Alert struct {
Sensor string `json:"sensor"`
Value float64 `json:"value"`
Threshold float64 `json:"threshold"`
Time string `json:"time"`
}
func newTempAlert(
ctx context.Context,
deps resource.Dependencies,
conf resource.Config,
logger logging.Logger,
) (resource.Resource, error) {
cfg, err := resource.NativeConfig[*Config](conf)
if err != nil {
return nil, err
}
// 2. Resolve: find each sensor in the dependencies map
sensors := make(map[string]sensor.Sensor)
for _, name := range cfg.SensorNames {
s, err := sensor.FromDependencies(deps, name)
if err != nil {
return nil, fmt.Errorf("sensor %q not found: %w", name, err)
}
sensors[name] = s
}
monitorCtx, cancelFn := context.WithCancel(context.Background())
svc := &TempAlert{
Named: conf.ResourceName().AsNamed(),
logger: logger,
cfg: cfg,
sensors: sensors,
alerts: []Alert{},
cancelFn: cancelFn,
}
// Start background monitor loop
go svc.monitorLoop(monitorCtx)
return svc, nil
}
The monitor loop polls sensors at a fixed interval and checks readings against thresholds. When a reading exceeds the threshold, it creates an alert.
async def _monitor_loop(self):
while not self._stop_event.is_set():
for name, s in self.sensors.items():
try:
# 3. Use: call methods on dependencies
readings = await s.get_readings()
temp = readings.get("temperature")
if temp is not None and temp > self.max_temp:
alert = {
"sensor": name,
"value": temp,
"threshold": self.max_temp,
"time": datetime.now().isoformat(),
}
self.alerts.append(alert)
self.logger.warning(
"Alert: %s reported %.1f (threshold: %.1f)",
name, temp, self.max_temp,
)
except Exception as e:
self.logger.error("Failed to read %s: %s", name, e)
try:
await asyncio.wait_for(
self._stop_event.wait(),
timeout=self.poll_interval,
)
break # Stop event was set
except asyncio.TimeoutError:
pass # Continue polling
func (s *TempAlert) monitorLoop(ctx context.Context) {
interval := time.Duration(s.cfg.PollInterval) * time.Second
if interval == 0 {
interval = 10 * time.Second
}
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
s.checkSensors(ctx)
}
}
}
func (s *TempAlert) checkSensors(ctx context.Context) {
s.mu.Lock()
defer s.mu.Unlock()
for name, sens := range s.sensors {
// 3. Use: call methods on dependencies
readings, err := sens.Readings(ctx, nil)
if err != nil {
s.logger.CErrorw(ctx, "failed to read sensor", "sensor", name, "error", err)
continue
}
temp, ok := readings["temperature"].(float64)
if !ok {
continue
}
if temp > s.cfg.MaxTemp {
alert := Alert{
Sensor: name,
Value: temp,
Threshold: s.cfg.MaxTemp,
Time: time.Now().Format(time.RFC3339),
}
s.alerts = append(s.alerts, alert)
s.logger.CWarnw(ctx, "alert triggered",
"sensor", name, "value", temp, "threshold", s.cfg.MaxTemp)
}
}
}
The Go example holds the mutex for the entire checkSensors call, including sensor reads that may block on network IO.
This keeps the code simple and familiar (defer s.mu.Unlock()), but it means DoCommand calls will block while sensors are being read.
In production, you would copy s.sensors under the lock, release it, read sensors without the lock, then re-lock to append alerts.
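That refactor can be sketched with a plain threading.Lock. The sensor stand-ins and names below are illustrative, not the module's actual code:

```python
import threading

# Sketch: copy shared state under the lock, do slow IO without it,
# then re-lock only briefly to publish results (illustrative).
lock = threading.Lock()
sensors = {"temp-1": lambda: 42.5}        # stand-ins for slow sensor reads
alerts = []


def check_sensors(max_temp: float):
    with lock:
        snapshot = dict(sensors)          # copy the map under the lock
    results = []
    for name, read in snapshot.items():   # slow reads happen unlocked,
        value = read()                    # so DoCommand is not blocked
        if value > max_temp:
            results.append({"sensor": name, "value": value})
    with lock:
        alerts.extend(results)            # re-lock briefly to publish


check_sensors(40.0)
```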
DoCommand is the interface your application code uses to interact with the
service. Define a command vocabulary that makes sense for your module.
async def do_command(
self,
command: Mapping[str, ValueTypes],
*,
timeout: Optional[float] = None,
**kwargs,
) -> Mapping[str, ValueTypes]:
cmd = command.get("command", "")
if cmd == "get_alerts":
return {"alerts": self.alerts}
if cmd == "get_alert_count":
return {"count": len(self.alerts)}
if cmd == "acknowledge":
self.alerts.clear()
return {"status": "ok"}
if cmd == "set_threshold":
self.max_temp = command["max_temp"]
return {"status": "ok", "max_temp": self.max_temp}
raise Exception(f"Unknown command: {cmd}")
func (s *TempAlert) DoCommand(
ctx context.Context,
cmd map[string]interface{},
) (map[string]interface{}, error) {
command, _ := cmd["command"].(string)
switch command {
case "get_alerts":
s.mu.Lock()
defer s.mu.Unlock()
// Convert alerts to interface slice for serialization
alertList := make([]interface{}, len(s.alerts))
for i, a := range s.alerts {
alertList[i] = map[string]interface{}{
"sensor": a.Sensor,
"value": a.Value,
"threshold": a.Threshold,
"time": a.Time,
}
}
return map[string]interface{}{"alerts": alertList}, nil
case "get_alert_count":
s.mu.Lock()
defer s.mu.Unlock()
return map[string]interface{}{"count": len(s.alerts)}, nil
case "acknowledge":
s.mu.Lock()
defer s.mu.Unlock()
s.alerts = s.alerts[:0]
return map[string]interface{}{"status": "ok"}, nil
case "set_threshold":
newMax, ok := cmd["max_temp"].(float64)
if !ok {
return nil, fmt.Errorf("max_temp must be a number")
}
s.mu.Lock()
s.cfg.MaxTemp = newMax
s.mu.Unlock()
return map[string]interface{}{"status": "ok", "max_temp": newMax}, nil
default:
return nil, fmt.Errorf("unknown command: %s", command)
}
}
When viam-server stops the module or reconfigures it, your background loop
must stop cleanly. Without this, goroutines or async tasks leak.
async def close(self):
self._stop_event.set()
if self._monitor_task is not None:
await self._monitor_task
self._monitor_task = None
self.logger.info("TempAlert monitor stopped")
func (s *TempAlert) Close(ctx context.Context) error {
s.cancelFn()
s.logger.CInfof(ctx, "TempAlert monitor stopped")
return nil
}
In Go, the Reconfigure method should also stop the old loop and start a new
one:
func (s *TempAlert) Reconfigure(
ctx context.Context,
deps resource.Dependencies,
conf resource.Config,
) error {
// Stop the old loop
s.cancelFn()
cfg, err := resource.NativeConfig[*Config](conf)
if err != nil {
return err
}
sensors := make(map[string]sensor.Sensor)
for _, name := range cfg.SensorNames {
sens, err := sensor.FromDependencies(deps, name)
if err != nil {
return fmt.Errorf("sensor %q not found: %w", name, err)
}
sensors[name] = sens
}
monitorCtx, cancelFn := context.WithCancel(context.Background())
s.mu.Lock()
s.cfg = cfg
s.sensors = sensors
s.cancelFn = cancelFn
s.mu.Unlock()
go s.monitorLoop(monitorCtx)
return nil
}
Configure the service on your machine:
In the Viam app, navigate to your machine’s CONFIGURE tab.
Ensure you have at least one sensor configured (this is the resource your logic module will monitor).
Click +, select Advanced, then Local module.
Set the Executable path to your module binary or script.
Click Create.
Click +, select Advanced, then Local service.
Select your module, set the type to generic and the model to your model
name, and configure attributes:
{
"sensor_names": ["my-temp-sensor"],
"max_temp": 30.0,
"poll_interval_secs": 5
}
Click Save.
Test with DoCommand:
On the CONFIGURE tab, expand your service’s card and find the DoCommand section. Send a command:
{ "command": "get_alerts" }
You should see a response with any alerts that have been triggered.
Get a ready-to-run code sample:
The CONNECT tab on your machine’s page in the Viam app provides generated
code samples in Python and Go that connect to your machine and access all
configured resources. Use this as a starting point for application code that
sends DoCommand requests to your service.
Rebuild and redeploy during development:
viam-server does not automatically detect changes to your module’s source
files or binary. After making changes, use the CLI to rebuild and redeploy:
# Build locally, transfer to machine, and restart the module
viam module reload-local --part-id <machine-part-id>
# Restart the module without rebuilding (e.g., after editing Python source)
viam module restart --part-id <machine-part-id>
Test the alert flow:
Set max_temp to a value below the current temperature so alerts trigger. Query them with {"command": "get_alerts"}, then send {"command": "acknowledge"} to clear them.

You now have a working logic module: scaffolded with viam module generate, monitoring sensors in a background loop, and queryable through DoCommand with get_alerts, acknowledge, and set_threshold commands.