Instalación de Temporal: Motor de Workflows Duraderos

Temporal es una plataforma de orquestación de workflows duraderos que garantiza la ejecución completa de flujos de trabajo complejos incluso ante fallos de red, reinicios del servidor o errores transitorios, preservando el estado completo del workflow en cada paso. A diferencia de las colas de mensajes tradicionales, Temporal mantiene el estado de ejecución de forma duradera y permite reintentos automáticos, timeouts, señales y queries en tiempo real sobre workflows en ejecución. Esta guía cubre la instalación completa de Temporal self-hosted en Linux.

Requisitos Previos

  • Linux con Docker y Docker Compose
  • 2 GB RAM mínimo (4 GB recomendado para producción)
  • PostgreSQL o Cassandra (para datos de workflow en producción)
  • Go 1.21+, Python 3.8+, Java 8+, Node.js 18+ o .NET 6+ (para workers)

Instalación con Docker Compose

# Clonar el repositorio de configuración de Temporal
git clone https://github.com/temporalio/docker-compose.git temporal
cd temporal

# Iniciar Temporal con PostgreSQL (recomendado para producción)
docker compose -f docker-compose-postgres.yml up -d

# O con la configuración por defecto (Cassandra, buena para alta escala)
docker compose up -d

# Ver el estado de los servicios
docker compose ps

# Los servicios que se inician:
# - temporal: servidor principal
# - temporal-ui: interfaz web
# - postgresql / cassandra: base de datos
# - elasticsearch: para búsqueda avanzada de workflows

Instalación manual del servidor Temporal

Para producción con configuración personalizada:

# Descargar el servidor Temporal
TEMPORAL_VERSION="1.23.0"
curl -LO "https://github.com/temporalio/temporal/releases/download/v${TEMPORAL_VERSION}/temporal-server_${TEMPORAL_VERSION}_linux_amd64.tar.gz"
tar xzf temporal-server_${TEMPORAL_VERSION}_linux_amd64.tar.gz
sudo install -m 0755 temporal-server /usr/local/bin/

# Descargar la CLI de Temporal
curl -LO "https://github.com/temporalio/cli/releases/latest/download/temporal_linux_amd64.tar.gz"
tar xzf temporal_linux_amd64.tar.gz
sudo install -m 0755 temporal /usr/local/bin/

# Verificar instalaciones
temporal --version
temporal-server --version

Docker Compose para producción

# /opt/temporal/docker-compose.yml
version: '3.8'

services:
  postgresql:
    image: postgres:15-alpine
    environment:
      POSTGRES_PASSWORD: temporal
      POSTGRES_USER: temporal
    volumes:
      - postgres_data:/var/lib/postgresql/data
    restart: unless-stopped

  elasticsearch:
    image: elasticsearch:7.17.5
    environment:
      - discovery.type=single-node
      - ES_JAVA_OPTS=-Xms512m -Xmx512m
    volumes:
      - es_data:/usr/share/elasticsearch/data
    restart: unless-stopped

  temporal:
    image: temporalio/auto-setup:1.23.0
    ports:
      - "7233:7233"  # Puerto principal del servidor
    environment:
      DB: postgresql
      DB_PORT: 5432
      POSTGRES_USER: temporal
      POSTGRES_PWD: temporal
      POSTGRES_SEEDS: postgresql
      ENABLE_ES: "true"
      ES_SEEDS: elasticsearch
      ES_VERSION: v7
      TEMPORAL_TLS_REQUIRE_CLIENT_AUTH: "false"
    depends_on:
      - postgresql
      - elasticsearch
    restart: unless-stopped

  temporal-ui:
    image: temporalio/ui:latest
    ports:
      - "127.0.0.1:8080:8080"
    environment:
      TEMPORAL_ADDRESS: temporal:7233
      TEMPORAL_CORS_ORIGINS: "https://temporal.tudominio.com"
    depends_on:
      - temporal
    restart: unless-stopped

volumes:
  postgres_data:
  es_data:
cd /opt/temporal
docker compose up -d

# Verificar que el servidor está listo
temporal operator cluster health

# Ver namespaces disponibles
temporal operator namespace list

Conceptos Fundamentales

Temporal tiene una arquitectura de conceptos bien definidos:

  • Workflow: función duradera que puede ejecutarse durante años
  • Activity: unidad de trabajo que puede fallar y reintentar
  • Worker: proceso que ejecuta Workflows y Activities
  • Namespace: espacio de aislamiento (como una base de datos)
  • Task Queue: cola de tareas pendientes para workers

Gestión de Namespaces

# Crear un namespace para un proyecto
temporal operator namespace create \
    --retention 30d \
    --description "Namespace para el sistema de pedidos" \
    sistema-pedidos

# Ver información del namespace
temporal operator namespace describe sistema-pedidos

