Instalación de Temporal: Motor de Workflows Duraderos
Temporal es una plataforma de orquestación de workflows duraderos que garantiza la ejecución completa de flujos de trabajo complejos incluso ante fallos de red, reinicios del servidor o errores transitorios, preservando el estado completo del workflow en cada paso. A diferencia de las colas de mensajes tradicionales, Temporal mantiene el estado de ejecución de forma duradera y permite reintentos automáticos, timeouts, señales y queries en tiempo real sobre workflows en ejecución. Esta guía cubre la instalación completa de Temporal self-hosted en Linux.
Requisitos Previos
- Linux con Docker y Docker Compose
- 2 GB RAM mínimo (4 GB recomendado para producción)
- PostgreSQL o Cassandra (para datos de workflow en producción)
- Go 1.21+, Python 3.8+, Java 8+, Node.js 18+ o .NET 6+ (para workers)
Instalación con Docker Compose
# Clonar el repositorio de configuración de Temporal
git clone https://github.com/temporalio/docker-compose.git temporal
cd temporal
# Iniciar Temporal con PostgreSQL (recomendado para producción)
docker compose -f docker-compose-postgres.yml up -d
# O con la configuración por defecto (Cassandra, buena para alta escala)
docker compose up -d
# Ver el estado de los servicios
docker compose ps
# Los servicios que se inician:
# - temporal: servidor principal
# - temporal-ui: interfaz web
# - postgresql / cassandra: base de datos
# - elasticsearch: para búsqueda avanzada de workflows
Instalación manual del servidor Temporal
Para producción con configuración personalizada:
# Descargar el servidor Temporal
TEMPORAL_VERSION="1.23.0"
curl -LO "https://github.com/temporalio/temporal/releases/download/v${TEMPORAL_VERSION}/temporal-server_${TEMPORAL_VERSION}_linux_amd64.tar.gz"
tar xzf temporal-server_${TEMPORAL_VERSION}_linux_amd64.tar.gz
sudo install -m 0755 temporal-server /usr/local/bin/
# Descargar la CLI de Temporal
curl -LO "https://github.com/temporalio/cli/releases/latest/download/temporal_linux_amd64.tar.gz"
tar xzf temporal_linux_amd64.tar.gz
sudo install -m 0755 temporal /usr/local/bin/
# Verificar instalaciones
temporal --version
temporal-server --version
Docker Compose para producción
# /opt/temporal/docker-compose.yml
version: '3.8'
services:
postgresql:
image: postgres:15-alpine
environment:
POSTGRES_PASSWORD: temporal
POSTGRES_USER: temporal
volumes:
- postgres_data:/var/lib/postgresql/data
restart: unless-stopped
elasticsearch:
image: elasticsearch:7.17.5
environment:
- discovery.type=single-node
- ES_JAVA_OPTS=-Xms512m -Xmx512m
volumes:
- es_data:/usr/share/elasticsearch/data
restart: unless-stopped
temporal:
image: temporalio/auto-setup:1.23.0
ports:
- "7233:7233" # Puerto principal del servidor
environment:
DB: postgresql
DB_PORT: 5432
POSTGRES_USER: temporal
POSTGRES_PWD: temporal
POSTGRES_SEEDS: postgresql
ENABLE_ES: "true"
ES_SEEDS: elasticsearch
ES_VERSION: v7
TEMPORAL_TLS_REQUIRE_CLIENT_AUTH: "false"
depends_on:
- postgresql
- elasticsearch
restart: unless-stopped
temporal-ui:
image: temporalio/ui:latest
ports:
- "127.0.0.1:8080:8080"
environment:
TEMPORAL_ADDRESS: temporal:7233
TEMPORAL_CORS_ORIGINS: "https://temporal.tudominio.com"
depends_on:
- temporal
restart: unless-stopped
volumes:
postgres_data:
es_data:
cd /opt/temporal
docker compose up -d
# Verificar que el servidor está listo
temporal operator cluster health
# Ver namespaces disponibles
temporal operator namespace list
Conceptos Fundamentales
Temporal tiene una arquitectura de conceptos bien definidos:
- Workflow: función duradera que puede ejecutarse durante años
- Activity: unidad de trabajo que puede fallar y reintentar
- Worker: proceso que ejecuta Workflows y Activities
- Namespace: espacio de aislamiento (como una base de datos)
- Task Queue: cola de tareas pendientes para workers
Gestión de Namespaces
# Crear un namespace para un proyecto
temporal operator namespace create \
--retention 30d \
--description "Namespace para el sistema de pedidos" \
sistema-pedidos
# Ver información del namespace
temporal operator namespace describe sistema-pedidos
# Listar todos los namespaces
temporal operator namespace list
# Actualizar la retención de workflows
temporal operator namespace update \
--retention 90d \
sistema-pedidos
Configuración de Workers
Los workers son procesos que consumen tareas de las colas y ejecutan el código de workflows y activities:
Worker en Go
// worker/main.go
package main
import (
"log"
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/worker"
"mi-proyecto/workflows"
"mi-proyecto/activities"
)
func main() {
// Conectar al servidor Temporal
c, err := client.Dial(client.Options{
HostPort: "localhost:7233",
Namespace: "sistema-pedidos",
})
if err != nil {
log.Fatalf("Error conectando a Temporal: %v", err)
}
defer c.Close()
// Crear un worker que escucha la cola "pedidos-queue"
w := worker.New(c, "pedidos-queue", worker.Options{
MaxConcurrentActivityExecutionSize: 10,
MaxConcurrentWorkflowTaskExecutionSize: 5,
})
// Registrar workflows y activities
w.RegisterWorkflow(workflows.ProcesarPedidoWorkflow)
w.RegisterActivity(activities.ValidarPago)
w.RegisterActivity(activities.ActualizarInventario)
w.RegisterActivity(activities.EnviarConfirmacion)
// Iniciar el worker (bloqueante)
err = w.Run(worker.InterruptCh())
if err != nil {
log.Fatalf("Error iniciando worker: %v", err)
}
}
Worker en Python
# worker.py
import asyncio
from temporalio.client import Client
from temporalio.worker import Worker
from workflows import ProcesarPedidoWorkflow
from activities import validar_pago, actualizar_inventario, enviar_confirmacion
async def main():
# Conectar al servidor Temporal
client = await Client.connect(
"localhost:7233",
namespace="sistema-pedidos"
)
# Crear y ejecutar el worker
async with Worker(
client,
task_queue="pedidos-queue",
workflows=[ProcesarPedidoWorkflow],
activities=[validar_pago, actualizar_inventario, enviar_confirmacion],
max_concurrent_activities=10,
):
print("Worker iniciado, escuchando en 'pedidos-queue'...")
await asyncio.Event().wait() # Mantener el worker activo
if __name__ == "__main__":
asyncio.run(main())
Diseño de Activities
Las Activities son las unidades de trabajo que interactúan con sistemas externos:
# activities.py
from temporalio import activity
from dataclasses import dataclass
from typing import Optional
import httpx
@dataclass
class PedidoInfo:
pedido_id: str
total: float
email_cliente: str
items: list
@activity.defn
async def validar_pago(pedido: PedidoInfo) -> dict:
"""Activity que valida el pago con la pasarela de pagos."""
activity.logger.info(f"Validando pago para pedido {pedido.pedido_id}")
# Latidos periódicos para actividades largas
# (evita que Temporal piense que la activity murió)
activity.heartbeat(f"Procesando pago de €{pedido.total}")
async with httpx.AsyncClient() as client:
respuesta = await client.post(
"https://api.pasarela-pago.com/cobrar",
json={
"monto": pedido.total,
"pedido_id": pedido.pedido_id
},
timeout=30.0
)
respuesta.raise_for_status()
return {"pago_id": respuesta.json()["transaction_id"], "estado": "aprobado"}
@activity.defn
async def actualizar_inventario(pedido: PedidoInfo) -> bool:
"""Activity que descuenta el inventario."""
activity.logger.info(f"Actualizando inventario para {len(pedido.items)} items")
# Lógica de actualización de inventario
for item in pedido.items:
activity.heartbeat(f"Procesando item {item['id']}")
# await db.execute("UPDATE inventario SET stock = stock - $1 WHERE id = $2", ...)
return True
@activity.defn
async def enviar_confirmacion(pedido: PedidoInfo, pago_id: str) -> bool:
"""Activity que envía el email de confirmación."""
# Enviar email al cliente
return True
Workflow que orquesta las activities
# workflows.py
from temporalio import workflow
from temporalio.common import RetryPolicy
from datetime import timedelta
from activities import validar_pago, actualizar_inventario, enviar_confirmacion, PedidoInfo
@workflow.defn
class ProcesarPedidoWorkflow:
@workflow.run
async def run(self, pedido: PedidoInfo) -> dict:
workflow.logger.info(f"Procesando pedido {pedido.pedido_id}")
# Política de retry para el pago (intentar 3 veces con backoff)
retry_pago = RetryPolicy(
maximum_attempts=3,
initial_interval=timedelta(seconds=5),
backoff_coefficient=2.0,
maximum_interval=timedelta(minutes=1),
non_retryable_error_types=["PagoRechazadoError"]
)
# Ejecutar validación de pago con timeout de 2 minutos
resultado_pago = await workflow.execute_activity(
validar_pago,
pedido,
start_to_close_timeout=timedelta(minutes=2),
retry_policy=retry_pago
)
# Actualizar inventario (actividad local, más rápida)
await workflow.execute_activity(
actualizar_inventario,
pedido,
start_to_close_timeout=timedelta(minutes=1)
)
# Enviar confirmación (puede fallar sin problema crítico)
try:
await workflow.execute_activity(
enviar_confirmacion,
pedido, resultado_pago["pago_id"],
start_to_close_timeout=timedelta(minutes=1),
retry_policy=RetryPolicy(maximum_attempts=5)
)
except Exception as e:
workflow.logger.warning(f"Error al enviar confirmación: {e}")
return {
"pedido_id": pedido.pedido_id,
"estado": "completado",
"pago_id": resultado_pago["pago_id"]
}
Políticas de Retry
from temporalio.common import RetryPolicy
from datetime import timedelta
# Retry agresivo para operaciones idempotentes
retry_agresivo = RetryPolicy(
maximum_attempts=10,
initial_interval=timedelta(seconds=1),
backoff_coefficient=1.5,
maximum_interval=timedelta(minutes=5)
)
# Sin retry para operaciones no idempotentes
sin_retry = RetryPolicy(maximum_attempts=1)
# Retry con errores específicos que NO se deben reintentar
retry_con_excepciones = RetryPolicy(
maximum_attempts=5,
initial_interval=timedelta(seconds=10),
non_retryable_error_types=[
"DatoInvalidoError",
"PermisoDenegadoError",
"RecursoNoEncontradoError"
]
)
Temporal Web UI y Visibilidad
# Acceder a la UI en http://localhost:8080 (o via proxy Nginx)
# Buscar workflows activos con la CLI
temporal workflow list --namespace sistema-pedidos
# Ver un workflow específico
temporal workflow describe \
--workflow-id pedido-12345 \
--namespace sistema-pedidos
# Ver el historial de eventos de un workflow
temporal workflow show \
--workflow-id pedido-12345 \
--namespace sistema-pedidos
# Cancelar un workflow en ejecución
temporal workflow cancel \
--workflow-id pedido-12345 \
--namespace sistema-pedidos \
--reason "Cancelado por el usuario"
# Enviar señal a un workflow en ejecución
temporal workflow signal \
--workflow-id pedido-12345 \
--namespace sistema-pedidos \
--name "aprobar-pedido" \
--input '{"aprobado": true}'
# Reiniciar un workflow fallido
temporal workflow reset \
--workflow-id pedido-12345 \
--namespace sistema-pedidos \
--event-id 5 \
--reason "Reintentando después de corregir el bug"
Solución de Problemas
Los workers no procesan tareas
# Verificar que el worker está conectado al namespace correcto
temporal task-queue describe \
--task-queue pedidos-queue \
--namespace sistema-pedidos
# Verificar la conectividad al servidor
temporal operator cluster health
# Ver los logs del worker
# En Python: configurar logging
import logging
logging.basicConfig(level=logging.INFO)
Workflows atascados o zombie
# Listar workflows abiertos (posiblemente zombies)
temporal workflow list \
--namespace sistema-pedidos \
--status Open \
--query "ExecutionTime < '2024-01-01T00:00:00Z'"
# Terminar workflows zombie
temporal workflow terminate \
--workflow-id WORKFLOW_ID \
--namespace sistema-pedidos \
--reason "Workflow zombie - limpieza manual"
Error de conexión al servidor
# Verificar que el servidor está corriendo
docker compose ps temporal
# Verificar logs del servidor
docker compose logs temporal --tail=50
# Probar conectividad
temporal operator cluster health --address localhost:7233
Conclusión
Temporal resuelve uno de los problemas más difíciles en sistemas distribuidos: garantizar que los procesos de negocio complejos se completan correctamente incluso cuando los componentes individuales fallan. Su modelo de programación de workflows duraderos elimina la necesidad de implementar manualmente la gestión de estado, reintentos y compensaciones, permitiendo escribir lógica de negocio compleja como código secuencial ordinario. Es especialmente valioso para procesos de negocio críticos como pedidos de ecommerce, aprobaciones financieras y pipelines de datos que deben completarse sin importar los fallos de infraestructura.


