Orchestration

AWS Step Functions

Serverless workflow orchestration for ML pipelines

Step Functions

state machines; retries/timeouts

Mental model

  • Control plane for workflows: you pay for orchestration, not compute.
  • Use it to make pipelines observable + resumable + retryable.

Where it shows up in ML/GenAI

  • RAG ingestion pipeline: fetch → parse → chunk → embed → upsert → validate
  • Agent workflows: plan → tool calls (parallel) → aggregate → human approval → publish
  • “Glue” between services (Lambda/ECS/Batch/SageMaker/EMR/Glue)

Key knobs (senior knobs)

  • Workflow type

    • Standard: durable, long-running, full history (default for pipelines)
    • Express: high-throughput, short-lived, cheaper per run (great for event fanout)
  • Retries/timeouts: set per-state retry policy + overall state timeouts

  • Error handling: Catch → route to compensation / DLQ / manual review

  • Parallel/Map: control fanout; cap concurrency

  • Service integrations: prefer native integrations to reduce glue Lambda

  • Execution history/logging: enable CloudWatch logs + X-Ray when debugging

Pricing mental model

  • Standard: cost ≈ state transitions
  • Express: cost ≈ invocations + duration
  • Senior heuristic: optimize by reducing “tiny states” and unnecessary transitions; push heavy loops to Batch/ECS.

Terraform template (Standard state machine + logs)

resource "aws_cloudwatch_log_group" "sfn" {
  name              = "/aws/vendedlogs/states/${var.name}"
  retention_in_days = 14
}

data "aws_iam_policy_document" "sfn_assume" {
  statement {
    effect = "Allow"
    principals { type = "Service", identifiers = ["states.amazonaws.com"] }
    actions = ["sts:AssumeRole"]
  }
}

resource "aws_iam_role" "sfn_role" {
  name               = "${var.name}-sfn-role"
  assume_role_policy = data.aws_iam_policy_document.sfn_assume.json
}

# Add least-privilege permissions for the tasks you call (Lambda/ECS/Batch/etc)
resource "aws_iam_role_policy" "sfn_policy" {
  role = aws_iam_role.sfn_role.id
  policy = jsonencode({
    Version = "2012-10-17",
    Statement = [
      { Effect="Allow", Action=["lambda:InvokeFunction"], Resource=var.lambda_arns },
      { Effect="Allow", Action=["logs:CreateLogDelivery","logs:GetLogDelivery","logs:UpdateLogDelivery","logs:DeleteLogDelivery","logs:ListLogDeliveries","logs:PutResourcePolicy","logs:DescribeResourcePolicies","logs:DescribeLogGroups"], Resource="*" }
    ]
  })
}

resource "aws_sfn_state_machine" "sm" {
  name     = var.name
  role_arn = aws_iam_role.sfn_role.arn

  definition = jsonencode({
    Comment = "Example: agent/tool workflow"
    StartAt = "ToolCall"
    States = {
      ToolCall = {
        Type = "Task"
        Resource = "arn:aws:states:::lambda:invoke"
        Parameters = {
          FunctionName = var.tool_lambda_arn
          Payload = { "input.$" = "$" }
        }
        Retry = [{
          ErrorEquals = ["Lambda.ServiceException","Lambda.TooManyRequestsException","States.TaskFailed"]
          IntervalSeconds = 2
          MaxAttempts = 4
          BackoffRate = 2.0
        }]
        Catch = [{
          ErrorEquals = ["States.ALL"]
          Next = "FailToDLQ"
        }]
        Next = "Success"
      }
      FailToDLQ = { Type="Fail", Error="ToolFailed" }
      Success   = { Type="Succeed" }
    }
  })

  logging_configuration {
    level                  = "ALL"
    include_execution_data  = true
    log_destination         = "${aws_cloudwatch_log_group.sfn.arn}:*"
  }
}

variable "name"           { type = string }
variable "tool_lambda_arn"{ type = string }
variable "lambda_arns"    { type = list(string) default = [] }