# Listar todos los namespaces
temporal operator namespace list

# Actualizar la retención de workflows
temporal operator namespace update \
    --retention 90d \
    sistema-pedidos

Configuración de Workers

Los workers son procesos que consumen tareas de las colas y ejecutan el código de workflows y activities:

Worker en Go

// worker/main.go
package main

import (
    "log"
    "go.temporal.io/sdk/client"
    "go.temporal.io/sdk/worker"
    "mi-proyecto/workflows"
    "mi-proyecto/activities"
)

func main() {
    // Conectar al servidor Temporal
    c, err := client.Dial(client.Options{
        HostPort:  "localhost:7233",
        Namespace: "sistema-pedidos",
    })
    if err != nil {
        log.Fatalf("Error conectando a Temporal: %v", err)
    }
    defer c.Close()

    // Crear un worker que escucha la cola "pedidos-queue"
    w := worker.New(c, "pedidos-queue", worker.Options{
        MaxConcurrentActivityExecutionSize: 10,
        MaxConcurrentWorkflowTaskExecutionSize: 5,
    })

    // Registrar workflows y activities
    w.RegisterWorkflow(workflows.ProcesarPedidoWorkflow)
    w.RegisterActivity(activities.ValidarPago)
    w.RegisterActivity(activities.ActualizarInventario)
    w.RegisterActivity(activities.EnviarConfirmacion)

    // Iniciar el worker (bloqueante)
    err = w.Run(worker.InterruptCh())
    if err != nil {
        log.Fatalf("Error iniciando worker: %v", err)
    }
}

Worker en Python

# worker.py
import asyncio
from temporalio.client import Client
from temporalio.worker import Worker
from workflows import ProcesarPedidoWorkflow
from activities import validar_pago, actualizar_inventario, enviar_confirmacion

async def main():
    # Conectar al servidor Temporal
    client = await Client.connect(
        "localhost:7233",
        namespace="sistema-pedidos"
    )
    
    # Crear y ejecutar el worker
    async with Worker(
        client,
        task_queue="pedidos-queue",
        workflows=[ProcesarPedidoWorkflow],
        activities=[validar_pago, actualizar_inventario, enviar_confirmacion],
        max_concurrent_activities=10,
    ):
        print("Worker iniciado, escuchando en 'pedidos-queue'...")
        await asyncio.Event().wait()  # Mantener el worker activo

if __name__ == "__main__":
    asyncio.run(main())

Diseño de Activities

Las Activities son las unidades de trabajo que interactúan con sistemas externos:

# activities.py
from temporalio import activity
from dataclasses import dataclass
from typing import Optional
import httpx

@dataclass
class PedidoInfo:
    pedido_id: str
    total: float
    email_cliente: str
    items: list

@activity.defn
async def validar_pago(pedido: PedidoInfo) -> dict:
    """Activity que valida el pago con la pasarela de pagos."""
    activity.logger.info(f"Validando pago para pedido {pedido.pedido_id}")
    
    # Latidos periódicos para actividades largas
    # (evita que Temporal piense que la activity murió)
    activity.heartbeat(f"Procesando pago de €{pedido.total}")
    
    async with httpx.AsyncClient() as client:
        respuesta = await client.post(
            "https://api.pasarela-pago.com/cobrar",
            json={
                "monto": pedido.total,
                "pedido_id": pedido.pedido_id
            },
            timeout=30.0
        )
        respuesta.raise_for_status()
    
    return {"pago_id": respuesta.json()["transaction_id"], "estado": "aprobado"}

@activity.defn
async def actualizar_inventario(pedido: PedidoInfo) -> bool:
    """Activity que descuenta el inventario."""
    activity.logger.info(f"Actualizando inventario para {len(pedido.items)} items")
    
    # Lógica de actualización de inventario
    for item in pedido.items:
        activity.heartbeat(f"Procesando item {item['id']}")
        # await db.execute("UPDATE inventario SET stock = stock - $1 WHERE id = $2", ...)
    
    return True

@activity.defn
async def enviar_confirmacion(pedido: PedidoInfo, pago_id: str) -> bool:
    """Activity que envía el email de confirmación."""
    # Enviar email al cliente
    return True

Workflow que orquesta las activities

# workflows.py
from temporalio import workflow
from temporalio.common import RetryPolicy
from datetime import timedelta
from activities import validar_pago, actualizar_inventario, enviar_confirmacion, PedidoInfo

