name: duroxide-orchestrations
description: Writing durable workflows using Duroxide in Rust. Use when creating orchestrations, activities, workflows, or when the user mentions duroxide, durable functions, or workflow orchestration.
Duroxide Durable Workflow Development
Overview
Skills for developing durable workflows using Duroxide in Rust. Duroxide provides deterministic, replayable orchestrations with automatic failure recovery.
Core Concepts
- Activities: Idempotent operations that perform actual work (K8s calls, DB queries, HTTP requests)
- Orchestrations: Deterministic workflow logic that coordinates activities
- Continue-as-new: Pattern for long-running orchestrations to prevent unbounded history
- Sub-orchestrations: Reusable workflow compositions
- Detached orchestrations: Background workflows that run independently
Directory Structure
toygres-orchestrations/src/
├── orchestrations/ # Workflow definitions
│ └── my_orchestration.rs
├── activities/ # Atomic operations
│ ├── my_activity.rs
│ └── cms/ # Grouped by domain
│ └── my_cms_activity.rs
├── registry.rs # Central registration
├── types.rs # Orchestration I/O types
├── activity_types.rs # Activity I/O types
└── names.rs # Naming constants
Naming Convention
Follow the hierarchical namespace pattern in names.rs:
// Format: {crate}::{type}::{name}
pub mod orchestrations {
pub const MY_WORKFLOW: &str = "toygres-orchestrations::orchestration::my-workflow";
}
pub mod activities {
pub const MY_ACTIVITY: &str = "toygres-orchestrations::activity::my-activity";
}
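The naming format above can be checked mechanically. The following is a minimal sketch using only the standard library; `Kind`, `QualifiedName`, and `parse_qualified_name` are hypothetical helpers introduced for illustration and are not part of Duroxide.

```rust
/// Kinds of registered items, matching the `{type}` segment used in names.rs.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum Kind {
    Orchestration,
    Activity,
}

/// Parsed pieces of a `{crate}::{type}::{name}` identifier.
#[derive(Debug, PartialEq, Eq)]
pub struct QualifiedName<'a> {
    pub crate_name: &'a str,
    pub kind: Kind,
    pub name: &'a str,
}

/// Hypothetical helper (not a Duroxide API): splits a registered name into
/// its three segments and rejects anything that does not follow the format.
pub fn parse_qualified_name(full: &str) -> Option<QualifiedName<'_>> {
    let mut parts = full.split("::");
    let crate_name = parts.next()?;
    let kind = match parts.next()? {
        "orchestration" => Kind::Orchestration,
        "activity" => Kind::Activity,
        _ => return None,
    };
    let name = parts.next()?;
    // Exactly three non-empty segments are allowed.
    if parts.next().is_some() || crate_name.is_empty() || name.is_empty() {
        return None;
    }
    Some(QualifiedName { crate_name, kind, name })
}
```

A check like this could run in a unit test over every constant in names.rs, so a typo such as a missing `::activity::` segment is caught before registration.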
Creating Activities
Activities must be idempotent - a retry must not repeat side effects that an earlier attempt already applied.
// toygres-orchestrations/src/activities/my_activity.rs
use duroxide::ActivityContext;
use crate::activity_types::{MyInput, MyOutput};
/// Activity name for registration and scheduling
pub const NAME: &str = "toygres-orchestrations::activity::my-activity";
pub async fn activity(
ctx: ActivityContext,
input: MyInput,
) -> Result<MyOutput, String> {
ctx.trace_info(format!("Starting activity: {}", input.name));
// CRITICAL: Check idempotency first - has this already been done?
let already_done = check_if_done(&input).await?;
if already_done {
ctx.trace_info("Already completed, returning cached result");
return Ok(MyOutput { ... });
}
// Perform actual work
let result = do_work(&input).await
.map_err(|e| format!("Failed: {}", e))?;
ctx.trace_info("Activity completed successfully");
Ok(result)
}
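The check-then-act shape of the activity above can be illustrated without Duroxide. This is a synchronous sketch using only the standard library; `FakeBackend` and `create_resource_idempotent` are hypothetical stand-ins for the external system and the activity body. A real activity would be async and would query the real system.

```rust
use std::collections::HashMap;

/// Stand-in for an external system (database, Kubernetes API, and so on).
/// Hypothetical type used only for this illustration.
#[derive(Default)]
pub struct FakeBackend {
    resources: HashMap<String, String>,
    /// Counts how many times the side effect actually ran.
    pub create_calls: u32,
}

impl FakeBackend {
    /// The side effect: creates the resource and returns its id.
    fn create(&mut self, name: &str) -> String {
        self.create_calls += 1;
        let id = format!("res-{}", name);
        self.resources.insert(name.to_string(), id.clone());
        id
    }
}

/// Check-then-act: return the existing result if the work was already done,
/// otherwise perform it once.
pub fn create_resource_idempotent(
    backend: &mut FakeBackend,
    name: &str,
) -> Result<String, String> {
    if name.is_empty() {
        return Err("name must not be empty".to_string());
    }
    if let Some(existing) = backend.resources.get(name) {
        // Already completed by an earlier attempt: no new side effect.
        return Ok(existing.clone());
    }
    Ok(backend.create(name))
}
```

Calling the function twice with the same name returns the same id and increments `create_calls` only once, which is the property a retried activity needs.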
Activity Registration
Add to registry.rs:
pub fn create_activity_registry() -> ActivityRegistry {
ActivityRegistry::builder()
.register_typed(
activities::my_activity::NAME,
activities::my_activity::activity,
)
.build()
}
Creating Orchestrations
Orchestrations coordinate activities with deterministic logic.
// toygres-orchestrations/src/orchestrations/my_orchestration.rs
use duroxide::{OrchestrationContext, RetryPolicy, BackoffStrategy};
use std::time::Duration;
pub async fn my_orchestration(
ctx: OrchestrationContext,
input: MyInput,
) -> Result<MyOutput, String> {
ctx.trace_info(format!("Starting orchestration: {}", input.id));
// Schedule an activity (basic)
let result = ctx
.schedule_activity_typed::<ActivityInput, ActivityOutput>(
activities::my_activity::NAME,
&activity_input,
)
.into_activity_typed::<ActivityOutput>()
.await?;
Ok(MyOutput { ... })
}
Orchestration Registration
pub fn create_orchestration_registry() -> OrchestrationRegistry {
OrchestrationRegistry::builder()
.register_typed(
orchestrations::MY_WORKFLOW,
crate::orchestrations::my_orchestration::my_orchestration,
)
.build()
}
Scheduling Patterns
Basic Activity Scheduling
let result = ctx
.schedule_activity_typed::<Input, Output>(NAME, &input)
.into_activity_typed::<Output>()
.await?;
Activity with Retry and Backoff
let result = ctx
.schedule_activity_with_retry_typed::<Input, Output>(
NAME,
&input,
RetryPolicy::new(5) // Max 5 retries
.with_backoff(BackoffStrategy::Exponential {
base: Duration::from_secs(2),
multiplier: 2.0,
max: Duration::from_secs(30),
})
.with_timeout(Duration::from_secs(60)),
)
.await?;
Backoff Strategies
// Fixed delay between retries
BackoffStrategy::Fixed {
delay: Duration::from_secs(5)
}
// Linear increase: 2s, 4s, 6s, 8s, max 10s
BackoffStrategy::Linear {
base: Duration::from_secs(2),
max: Duration::from_secs(10)
}
// Exponential: 2s, 4s, 8s, 16s, max 30s
BackoffStrategy::Exponential {
base: Duration::from_secs(2),
multiplier: 2.0,
max: Duration::from_secs(30)
}
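The delay sequences in the comments above follow simple arithmetic. The sketch below models it with a local `Backoff` enum that mirrors the three strategies; it is an assumption about how the delays are derived, not Duroxide's `BackoffStrategy` implementation, and `delay_for_attempt` is a hypothetical name.

```rust
use std::time::Duration;

/// Local mirror of the three strategies above. This is NOT Duroxide's
/// `BackoffStrategy`; it only models the delay arithmetic.
#[derive(Debug, Clone, Copy)]
pub enum Backoff {
    Fixed { delay: Duration },
    Linear { base: Duration, max: Duration },
    Exponential { base: Duration, multiplier: f64, max: Duration },
}

impl Backoff {
    /// Delay before retry number `attempt` (1-based), capped at `max`.
    pub fn delay_for_attempt(&self, attempt: u32) -> Duration {
        // Treat attempt 0 like attempt 1 so the arithmetic never underflows.
        let attempt = attempt.max(1);
        match *self {
            Backoff::Fixed { delay } => delay,
            Backoff::Linear { base, max } => {
                // base * attempt; an overflowing product falls back to the cap.
                base.checked_mul(attempt).unwrap_or(max).min(max)
            }
            Backoff::Exponential { base, multiplier, max } => {
                let exponent = (attempt - 1).min(i32::MAX as u32) as i32;
                let secs = base.as_secs_f64() * multiplier.powi(exponent);
                // Cap at `max` (this also absorbs infinity and NaN) and
                // never go below zero.
                let capped = secs.min(max.as_secs_f64()).max(0.0);
                Duration::from_secs_f64(capped)
            }
        }
    }
}
```

With `base = 2s`, `multiplier = 2.0`, and `max = 30s`, attempts 1 through 4 yield 2s, 4s, 8s, and 16s, and every later attempt is capped at 30s.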
Sub-Orchestration (Reusable Workflow)
let result = ctx
.schedule_sub_orchestration_typed::<Input, Output>(
orchestrations::CHILD_WORKFLOW,
&input,
)
.into_sub_orchestration_typed::<Output>()
.await?;
Detached Orchestration (Background/Fire-and-Forget)
// Start orchestration without waiting for completion
let input_json = serde_json::to_string(&input).map_err(|e| e.to_string())?; // serde_json::Error does not convert to String via `?`
ctx.schedule_orchestration(
orchestrations::BACKGROUND_WORKFLOW,
&orchestration_id, // Unique ID for this instance
input_json,
);
// Continues immediately - orchestration runs independently
Deterministic Timing
NEVER use tokio::time::sleep() in orchestration code - it is not recorded in history, so it breaks determinism on replay!
// Deterministic timer - safe for replay
ctx.schedule_timer(Duration::from_secs(30)).into_timer().await;
// Get current time deterministically
let now = ctx.utcnow().await?;
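Why a context-provided clock is safe for replay can be shown with a toy model. The sketch below is a conceptual illustration only, not Duroxide's internals: `ReplayClock` is a hypothetical type that records each reading the first time it is taken and returns the recorded value when the same step is replayed.

```rust
/// Toy model of a replayable clock (hypothetical; not Duroxide internals).
#[derive(Debug, Default)]
pub struct ReplayClock {
    recorded: Vec<u64>,
    cursor: usize,
}

impl ReplayClock {
    /// Starts a replay over previously recorded readings.
    pub fn from_history(recorded: Vec<u64>) -> Self {
        ReplayClock { recorded, cursor: 0 }
    }

    /// Returns the recorded reading if one exists for this step; otherwise
    /// samples `wall_clock` once and appends the value to the history.
    pub fn now_millis<F: FnOnce() -> u64>(&mut self, wall_clock: F) -> u64 {
        let past = self.recorded.get(self.cursor).copied();
        let value = match past {
            Some(recorded_value) => recorded_value,
            None => {
                let fresh = wall_clock();
                self.recorded.push(fresh);
                fresh
            }
        };
        self.cursor += 1;
        value
    }

    /// Hands back the history so a later replay can reuse it.
    pub fn into_history(self) -> Vec<u64> {
        self.recorded
    }
}
```

On replay the wall clock is never consulted for steps that already have a recorded value, so branches that depend on the time take the same path as in the original run. A direct call to the system clock or to `tokio::time::sleep()` bypasses this recording.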
Signal Handling with select2
Wait for either a timer or an external signal:
// Wait for 30 seconds OR deletion signal (whichever comes first)
let timer = ctx.schedule_timer(Duration::from_secs(30));
let deletion_signal = ctx.schedule_wait("InstanceDeleted");
let (winner_index, _) = ctx.select2(timer, deletion_signal).await;
if winner_index == 1 {
// Signal received
ctx.trace_info("Received signal, exiting gracefully");
return Ok(());
}
// Timer fired, continue
Continue-as-New Pattern
For long-running orchestrations, prevent unbounded history growth:
pub async fn long_running_orchestration(
ctx: OrchestrationContext,
input: MyInput,
) -> Result<MyOutput, String> {
// Do one iteration of work
let result = do_work(&ctx, &input).await?;
// Wait before next iteration
ctx.schedule_timer(Duration::from_secs(60)).into_timer().await;
// Continue as new: restarts with fresh execution history
let next_input = MyInput {
iteration: input.iteration + 1,
..input
};
let input_json = serde_json::to_string(&next_input).map_err(|e| e.to_string())?; // map the serde error into the String error type
ctx.continue_as_new(input_json).await?;
Ok(result)
}
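The effect of continue-as-new on history size can be modelled in a few lines. This is a toy sketch under stated assumptions, not Duroxide's data model: `Execution` is a hypothetical type, and the three history entries per iteration are invented for illustration.

```rust
/// Toy model of one execution (hypothetical; not Duroxide's data model).
#[derive(Debug)]
pub struct Execution {
    pub iteration: u64,
    pub history: Vec<String>,
}

impl Execution {
    /// A fresh execution starts with a single "started" event.
    pub fn start(iteration: u64) -> Self {
        Execution {
            iteration,
            history: vec![format!("started(iteration={})", iteration)],
        }
    }

    /// One loop body: do work, wait, then hand off to a fresh execution.
    /// Returns the next execution and the history length of the one that
    /// just finished.
    pub fn run_once(mut self) -> (Execution, usize) {
        self.history.push("activity_completed".to_string());
        self.history.push("timer_fired".to_string());
        let closed_len = self.history.len();
        // Continue-as-new: only the next input survives; the old history
        // is dropped together with `self`.
        (Execution::start(self.iteration + 1), closed_len)
    }
}
```

In this model the history of any single execution never exceeds three entries, however many iterations run. Without the handoff, the same loop would append two entries per iteration to one ever-growing history.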
Error Handling Patterns
Propagate Critical Errors
// Fail orchestration if activity fails
let result = ctx
.schedule_activity_typed::<I, O>(NAME, &input)
.into_activity_typed::<O>()
.await?; // ? propagates error
Best-Effort Operations (Log and Continue)
// Don't fail orchestration for non-critical operations
if let Err(err) = ctx
.schedule_activity_typed::<I, O>(NAME, &input)
.into_activity_typed::<O>()
.await
{
ctx.trace_warn(format!("Non-critical operation failed: {}", err));
// Continue despite error
}
Versioning Strategy
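CRITICAL: Never change the logic of an orchestration version that is already registered. Always add a new version instead.
In-flight orchestrations replay their recorded history against the current code, so changing the logic of a registered version breaks replay.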
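The replay problem can be made concrete with a toy check. The sketch below is a conceptual model, not how Duroxide detects nondeterminism: `check_replay` is a hypothetical function that compares the actions recorded by an earlier run with the actions the current code would schedule, in order.

```rust
/// Compares `recorded` (actions an earlier run scheduled, in order) with
/// `current` (actions the code schedules now). Hypothetical model only.
/// The code may schedule more steps than were recorded, because the
/// instance may simply not have reached them yet.
pub fn check_replay(recorded: &[&str], current: &[&str]) -> Result<(), String> {
    for (index, past) in recorded.iter().enumerate() {
        match current.get(index) {
            Some(now) if now == past => {}
            Some(now) => {
                return Err(format!(
                    "nondeterminism at step {}: history has '{}', code scheduled '{}'",
                    index, past, now
                ));
            }
            None => {
                return Err(format!(
                    "nondeterminism at step {}: history has '{}', code scheduled nothing",
                    index, past
                ));
            }
        }
    }
    Ok(())
}
```

Appending a step after the recorded ones passes the check, while inserting, reordering, or removing a step that an in-flight instance has already recorded fails it. That is why edits belong in a new version.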
Adding a New Version (Recommended Pattern)
When adding a new version, follow this workflow for cleaner git diffs:
- Copy the current latest version to a new function with the OLD version number
- Make your changes in the existing function name and bump its version
- Register both in the registry
Why? This preserves history per version and makes git diffs show only the actual changes, rather than showing the new version as entirely new code.
// STEP 1: Copy current implementation to preserve v1.0.1
pub async fn my_orchestration_1_0_1(ctx: OrchestrationContext, input: Input) -> Result<Output, String> {
ctx.trace_info("[v1.0.1] Original logic");
// Exact copy of previous implementation - DO NOT MODIFY
}
// STEP 2: Update the main function with new version
// This is now v1.0.2 - git diff will clearly show what changed
pub async fn my_orchestration_1_0_2(ctx: OrchestrationContext, input: Input) -> Result<Output, String> {
ctx.trace_info("[v1.0.2] Updated logic");
// Your new changes here - git diff shows only the delta
}
Register all versions:
OrchestrationRegistry::builder()
.register_typed(NAME, my_orchestration) // v1.0.0 (original)
.register_versioned_typed(NAME, "1.0.1", my_orchestration_1_0_1)
.register_versioned_typed(NAME, "1.0.2", my_orchestration_1_0_2) // Latest
.build()
Logging
Use context logging methods for durability:
ctx.trace_info(format!("Processing: {}", id));
ctx.trace_warn(format!("Warning: {}", message));
ctx.trace_error(format!("Error: {}", error));
Type Definitions
All I/O types must implement Serialize/Deserialize:
// activity_types.rs
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct MyInput {
pub name: String,
pub count: i32,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct MyOutput {
pub result: String,
pub success: bool,
}
Add serialization tests:
#[test]
fn test_my_input_serialization() {
let input = MyInput { name: "test".into(), count: 5 };
let json = serde_json::to_string(&input).unwrap();
let parsed: MyInput = serde_json::from_str(&json).unwrap();
assert_eq!(input, parsed);
}
Complete Example: Multi-Step Orchestration
pub async fn create_resource_orchestration(
ctx: OrchestrationContext,
input: CreateResourceInput,
) -> Result<CreateResourceOutput, String> {
ctx.trace_info(format!("Creating resource: {}", input.name));
// Step 1: Reserve in database
ctx.schedule_activity_typed::<ReserveInput, ReserveOutput>(
activities::reserve_resource::NAME,
&ReserveInput { name: input.name.clone() },
)
.into_activity_typed::<ReserveOutput>()
.await?;
// Step 2: Create with retry (external service may be flaky)
match create_with_retry(&ctx, &input).await {
Ok(output) => {
// Step 3: Update status to ready
update_status(&ctx, &input.name, "ready").await;
Ok(output)
}
Err(e) => {
ctx.trace_error(format!("Creation failed: {}", e));
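// Step 4: Cleanup on failure via sub-orchestration (best effort)
if let Err(cleanup_err) = cleanup_on_failure(&ctx, &input.name).await {
ctx.trace_warn(format!("Cleanup failed: {}", cleanup_err));
}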
Err(e)
}
}
}
async fn create_with_retry(
ctx: &OrchestrationContext,
input: &CreateResourceInput,
) -> Result<CreateResourceOutput, String> {
ctx
.schedule_activity_with_retry_typed::<CreateInput, CreateOutput>(
activities::create_resource::NAME,
&CreateInput { ... },
RetryPolicy::new(3)
.with_backoff(BackoffStrategy::Exponential {
base: Duration::from_secs(2),
multiplier: 2.0,
max: Duration::from_secs(30),
}),
)
.await
}
async fn cleanup_on_failure(ctx: &OrchestrationContext, name: &str) -> Result<(), String> {
ctx
.schedule_sub_orchestration_typed::<DeleteInput, DeleteOutput>(
orchestrations::DELETE_RESOURCE,
&DeleteInput { name: name.to_string() },
)
.into_sub_orchestration_typed::<DeleteOutput>()
.await?;
Ok(())
}
async fn update_status(ctx: &OrchestrationContext, name: &str, status: &str) {
if let Err(e) = ctx
.schedule_activity_typed::<UpdateInput, UpdateOutput>(
activities::update_status::NAME,
&UpdateInput { name: name.to_string(), status: status.to_string() },
)
.into_activity_typed::<UpdateOutput>()
.await
{
ctx.trace_warn(format!("Failed to update status: {}", e));
}
}
Best Practices Summary
- Naming: Use {crate}::{type}::{name} format in names.rs
- Idempotency: Activities must be safe to retry
- Determinism: Only use ctx.schedule_timer(), never tokio::time::sleep()
- Versioning: Never modify existing orchestrations - create new versions
- Error Handling: Propagate critical errors, log non-critical ones
- Long-Running: Use continue-as-new to prevent history bloat
- Testing: Add serialization round-trip tests for all types
- Logging: Use ctx.trace_*() methods, not println!
- Composition: Use sub-orchestrations for reusable workflows
- Background Tasks: Use detached orchestrations for fire-and-forget
Client API
// Start orchestration (uses latest registered version - PREFERRED)
client.start_orchestration(instance_id, orchestration_name, input).await?;
// Start specific version (only when you need to pin to older version)
client.start_orchestration_versioned(instance_id, orchestration_name, "1.0.1", input).await?;
// Cancel orchestration
client.cancel_instance(instance_id, "reason").await?;
// Send signal
client.send_signal(instance_id, "SignalName", payload).await?;
// Get status
let info = client.get_instance_info(instance_id).await?;
Version Selection
Default behavior: start_orchestration automatically uses the latest registered version.
Use start_orchestration_versioned only when:
- You need to pin to a specific older version for compatibility
- Testing a specific version in isolation
Version numbering convention:
- 1.0.0: Initial version
- 1.0.1: Bug fixes, minor improvements
- 1.0.2: Additional bug fixes, new optional features
- Major version changes for breaking input/output changes
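When reasoning about which registered version counts as "latest", note that version strings must be compared numerically, not as text. The sketch below assumes plain `major.minor.patch` strings. `parse_version` and `latest_version` are hypothetical helpers, and how Duroxide itself orders versions is not stated here.

```rust
/// Parses "major.minor.patch" into a tuple that orders numerically.
pub fn parse_version(text: &str) -> Option<(u64, u64, u64)> {
    let mut parts = text.split('.');
    let major = parts.next()?.parse::<u64>().ok()?;
    let minor = parts.next()?.parse::<u64>().ok()?;
    let patch = parts.next()?.parse::<u64>().ok()?;
    // Reject extra segments such as "1.0.0.1".
    if parts.next().is_some() {
        return None;
    }
    Some((major, minor, patch))
}

/// Picks the highest version among the registered ones (hypothetical helper).
/// Strings that do not parse are ignored.
pub fn latest_version<'a>(registered: &[&'a str]) -> Option<&'a str> {
    registered
        .iter()
        .copied()
        .filter_map(|text| parse_version(text).map(|parsed| (parsed, text)))
        .max_by_key(|(parsed, _)| *parsed)
        .map(|(_, text)| text)
}
```

A plain string comparison would rank "1.0.9" above "1.0.10", which is why the helper compares parsed tuples instead.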