Viam Documentation
Viam integrates with hardware and software on any device. Use AI, machine learning, and more to make any machine smarter—for one machine to thousands.
Program any device
To get started, install Viam on any device and create a configuration that describes connected hardware as components. Then you can control your device and any attached physical hardware securely from anywhere in the world. Or from local networks.
async def moveInSquare(base):
for _ in range(4):
# Move forward 500mm at 500mm/s
await base.move_straight(velocity=500, distance=500)
# Spin 90 degrees at 100 degrees/s
await base.spin(velocity=100, angle=90)
func moveInSquare(ctx context.Context, base base.Base, logger logging.Logger) {
for i := 0; i < 4; i++ {
// Move forward 500mm at 500mm/s
base.MoveStraight(ctx, 600, 500.0, nil)
// Spin 90 degrees at 100 degrees/s
base.Spin(ctx, 90, 100.0, nil)
}
}
async function moveInSquare(baseClient: VIAM.BaseClient) {
for (let i = 0; i < 4; i++) {
// Move forward 500mm at 500mm/s
await baseClient.moveStraight(500, 500);
// Spin 90 degrees at 100 degrees/s
await baseClient.spin(90, 100);
}
}
Future<void> moveSquare() async {
for (var i=0; i<4; i++) {
// Move forward 500mm at 500mm/s
await base.moveStraight(500, 500);
// Spins the rover 90 degrees at 100 degrees/s
await base.spin(90, 100);
}
}
void move_in_square(std::shared_ptr<viam::sdk::Base> base) {
for (int i = 0; i < 4; ++i) {
// Move forward 500mm at 500mm/s
base->move_straight(500, 500);
// Spins the rover 90 degrees at 100 degrees/s
base->spin(90, 100);
}
}
You can use any robotic base with Viam. Configure it as a base component. Then you can drive it using the base API.
async def spin_motor(motor):
# Turn the motor at 35% power forwards
await motor.set_power(power=0.35)
# Let the motor spin for 3 seconds
time.sleep(3)
# Stop the motor
await motor.stop()
func spinMotor(ctx context.Context, motor motor.Motor, logger logging.Logger) {
// Turn the motor at 35% power forwards
err = motor.SetPower(context.Background(), 0.35, nil)
// Let the motor spin for 3 seconds
time.Sleep(3 * time.Second)
// Stop the motor
err = motor.Stop(context.Background(), nil)
}
async function spinMotor(motorClient: VIAM.MotorClient) {
// Turn the motor at 35% power forwards
await motorClient.setPower(0.35);
// Let the motor spin for 3 seconds
const sleep = (ms: number) =>
new Promise((resolve) => setTimeout(resolve, ms));
await sleep(3000);
// Stop the motor
await motorClient.stop();
}
Future<void> spinMotor() async {
// Turn the motor at 35% power forwards
await motorClient.setPower(0.35);
// Let the motor spin for 3 seconds
await Future.delayed(Duration(seconds: 3));
// Stop the motor
await motorClient.stop();
}
void spin_motor(std::shared_ptr<viam::sdk::Motor> motor) {
// Turn the motor at 35% power forwards
motor->set_power(0.35);
// Let the motor spin for 3 seconds
sleep(3);
// Stop the motor
motor->stop();
}
You can use any motor with Viam. Configure it as a motor component. Then you can operate it using the motor API.
# Get the readings provided by the sensor.
co_2_monitor = Sensor.from_robot(machine, "co2-monitor")
co_2_monitor_return_value = await co_2_monitor.get_readings()
print(f"co2-monitor get_readings return value: {co_2_monitor_return_value}")
// Get the readings provided by the sensor.
co2Monitor, err := sensor.FromRobot(machine, "co2-monitor")
co2MonitorReturnValue, err := co2Monitor.Readings(
context.Background(), map[string]interface{}{})
logger.Infof("co2-monitor return value: %+v", co2MonitorReturnValue)
// Get the readings provided by the sensor.
const co2MonitorClient = new VIAM.SensorClient(machine, "co2-monitor");
const co2MonitorReturnValue = await co2MonitorClient.getReadings();
console.log("co2-monitor return value:", co2MonitorReturnValue);
// Get the readings provided by the sensor.
final co2Monitor = Sensor.fromRobot(client, "co2-monitor");
var readings = await co2Monitor.readings();
print(readings);
// Get the readings provided by the sensor.
auto co2monitor = machine->resource_by_name<Sensor>("co2-monitor");
auto co2monitor_get_readings_return_value = co2monitor->get_readings();
std::cout << "co2-monitor get_readings return value " << co2monitor_get_readings_return_value << "\n";
You can use any physical sensor or anything else that provides measurements with Viam. Configure it as a sensor component. Then you can get sensor readings using the sensor API.
# Command a joint position move: move the forearm of the arm slightly up
cmd_joint_positions = JointPositions(values=[0, 0, -30.0, 0, 0, 0])
await my_arm_component.move_to_joint_positions(
positions=cmd_joint_positions)
# Generate a simple pose move +100mm in the +Z direction of the arm
cmd_arm_pose = await my_arm_component.get_end_position()
cmd_arm_pose.z += 100.0
await my_arm_component.move_to_position(pose=cmd_arm_pose)
// Command a joint position move: move the forearm of the arm slightly up
cmdJointPositions := &armapi.JointPositions{Values: []float64{0.0, 0.0, -30.0, 0.0, 0.0, 0.0}}
err = myArmComponent.MoveToJointPositions(context.Background(), cmdJointPositions, nil)
// Generate a simple pose move +100mm in the +Z direction of the arm
currentArmPose, err := myArmComponent.EndPosition(context.Background(), nil)
adjustedArmPoint := currentArmPose.Point()
adjustedArmPoint.Z += 100.0
cmdArmPose := spatialmath.NewPose(adjustedArmPoint, currentArmPose.Orientation())
err = myArmComponent.MoveToPosition(context.Background(), cmdArmPose, referenceframe.NewEmptyWorldState(), nil)
You can use any robotic arm with Viam. Configure it as an arm component. Then you can move it using the arm API.
yo viam-module
? Create module structure within current directory? If no, will create a new directory with current directory matching the module name
you select No
? Your model triplet in the format namespace:family:modelname acme:rovers:base
? The language your module will be written in, must match Viam SDK language selected (python currently supported) python
? The API triplet this module uses (for example: rdk:component:motor). Expectation is that the second element is 'component' or 'service'.
rdk:component:base
? Is this a viam-sdk built-in API? Yes
Will create module scaffolding for module - base
API - rdk:component:base
Model - acme:rovers:base
create base-base/run.sh
create base-base/requirements.txt
create base-base/src/__main__.py
create base-base/src/__init__.py
create base-base/src/base.py
create base-base/README.md
Using the Viam Registry you can create resources for additional hardware types or models and then deploy them to your machines. There are also a variety of community-supplied resources you can use.
Make your devices better and smarter
Pick and choose from additional services. Make your devices understand their environment, interact with it, collect data, and more:
# Get image from camera stream on construction site
cam = Camera.from_robot(machine, "construction-site-cam")
img = await cam.get_image()
# Use machine learning model to gather information from the image
hardhat_detector = VisionClient.from_robot(machine, "hardhat_detector")
detections = await hardhat_detector.get_detections(img)
# Check whether a person is detected not wearing a hardhat
for d in detections:
if d.confidence > 0.8 and d.class_name == "NO-Hardhat":
print("Violation detected.")
// Get image from camera stream on construction site
myCamera, err := camera.FromRobot(machine, "construction-site-cam")
camStream, err := myCamera.Stream(context.Background())
img, release, err := camStream.Next(context.Background())
defer release()
// Use machine learning model to gather information from the image
visService, err := vision.FromRobot(machine, "hardhat_detector")
detections, err := visService.Detections(context.Background(), img, nil)
// Check whether a person is detected not wearing a hardhat
for i := 0; i < len(detections); i++ {
if (detection[i].confidence > 0.8) && (detection[i].class_name == "NO-Hardhat") {
logger.Info("Violation detected.")
}
}
Computer vision enables your machine to use connected cameras to interpret the world around it. With inferences about a machine’s surroundings, you can program machines to act based on this input.
# Tag data from the my_camera component
my_filter = create_filter(component_name="my_camera")
tags = ["frontview", "trainingdata"]
res = await data_client.add_tags_to_binary_data_by_filter(tags, my_filter)
# Query sensor data by filter
my_data = []
my_filter = create_filter(
component_name="sensor-1",
start_time=Timestamp('2024-10-01 10:00:00', tz='US/Pacific'),
end_time=Timestamp('2024-10-12 18:00:00', tz='US/Pacific')
)
tabular_data, count, last = await data_client.tabular_data_by_filter(
my_filter, last=None)
Sync sensor data, images, and any other binary or timeseries data from all your machines to the cloud. There, you can query and visualize it.
Intermittent internet connectivity? Your data will sync whenever internet is available.
# Add a table obstacle to a WorldState
table_origin = Pose(x=-202.5, y=-546.5, z=-19.0)
table_dimensions = Vector3(x=635.0, y=1271.0, z=38.0)
table_object = Geometry(center=table_origin,
box=RectangularPrism(dims_mm=table_dimensions))
obstacles_in_frame = GeometriesInFrame(reference_frame="world",
geometries=[table_object])
world_state = WorldState(obstacles=[obstacles_in_frame])
# Destination pose to move to
dest_in_frame = PoseInFrame(
reference_frame="world",
pose=Pose(x=510.0, y=0.0, z=526.0, o_x=0.7, o_y=0.0, o_z=-0.7, theta=0.0))
# Move arm to destination pose
motion_service = MotionClient.from_robot(robot, "builtin")
await motion_service.move(
component_name=Arm.get_resource_name("myArm"),
destination=dest_in_frame, world_state=world_state)
// Add a table obstacle to a WorldState
obstacles := make([]spatialmath.Geometry, 0)
tableOrigin := spatialmath.NewPose(
r3.Vector{X: 0.0, Y: 0.0, Z: -10.0},
&spatialmath.OrientationVectorDegrees{OX: 0.0, OY: 0.0, OZ: 1.0, Theta: 0.0},
)
tableDimensions := r3.Vector{X: 2000.0, Y: 2000.0, Z: 20.0}
tableObj, err := spatialmath.NewBox(tableOrigin, tableDimensions, "table")
obstacles = append(obstacles, tableObj)
obstaclesInFrame := referenceframe.NewGeometriesInFrame(referenceframe.World, obstacles)
worldState, err := referenceframe.NewWorldState([]*referenceframe.GeometriesInFrame{obstaclesInFrame}, nil)
// Destination pose to move to
destinationPose := spatialmath.NewPose(
r3.Vector{X: 510.0, Y: 0.0, Z: 526.0},
&spatialmath.OrientationVectorDegrees{OX: 0.7071, OY: 0.0, OZ: -0.7071, Theta: 0.0},
)
destPoseInFrame := referenceframe.NewPoseInFrame(
referenceframe.World, destinationPose)
// Move arm to destination pose
motionService, err := motion.FromRobot(robot, "builtin")
_, err = motionService.Move(context.Background(), arm.Named("myArm"), destPoseInFrame, worldState, nil, nil)
The motion service enables your machine to plan and move relative to itself, other machines, and the world.
my_nav = NavigationClient.from_robot(robot=robot, name="my_nav_service")
# Create a new waypoint at the specified latitude and longitude
location = GeoPoint(latitude=40.76275, longitude=-73.96)
# Add waypoint to the service's data storage
await my_nav.add_waypoint(point=location)
my_nav = NavigationClient.from_robot(robot=robot, name="my_nav_service")
# Set the service to operate in waypoint mode and begin navigation
await my_nav.set_mode(Mode.ValueType.MODE_WAYPOINT)
myNav, err := navigation.FromRobot(robot, "my_nav_service")
// Create a new waypoint at the specified latitude and longitude
location = geo.NewPoint(40.76275, -73.96)
// Add waypoint to the service's data storage
err := myNav.AddWaypoint(context.Background(), location, nil)
myNav, err := navigation.FromRobot(robot, "my_nav_service")
// Set the service to operate in waypoint mode and begin navigation
mode, err := myNav.SetMode(context.Background(), Mode.MODE_WAYPOINT, nil)
Use the navigation service to autonomously navigate a machine to defined waypoints.
npm install -g generator-viam-module
yo viam-module
? Create module structure within current directory? If no, will create a new directory with current directory matching the module name
you select No
? Your model triplet in the format namespace:family:modelname acme:services:slam
? The language your module will be written in, must match Viam SDK language selected (python currently supported) python
? The API triplet this module uses (for example: rdk:component:motor). Expectation is that the second element is 'component' or 'service'.
rdk:services:slam
? Is this a viam-sdk built-in API? No
? Is this an new API you will define now? No
Will create module scaffolding for module - slam
API - rdk:services:slam
Model - acme:services:slam
create slam-slam/run.sh
create slam-slam/requirements.txt
create slam-slam/src/__main__.py
create slam-slam/src/slam/__init__.py
create slam-slam/src/slam/slam.py
create slam-slam/README.md
Using the Viam Registry you can turn your own custom business logic into modules. You can then deploy your modules to your machines.
Go from one machine to thousands
When you connect machines to the cloud you get fleet management tools that let you scale. Go from one prototype to thousands of machines you can manage and operate from one place using the Viam Cloud.
// Reusable configuration for using a software package
{
"services": [
{
"name": "speech-1",
"namespace": "viam-labs",
"type": "speech",
"model": "viam-labs:speech:speechio"
}
],
"modules": [
{
"type": "registry",
"name": "viam-labs_speech",
"module_id": "viam-labs:speech",
// Specific version to deploy
"version": "0.5.2"
}
]
}
Manage hardware and software for multiple machines using a built-in tool called fragments. You can make changes to some or all of your machines in one go.
# Create configuration for provisioning machines with a fragment
echo "{
"manufacturer": "Company",
"model": "SmartRover",
"fragment_id": "11d1059b-eaed-4ad8-9fd8-d60ad7386aa2"
}" >> viam-provisioning.json
# Get and run the script to install viam on a board.
wget https://storage.googleapis.com/packages.viam.com/apps/viam-agent/preinstall.sh
chmod 755 preinstall.sh
sudo ./preinstall.sh
Provisioning allows you to complete part of the machine setup during the manufacturing process. The rest of the first-time setup happens once the machine is taken into operation. This way, machines automatically get the latest updates.
# Get all machines in a location
machines = await cloud.list_robots(location_id="abcde1fghi")
for m in machines:
# Connect and get status information or latest logs
machine_parts = await cloud.get_robot_parts(m.id)
main_part = next(filter(lambda part: part.main_part, machine_parts), None)
try:
# Get status for machine
machine = await connect(main_part.fqdn)
status = await machine.get_machine_status()
except ConnectionError:
# If no connection can be made, get last logs
logs = await cloud.get_robot_part_logs(
robot_part_id=main_part.id, num_log_entries=5)
Get status information and logs from all your deployed machines.
# Start a training job to create a classification model based on the dataset
job_id = await ml_training_client.submit_training_job(
org_id="abbc1c1c-d2e3-5f67-ab8c-de912345f678",
dataset_id="12ab3cd4e56f7abc89de1fa2",
model_name="recognize_gestures",
model_version="1",
model_type=ModelType.MODEL_TYPE_MULTI_LABEL_CLASSIFICATION,
tags=["follow", "stop"]
)
# Get status information for training job
job_metadata = await ml_training_client.get_training_job(
id=job_id)
Build machine learning models based on your machines’ data. You can pick from different training algorithms or create your own.
# Create a new machine
new_machine_id = await cloud.new_robot(
name="new-machine", location_id="abcde1fghi")
# Get organization associated with authenticated user / API key
org_list = await cloud.list_organizations()
# Create a new API key with owner access for the new machine
auth = APIKeyAuthorization(
role="owner",
resource_type="robot",
resource_id=new_machine_id
)
api_key, api_key_id = await cloud.create_key(
org_list[0].id, [auth], "key_for_new_machine")
Viam allows you to organize and manage any number of machines. When collaborating with others, you can assign permissions using Role-Based Access Control (RBAC).