Run control logic on a machine

This guide shows you how to write a module with control logic for a machine:

  1. Create a module with a template for the control logic
  2. Program the control logic using the DoCommand method
  3. Package the module and upload it to Viam
  4. Deploy the module to individual machines using viam-server
  5. Run control logic on the module automatically with one or more jobs

For microcontrollers, see Micro-RDK modules and Over-the-air firmware updates instead.

Prerequisites

You must have one machine running viam-server.

If your control logic depends on any hardware or software resources, you must also configure those resources on the machine.

Create a module with a generic component template

Install the Viam CLI to generate the template you will use to write your control logic:

  1. Install the CLI.

    You must have the Viam CLI installed to generate and upload modules:

    To download the Viam CLI on a macOS computer, install brew and run the following commands:

       brew tap viamrobotics/brews
       brew install viam
       

    To download the Viam CLI on a Linux computer with the aarch64 architecture, run the following commands:

       sudo curl -o /usr/local/bin/viam https://storage.googleapis.com/packages.viam.com/apps/viam-cli/viam-cli-stable-linux-arm64
       sudo chmod a+rx /usr/local/bin/viam
       

    To download the Viam CLI on a Linux computer with the amd64 (Intel x86_64) architecture, run the following commands:

       sudo curl -o /usr/local/bin/viam https://storage.googleapis.com/packages.viam.com/apps/viam-cli/viam-cli-stable-linux-amd64
       sudo chmod a+rx /usr/local/bin/viam
       

    You can also install the Viam CLI using brew on Linux amd64 (Intel x86_64):

       brew tap viamrobotics/brews
       brew install viam
       

    Download the binary and run it directly to use the Viam CLI on a Windows computer.

    If you have Go installed, you can build the Viam CLI directly from source using the go install command:

       go install go.viam.com/rdk/cli/viam@latest
       

    To confirm viam is installed and ready to use, issue the viam command from your terminal. If you see help instructions, everything is correctly installed. If you do not see help instructions, add your local go/bin/* directory to your PATH variable. If you use bash as your shell, you can use the following command:

       echo 'export PATH="$HOME/go/bin:$PATH"' >> ~/.bashrc
       

    For more information, see Install the Viam CLI.

  2. Run the module generate command in your terminal:

    viam module generate --resource-subtype=generic-component
    
  3. Follow the prompts, selecting the following options:

    • Module name: Your choice, for example control-logic

    • Language: Your choice

    • Visibility: Private

    • Namespace/Organization ID: Navigate to your organization settings through the menu in the upper-right corner of the page. Find the Public namespace and copy that string. In the example snippets below, the namespace is naomi.

    • Resource to be added to the module: Generic Component.

      DoCommand is generally used to implement control logic, as you can pass commands as arbitrary JSON objects, such as {"action": "start"}. You can use the DoCommand method to implement everything that doesn’t fit into other API methods. For simplicity, this guide uses the generic component, which only supports the DoCommand method. However, the DoCommand method is supported on all resource types, so you can choose a different resource type to add your control logic to. For example, for logic controlling a camera, you can use the camera component.

    • Model name: Your choice, for example control-logic

    • Enable cloud build: Choose Yes if you are using GitHub or want to use cloud build.

    • Register module: Yes

  4. Press the Enter key and the generator will create a folder for your control logic component.

Add control logic to your module

Open the file src/models/control_logic.py to add your control logic to it.

The following example shows how you might implement a counter that starts counting when you send a start command and stops when it receives a stop command.

1. Set up instance parameters

When your new model gets added to your machine, its reconfigure() method gets called. You can use it to store any instance variables.

The following example code initializes two instance variables, counter and running.

    def reconfigure(
        self, config: ComponentConfig, dependencies: Mapping[ResourceName, ResourceBase]
    ):
        self.counter = 0
        self.running = False
        return super().reconfigure(config, dependencies)

The reconfigure method gets called whenever the control logic module starts or when a configuration change occurs for the resource itself. In this example, each call resets counter to 0 and running to False.

If you need state to survive restarts and configuration changes, consider writing it to a file on disk and adding logic to handle subsequent calls to the reconfigure method gracefully.
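
Persisting state to disk, as suggested above, could look like the following minimal sketch. It uses only the Python standard library. The helper names load_state and save_state and the file name control_logic_state.json are hypothetical and not part of the generated template. In a module you might call load_state from reconfigure and save_state after each change to counter or running.

```python
import json
import tempfile
from pathlib import Path


def load_state(path: Path) -> dict:
    """Return the saved state, or defaults if no usable file exists."""
    defaults = {"counter": 0, "running": False}
    try:
        saved = json.loads(path.read_text())
    except (FileNotFoundError, json.JSONDecodeError):
        return defaults
    if not isinstance(saved, dict):
        return defaults
    # Keep only the keys we know about, falling back to the defaults.
    return {key: saved.get(key, default) for key, default in defaults.items()}


def save_state(path: Path, counter: int, running: bool) -> None:
    """Write the state to a temporary file, then rename it over the old one."""
    tmp = path.with_name(path.name + ".tmp")
    tmp.write_text(json.dumps({"counter": counter, "running": running}))
    tmp.replace(path)


# Example round trip in a throwaway directory.
with tempfile.TemporaryDirectory() as tmp_dir:
    state_file = Path(tmp_dir) / "control_logic_state.json"
    first_load = load_state(state_file)  # no file yet, so defaults
    save_state(state_file, counter=3, running=True)
    second_load = load_state(state_file)
```

Writing to a temporary file first means a crash in the middle of a write leaves the previous state file untouched.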

2. Write the control logic

To add the control logic, use the DoCommand() method. The method accepts arbitrary JSON objects as commands.

The following code inspects the command object. The start command sets the running variable to True, and the stop command sets it to False. A third command, on_loop, calls the on_loop() method, but only if running is True.

The on_loop() method increments the counter.

    async def on_loop(self):
        try:
            self.logger.info("Executing control logic")
            self.counter += 1
            self.logger.info(f"Counter: {self.counter}")

        except Exception as err:
            self.logger.error(err)

    async def do_command(
        self,
        command: Mapping[str, ValueTypes],
        *,
        timeout: Optional[float] = None,
        **kwargs
    ) -> Mapping[str, ValueTypes]:
        result = {key: False for key in command.keys()}
        for name, args in command.items():
            if name == "action" and args == "start":
                self.running = True
                result[name] = True
            if name == "action" and args == "stop":
                self.running = False
                result[name] = True
            if name == "action" and args == "on_loop":
                if self.running:
                    await self.on_loop()
                result[name] = True
        result["counter"] = self.counter
        return result

This is the code for src/models/control_logic.py:

from typing import (Any, ClassVar, Dict, Final, List, Mapping, Optional,
                    Sequence, Tuple)

from typing_extensions import Self
from viam.components.generic import *
from viam.proto.app.robot import ComponentConfig
from viam.proto.common import Geometry, ResourceName
from viam.resource.base import ResourceBase
from viam.resource.easy_resource import EasyResource
from viam.resource.types import Model, ModelFamily
from viam.utils import ValueTypes


class ControlLogic(Generic, EasyResource):
    # To enable debug-level logging, either run viam-server with the --debug option,
    # or configure your resource/machine to display debug logs.
    MODEL: ClassVar[Model] = Model(
        ModelFamily("<namespace>", "control-logic"), "control-logic"
    )

    @classmethod
    def new(
        cls, config: ComponentConfig, dependencies: Mapping[ResourceName, ResourceBase]
    ) -> Self:
        return super().new(config, dependencies)

    @classmethod
    def validate_config(
        cls, config: ComponentConfig
    ) -> Tuple[Sequence[str], Sequence[str]]:
        return [], []

    def reconfigure(
        self, config: ComponentConfig, dependencies: Mapping[ResourceName, ResourceBase]
    ):
        """This method allows you to dynamically update your service when it receives a new `config` object.

        Args:
            config (ComponentConfig): The new configuration
            dependencies (Mapping[ResourceName, ResourceBase]): Any dependencies (both required and optional)
        """
        self.counter = 0
        self.running = False
        return super().reconfigure(config, dependencies)

    async def on_loop(self):
        try:
            self.logger.info("Executing control logic")
            self.counter += 1
            self.logger.info(f"Counter: {self.counter}")

        except Exception as err:
            self.logger.error(err)

    async def do_command(
        self,
        command: Mapping[str, ValueTypes],
        *,
        timeout: Optional[float] = None,
        **kwargs
    ) -> Mapping[str, ValueTypes]:
        result = {key: False for key in command.keys()}
        for name, args in command.items():
            if name == "action" and args == "start":
                self.running = True
                result[name] = True
            if name == "action" and args == "stop":
                self.running = False
                result[name] = True
            if name == "action" and args == "on_loop":
                if self.running:
                    await self.on_loop()
                result[name] = True
        result["counter"] = self.counter
        return result

    async def get_geometries(
        self, *, extra: Optional[Dict[str, Any]] = None, timeout: Optional[float] = None
    ) -> Sequence[Geometry]:
        self.logger.error("`get_geometries` is not implemented")
        raise NotImplementedError()
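
To get a feel for how the commands interact before deploying anything, you could exercise the same branching in plain Python. The sketch below is a stand-in: the CounterLogic class is hypothetical, does not use the Viam SDK, and only mirrors the counter, running, on_loop, and do_command logic from the example above.

```python
import asyncio
from typing import Any, Mapping


class CounterLogic:
    """Plain-Python stand-in that mirrors the example's state and commands."""

    def __init__(self) -> None:
        self.counter = 0
        self.running = False

    async def on_loop(self) -> None:
        self.counter += 1

    async def do_command(self, command: Mapping[str, Any]) -> dict:
        result = {key: False for key in command.keys()}
        for name, args in command.items():
            if name == "action" and args == "start":
                self.running = True
                result[name] = True
            if name == "action" and args == "stop":
                self.running = False
                result[name] = True
            if name == "action" and args == "on_loop":
                # The counter only advances while running is True.
                if self.running:
                    await self.on_loop()
                result[name] = True
        result["counter"] = self.counter
        return result


async def demo() -> list:
    logic = CounterLogic()
    actions = ["on_loop", "start", "on_loop", "on_loop", "stop", "on_loop"]
    return [await logic.do_command({"action": action}) for action in actions]


responses = asyncio.run(demo())
counters = [response["counter"] for response in responses]
print(counters)  # [0, 0, 1, 2, 2, 2]
```

The on_loop commands sent before start and after stop are acknowledged but leave the counter unchanged.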

For a complete tutorial, see Tutorial: Desk Safari. For more examples, check the Viam registry.

Use other machine resources

Any resources that you wish to access from your control logic need to be identified and instantiated. To keep your code loosely coupled, we recommend passing the resource names in the configuration attributes of the control logic. You must modify the validate_config method to ensure all required values are passed in correctly and then instantiate the resource in the reconfigure method.

Let’s assume you have a board, and you’d like to pull a pin high when the start command is received and low when the stop command is received.

1. Pass resources in configuration.

The validate_config method serves two purposes:

  • To ensure the expected fields are in the config. The validate_config method is called whenever the module is started or a configuration change occurs.
  • To return a list of the names of all the required dependencies. viam-server waits until all returned dependencies are available before starting this component.
    @classmethod
    def validate_config(
        cls, config: ComponentConfig
    ) -> Tuple[Sequence[str], Sequence[str]]:
        req_deps = []
        fields = config.attributes.fields
        if "board_name" not in fields:
            raise Exception("missing required board_name attribute")
        elif not fields["board_name"].HasField("string_value"):
            raise Exception("board_name must be a string")
        board_name = fields["board_name"].string_value
        if not board_name:
            raise ValueError("board_name cannot be empty")
        req_deps.append(board_name)
        return req_deps, []

2. Access the resources.

viam-server passes the required dependencies when the control logic resource is reconfiguring. From these dependencies you can get the board and store it in an instance variable.

    def reconfigure(
        self, config: ComponentConfig, dependencies: Mapping[ResourceName, ResourceBase]
    ):
        self.board_name = config.attributes.fields["board_name"].string_value
        board_resource_name = Board.get_resource_name(self.board_name)
        board_resource = dependencies[board_resource_name]
        self.board = cast(Board, board_resource)
        self.counter = 0
        self.running = False
        return super().reconfigure(config, dependencies)

Add the following imports at the top of src/models/control_logic.py:

from typing import cast
from viam.components.board import Board

3. Use the resources.

Update your logic in the do_command method to use the board:

    async def do_command(
        self,
        command: Mapping[str, ValueTypes],
        *,
        timeout: Optional[float] = None,
        **kwargs
    ) -> Mapping[str, ValueTypes]:
        result = {key: False for key in command.keys()}
        for name, args in command.items():
            if name == "action" and args == "start":
                self.running = True
                pin = await self.board.gpio_pin_by_name(name="13")
                await pin.set(high=True)
                result[name] = True
            if name == "action" and args == "stop":
                self.running = False
                pin = await self.board.gpio_pin_by_name(name="13")
                await pin.set(high=False)
                result[name] = True
            if name == "action" and args == "on_loop":
                if self.running:
                    await self.on_loop()
                result[name] = True
        result["counter"] = self.counter
        return result

This is the code for src/models/control_logic.py:

from typing import (Any, ClassVar, Dict, Final, List, Mapping, Optional,
                    Sequence, Tuple, cast)

from typing_extensions import Self
from viam.components.generic import *
from viam.components.board import Board
from viam.proto.app.robot import ComponentConfig
from viam.proto.common import Geometry, ResourceName
from viam.resource.base import ResourceBase
from viam.resource.easy_resource import EasyResource
from viam.resource.types import Model, ModelFamily
from viam.utils import ValueTypes


class ControlLogic(Generic, EasyResource):
    # To enable debug-level logging, either run viam-server with the --debug option,
    # or configure your resource/machine to display debug logs.
    MODEL: ClassVar[Model] = Model(
        ModelFamily("naomi", "control-logic"), "control-logic"
    )

    @classmethod
    def new(
        cls, config: ComponentConfig, dependencies: Mapping[ResourceName, ResourceBase]
    ) -> Self:
        """This method creates a new instance of this Generic component.
        The default implementation sets the name from the `config` parameter and then calls `reconfigure`.

        Args:
            config (ComponentConfig): The configuration for this resource
            dependencies (Mapping[ResourceName, ResourceBase]): The dependencies (both required and optional)

        Returns:
            Self: The resource
        """
        return super().new(config, dependencies)

    @classmethod
    def validate_config(
        cls, config: ComponentConfig
    ) -> Tuple[Sequence[str], Sequence[str]]:
        req_deps = []
        fields = config.attributes.fields
        if "board_name" not in fields:
            raise Exception("missing required board_name attribute")
        elif not fields["board_name"].HasField("string_value"):
            raise Exception("board_name must be a string")
        board_name = fields["board_name"].string_value
        if not board_name:
            raise ValueError("board_name cannot be empty")
        req_deps.append(board_name)
        return req_deps, []

    def reconfigure(
        self, config: ComponentConfig, dependencies: Mapping[ResourceName, ResourceBase]
    ):
        self.board_name = config.attributes.fields["board_name"].string_value
        board_resource_name = Board.get_resource_name(self.board_name)
        board_resource = dependencies[board_resource_name]
        self.board = cast(Board, board_resource)
        self.counter = 0
        self.running = False
        return super().reconfigure(config, dependencies)

    async def on_loop(self):
        try:
            self.logger.info("Executing control logic")
            self.counter += 1
            self.logger.info(f"Counter: {self.counter}")

        except Exception as err:
            self.logger.error(err)

    async def do_command(
        self,
        command: Mapping[str, ValueTypes],
        *,
        timeout: Optional[float] = None,
        **kwargs
    ) -> Mapping[str, ValueTypes]:
        result = {key: False for key in command.keys()}
        for name, args in command.items():
            if name == "action" and args == "start":
                self.running = True
                pin = await self.board.gpio_pin_by_name(name="13")
                await pin.set(high=True)
                result[name] = True
            if name == "action" and args == "stop":
                self.running = False
                pin = await self.board.gpio_pin_by_name(name="13")
                await pin.set(high=False)
                result[name] = True
            if name == "action" and args == "on_loop":
                if self.running:
                    await self.on_loop()
                result[name] = True
        result["counter"] = self.counter
        return result

    async def get_geometries(
        self, *, extra: Optional[Dict[str, Any]] = None, timeout: Optional[float] = None
    ) -> Sequence[Geometry]:
        self.logger.error("`get_geometries` is not implemented")
        raise NotImplementedError()
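
If you want to check the pin handling without hardware attached, one option is to substitute a small stand-in for the board. The sketch below assumes nothing about the Viam SDK beyond the two calls used in the example above, gpio_pin_by_name(name=...) and set(high=...). The FakeBoard and FakePin classes and the set_pin_for_action helper are hypothetical.

```python
import asyncio


class FakePin:
    """Hypothetical stand-in for a GPIO pin; records the last level set."""

    def __init__(self) -> None:
        self.high = False

    async def set(self, high: bool) -> None:
        self.high = high


class FakeBoard:
    """Hypothetical stand-in that exposes only gpio_pin_by_name."""

    def __init__(self) -> None:
        self.pins = {}

    async def gpio_pin_by_name(self, name: str) -> FakePin:
        # Return the same FakePin object for repeated lookups of one name.
        return self.pins.setdefault(name, FakePin())


async def set_pin_for_action(board, action: str) -> None:
    """Mirror the start and stop branches: pin 13 high on start, low on stop."""
    if action in ("start", "stop"):
        pin = await board.gpio_pin_by_name(name="13")
        await pin.set(high=(action == "start"))


async def demo() -> list:
    board = FakeBoard()
    levels = []
    for action in ["start", "on_loop", "stop"]:
        await set_pin_for_action(board, action)
        levels.append(board.pins["13"].high)
    return levels


levels = asyncio.run(demo())
print(levels)  # [True, True, False]
```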

For more information, see Module dependencies.

Package the control logic

Once you have implemented your control logic, commit and push your changes to a GitHub repository.

Follow the steps in Upload your module using cloud build. When you create a release, your module will be built, packaged and pushed to the Viam Registry.

If you are not using GitHub or cloud build, see Upload your module and Update an existing module for more information on alternatives.

Deploy the control logic

  1. Navigate to the machine you want to deploy your control logic to.
  2. Go to the CONFIGURE tab.
  3. Click the + button.
  4. Click Component or service and select your control logic component.
  5. Click Add module.
  6. Add a Name and click Create. The examples below use the name generic-1.
  7. If you added configuration attributes, configure your control logic component.
  8. Click Save.

Your control logic will now be added to your machine.
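
For the board example above, the configuration attributes need a board_name entry. The sketch below prints attributes you could paste into the component's configuration and runs the same checks as validate_config against a plain dictionary. The board name board-1 and the helper required_dependencies are hypothetical, so use the name of the board component configured on your machine.

```python
import json


def required_dependencies(attributes: dict) -> list:
    """Apply the same checks as validate_config, but to a plain dict."""
    if "board_name" not in attributes:
        raise ValueError("missing required board_name attribute")
    board_name = attributes["board_name"]
    if not isinstance(board_name, str):
        raise ValueError("board_name must be a string")
    if not board_name:
        raise ValueError("board_name cannot be empty")
    return [board_name]


# "board-1" is a hypothetical name for a board configured on the machine.
attributes = {"board_name": "board-1"}
attributes_json = json.dumps(attributes, indent=2)
dependencies = required_dependencies(attributes)

print(attributes_json)
print(dependencies)
```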

Test your control logic

You can use the DoCommand method from the web UI or from the Viam SDKs:

On the CONTROL or the CONFIGURE tab, use the DoCommand panel:

  1. Copy and paste one of the following command inputs:

    To set self.running to True, copy and paste the following command input:

    {
      "action": "start"
    }
    

    To run the control logic loop on_loop, copy and paste the following command input:

    {
      "action": "on_loop"
    }
    

    To set self.running to False, use the following command input:

    {
      "action": "stop"
    }
    
  2. Click Execute to call DoCommand() with the command input on your machine.


You can start and stop your control logic with the DoCommand() method from the Python SDK:

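# Get the control logic component by the name you gave it, here generic-1.
# This assumes `machine` is an already connected RobotClient and that you
# have imported Generic from viam.components.generic.
control_logic = Generic.from_robot(machine, "generic-1")

# Start your control logic
await control_logic.do_command({"action": "start"})

# Run your control loop
await control_logic.do_command({"action": "on_loop"})

# Stop your control logic
await control_logic.do_command({"action": "stop"})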

Run control logic automatically with jobs

To run control logic automatically, use a job that calls the DoCommand method on a schedule.

1. Start control logic at a specific time each day

Click the + icon next to your machine part in the left-hand menu and select Job. Keep the default name, job-1, or enter your own, then click Create.

In the job panel, set the Schedule to Cron and enter 0 0 8 * * *, which runs the job every day at 08:00 AM.

Then configure the job to use the control logic resource using the name you gave it when you deployed it.

Lastly, select the DoCommand Method and specify the Command { "action": "start" }.

Click Save.

2. Run control logic periodically

Configure another job:

  • Cron Schedule: 0 * * * * * (every minute)
  • Resource: generic-1
  • Method: DoCommand
  • Command: { "action": "on_loop" }

3. End control logic at a specific time each day

Configure another job:

  • Cron Schedule: 0 0 17 * * * (at 05:00 PM)
  • Resource: generic-1
  • Method: DoCommand
  • Command: { "action": "stop" }

Now check the LOGS tab. You will see the second job trigger every minute, but the counter only starts increasing after the first job sends the start command at 8 AM. For testing purposes, you can also send the start command manually.
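
The three schedules above each have six fields. The sketch below labels those fields, assuming the leading field is seconds, which is what the times given for each job imply. The field names and the label_cron_fields helper are illustrative only and are not part of any Viam API.

```python
FIELD_NAMES = ["second", "minute", "hour", "day_of_month", "month", "day_of_week"]


def label_cron_fields(expression: str) -> dict:
    """Split a six-field cron expression and pair each field with a name."""
    fields = expression.split()
    if len(fields) != len(FIELD_NAMES):
        raise ValueError(f"expected {len(FIELD_NAMES)} fields, got {len(fields)}")
    return dict(zip(FIELD_NAMES, fields))


schedules = {
    "start": label_cron_fields("0 0 8 * * *"),
    "on_loop": label_cron_fields("0 * * * * *"),
    "stop": label_cron_fields("0 0 17 * * *"),
}

for action, fields in schedules.items():
    print(action, fields)
```

Read this way, the start and stop jobs pin the second, minute, and hour fields, while the on_loop job pins only the second field and so fires once per minute.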