@workflow.defn
class ProcesarPedidoWorkflow:
    
    @workflow.run
    async def run(self, pedido: PedidoInfo) -> dict:
        workflow.logger.info(f"Procesando pedido {pedido.pedido_id}")
        
        # Política de retry para el pago (intentar 3 veces con backoff)
        retry_pago = RetryPolicy(
            maximum_attempts=3,
            initial_interval=timedelta(seconds=5),
            backoff_coefficient=2.0,
            maximum_interval=timedelta(minutes=1),
            non_retryable_error_types=["PagoRechazadoError"]
        )
        
        # Ejecutar validación de pago con timeout de 2 minutos
        resultado_pago = await workflow.execute_activity(
            validar_pago,
            pedido,
            start_to_close_timeout=timedelta(minutes=2),
            retry_policy=retry_pago
        )
        
        # Actualizar inventario (actividad local, más rápida)
        await workflow.execute_activity(
            actualizar_inventario,
            pedido,
            start_to_close_timeout=timedelta(minutes=1)
        )
        
        # Enviar confirmación (puede fallar sin problema crítico)
        try:
            await workflow.execute_activity(
                enviar_confirmacion,
                pedido, resultado_pago["pago_id"],
                start_to_close_timeout=timedelta(minutes=1),
                retry_policy=RetryPolicy(maximum_attempts=5)
            )
        except Exception as e:
            workflow.logger.warning(f"Error al enviar confirmación: {e}")
        
        return {
            "pedido_id": pedido.pedido_id,
            "estado": "completado",
            "pago_id": resultado_pago["pago_id"]
        }

Políticas de Retry

from temporalio.common import RetryPolicy
from datetime import timedelta

# Retry agresivo para operaciones idempotentes
retry_agresivo = RetryPolicy(
    maximum_attempts=10,
    initial_interval=timedelta(seconds=1),
    backoff_coefficient=1.5,
    maximum_interval=timedelta(minutes=5)
)

# Sin retry para operaciones no idempotentes
sin_retry = RetryPolicy(maximum_attempts=1)

# Retry con errores específicos que NO se deben reintentar
retry_con_excepciones = RetryPolicy(
    maximum_attempts=5,
    initial_interval=timedelta(seconds=10),
    non_retryable_error_types=[
        "DatoInvalidoError",
        "PermisoDenegadoError",
        "RecursoNoEncontradoError"
    ]
)

Temporal Web UI y Visibilidad

# Acceder a la UI en http://localhost:8080 (o via proxy Nginx)

# Buscar workflows activos con la CLI
temporal workflow list --namespace sistema-pedidos

# Ver un workflow específico
temporal workflow describe \
    --workflow-id pedido-12345 \
    --namespace sistema-pedidos

# Ver el historial de eventos de un workflow
temporal workflow show \
    --workflow-id pedido-12345 \
    --namespace sistema-pedidos

# Cancelar un workflow en ejecución
temporal workflow cancel \
    --workflow-id pedido-12345 \
    --namespace sistema-pedidos \
    --reason "Cancelado por el usuario"

# Enviar señal a un workflow en ejecución
temporal workflow signal \
    --workflow-id pedido-12345 \
    --namespace sistema-pedidos \
    --name "aprobar-pedido" \
    --input '{"aprobado": true}'

# Reiniciar un workflow fallido
temporal workflow reset \
    --workflow-id pedido-12345 \
    --namespace sistema-pedidos \
    --event-id 5 \
    --reason "Reintentando después de corregir el bug"

Solución de Problemas

Los workers no procesan tareas

# Verificar que el worker está conectado al namespace correcto
temporal task-queue describe \
    --task-queue pedidos-queue \
    --namespace sistema-pedidos

# Verificar la conectividad al servidor
temporal operator cluster health

# Ver los logs del worker
# En Python: configurar logging
import logging
logging.basicConfig(level=logging.INFO)

Workflows atascados o zombie

# Listar workflows abiertos (posiblemente zombies)
temporal workflow list \
    --namespace sistema-pedidos \
    --status Open \
    --query "ExecutionTime < '2024-01-01T00:00:00Z'"

# Terminar workflows zombie
temporal workflow terminate \
    --workflow-id WORKFLOW_ID \
    --namespace sistema-pedidos \
    --reason "Workflow zombie - limpieza manual"

Error de conexión al servidor

# Verificar que el servidor está corriendo
docker compose ps temporal

# Verificar logs del servidor
docker compose logs temporal --tail=50

# Probar conectividad
temporal operator cluster health --address localhost:7233

Conclusión

Temporal resuelve uno de los problemas más difíciles en sistemas distribuidos: garantizar que los procesos de negocio complejos se completan correctamente incluso cuando los componentes individuales fallan. Su modelo de programación de workflows duraderos elimina la necesidad de implementar manualmente la gestión de estado, reintentos y compensaciones, permitiendo escribir lógica de negocio compleja como código secuencial ordinario. Es especialmente valioso para procesos de negocio críticos como pedidos de ecommerce, aprobaciones financieras y pipelines de datos que deben completarse sin importar los fallos de infraestructura.