Patrones de Comunicación de Microservicios

La comunicación entre microservicios es uno de los aspectos más críticos de una arquitectura distribuida, afectando directamente la resiliencia, el rendimiento y la mantenibilidad del sistema. Los patrones de comunicación se dividen principalmente en dos categorías: síncrona (REST, gRPC) donde el llamante espera la respuesta, y asíncrona (mensajería, eventos) donde el llamante no bloquea. Esta guía cubre los patrones principales de comunicación entre microservicios, su implementación práctica en Linux y cómo combinarlos en una arquitectura coherente.

Requisitos Previos

  • Linux con Docker y Docker Compose
  • Node.js 18+ o Go 1.20+
  • RabbitMQ o Apache Kafka instalado (para mensajería asíncrona)
  • Nginx instalado (para API Gateway básico)
# Instalar Docker y Docker Compose
apt update && apt install -y docker.io docker-compose-plugin
systemctl enable --now docker

# Instalar RabbitMQ para mensajería
docker run -d \
    --name rabbitmq \
    -p 5672:5672 -p 15672:15672 \
    -e RABBITMQ_DEFAULT_USER=admin \
    -e RABBITMQ_DEFAULT_PASS=clave-segura \
    rabbitmq:3-management

Comunicación Síncrona con REST y gRPC

REST con Patrones de Resiliencia

// cliente-http.js - Cliente HTTP con reintentos y circuit breaker
const axios = require('axios');

class ClienteServicio {
    constructor(baseURL, options = {}) {
        this.cliente = axios.create({
            baseURL,
            timeout: options.timeout || 5000,
            headers: { 'Content-Type': 'application/json' }
        });

        // Estado del circuit breaker
        this.circuitState = 'CLOSED';  // CLOSED, OPEN, HALF_OPEN
        this.failureCount = 0;
        this.failureThreshold = options.failureThreshold || 5;
        this.resetTimeout = options.resetTimeout || 30000;  // 30s
    }

    async llamar(metodo, ruta, datos = null, reintentos = 3) {
        // Circuit breaker: rechazar si está abierto
        if (this.circuitState === 'OPEN') {
            throw new Error('Circuito abierto: servicio no disponible');
        }

        for (let intento = 1; intento <= reintentos; intento++) {
            try {
                const response = await this.cliente.request({
                    method: metodo,
                    url: ruta,
                    data: datos
                });

                // Éxito: resetear contador de fallos
                this.falloResuelto();
                return response.data;

            } catch (error) {
                const esUltimoIntento = intento === reintentos;
                const esReintentable = [429, 502, 503, 504].includes(error.response?.status);

                if (esUltimoIntento || !esReintentable) {
                    this.registrarFallo();
                    throw error;
                }

                // Espera exponencial entre reintentos
                const espera = Math.min(1000 * Math.pow(2, intento - 1), 10000);
                console.log(`Reintento ${intento}/${reintentos} en ${espera}ms...`);
                await new Promise(r => setTimeout(r, espera));
            }
        }
    }

    registrarFallo() {
        this.failureCount++;
        if (this.failureCount >= this.failureThreshold) {
            this.circuitState = 'OPEN';
            console.warn('Circuit breaker ABIERTO');
            // Intentar restablecer después de resetTimeout
            setTimeout(() => {
                this.circuitState = 'HALF_OPEN';
                console.log('Circuit breaker en HALF_OPEN, probando reconexión...');
            }, this.resetTimeout);
        }
    }

    falloResuelto() {
        if (this.circuitState === 'HALF_OPEN') {
            this.circuitState = 'CLOSED';
            this.failureCount = 0;
            console.log('Circuit breaker CERRADO, servicio restaurado');
        }
        this.failureCount = Math.max(0, this.failureCount - 1);
    }
}

module.exports = { ClienteServicio };

Comunicación Asíncrona con Mensajería

Publicador/Subscriptor con RabbitMQ

// mensajeria.js - Patrón publish/subscribe con RabbitMQ
const amqp = require('amqplib');

class ServicioMensajeria {
    constructor(url = 'amqp://admin:clave@localhost') {
        this.url = url;
        this.connection = null;
        this.channel = null;
    }

    async conectar() {
        this.connection = await amqp.connect(this.url);
        this.channel = await this.connection.createChannel();

        // Reconfigurar en caso de reconexión
        this.connection.on('error', async (err) => {
            console.error('Error de conexión RabbitMQ:', err);
            await this.reconectar();
        });
    }

    async publicar(exchange, tipoEvento, datos) {
        // Asegurar que el exchange existe
        await this.channel.assertExchange(exchange, 'topic', { durable: true });

        const mensaje = {
            eventType: tipoEvento,
            timestamp: new Date().toISOString(),
            data: datos,
            correlationId: require('crypto').randomUUID()
        };

        const publicado = this.channel.publish(
            exchange,
            tipoEvento,  // Routing key
            Buffer.from(JSON.stringify(mensaje)),
            {
                persistent: true,       // Persistir en disco
                contentType: 'application/json'
            }
        );

        if (publicado) {
            console.log(`Evento publicado: ${tipoEvento}`);
        }
        return mensaje.correlationId;
    }

    async suscribir(exchange, patron, manejador) {
        await this.channel.assertExchange(exchange, 'topic', { durable: true });

        // Cola durable para no perder mensajes si el servicio se reinicia
        const { queue } = await this.channel.assertQueue('', {
            durable: true,
            deadLetterExchange: `${exchange}.dlx`  // Cola de mensajes fallidos
        });

        // Vincular la cola al exchange con el patrón de routing key
        await this.channel.bindQueue(queue, exchange, patron);

        // Prefetch: procesar de 1 en 1 para evitar sobrecarga
        await this.channel.prefetch(10);

        this.channel.consume(queue, async (msg) => {
            if (!msg) return;

            try {
                const mensaje = JSON.parse(msg.content.toString());
                await manejador(mensaje);
                this.channel.ack(msg);  // Confirmar procesamiento
            } catch (error) {
                console.error('Error procesando mensaje:', error);
                // Rechazar y reencolar una vez; después va a la DLQ
                const reencolar = msg.fields.redelivered === false;
                this.channel.nack(msg, false, reencolar);
            }
        });

        console.log(`Suscrito a ${exchange} con patrón: ${patron}`);
    }
}

// Uso en servicio de pedidos
const mensajeria = new ServicioMensajeria();
await mensajeria.conectar();

// Publicar evento cuando se crea un pedido
await mensajeria.publicar('pedidos', 'pedido.creado', {
    pedidoId: '12345',
    usuarioId: 'user-1',
    total: 150.00,
    items: [{ id: 'prod-1', cantidad: 2 }]
});

// Suscribirse a eventos de pedidos en el servicio de inventario
await mensajeria.suscribir('pedidos', 'pedido.*', async (mensaje) => {
    if (mensaje.eventType === 'pedido.creado') {
        console.log('Reservando inventario para pedido:', mensaje.data.pedidoId);
        // Actualizar stock
    }
});

Patrón Saga para Transacciones Distribuidas

El patrón Saga coordina transacciones que abarcan múltiples microservicios:

// saga-orquestador.js - Saga para proceso de pago de pedido
class SagaPago {
    constructor(mensajeria, estado) {
        this.mensajeria = mensajeria;
        this.estado = estado;  // Store persistente (Redis, DB)
    }

    async iniciar(datosPedido) {
        const sagaId = require('crypto').randomUUID();
        
        // Guardar estado inicial de la saga
        await this.estado.set(`saga:${sagaId}`, {
            sagaId,
            pedidoId: datosPedido.pedidoId,
            pasoActual: 'INICIO',
            historial: [],
            datos: datosPedido
        });

        try {
            // Paso 1: Reservar inventario
            await this.reservarInventario(sagaId, datosPedido);
            
            // Paso 2: Procesar pago
            await this.procesarPago(sagaId, datosPedido);
            
            // Paso 3: Confirmar pedido
            await this.confirmarPedido(sagaId, datosPedido);
            
            // Saga completada exitosamente
            await this.actualizarEstado(sagaId, 'COMPLETADA');
            console.log(`Saga ${sagaId} completada exitosamente`);

        } catch (error) {
            console.error(`Saga ${sagaId} falló en paso:`, error.message);
            // Ejecutar compensaciones en orden inverso
            await this.compensar(sagaId);
        }
    }

    async reservarInventario(sagaId, datos) {
        await this.mensajeria.publicar('inventario', 'inventario.reservar', {
            sagaId,
            items: datos.items
        });
        await this.actualizarEstado(sagaId, 'INVENTARIO_RESERVADO');
    }

    async procesarPago(sagaId, datos) {
        await this.mensajeria.publicar('pagos', 'pago.procesar', {
            sagaId,
            total: datos.total,
            metodoPago: datos.metodoPago
        });
        await this.actualizarEstado(sagaId, 'PAGO_PROCESADO');
    }

    async compensar(sagaId) {
        const estado = await this.estado.get(`saga:${sagaId}`);
        
        // Compensaciones en orden inverso según el historial
        const pasos = estado.historial.reverse();
        
        for (const paso of pasos) {
            try {
                switch (paso) {
                    case 'PAGO_PROCESADO':
                        await this.mensajeria.publicar('pagos', 'pago.reembolsar', {
                            sagaId,
                            razon: 'saga_fallida'
                        });
                        break;
                    case 'INVENTARIO_RESERVADO':
                        await this.mensajeria.publicar('inventario', 'inventario.liberar', {
                            sagaId
                        });
                        break;
                }
                console.log(`Compensación ejecutada: ${paso}`);
            } catch (errComp) {
                console.error(`Error en compensación ${paso}:`, errComp);
                // Registrar para revisión manual
            }
        }
        
        await this.actualizarEstado(sagaId, 'COMPENSADA');
    }
}

Event Sourcing

// event-store.js - Store de eventos simple con PostgreSQL
class EventStore {
    constructor(db) {
        this.db = db;
    }

    async inicializar() {
        // Crear tabla de eventos si no existe
        await this.db.query(`
            CREATE TABLE IF NOT EXISTS eventos (
                id BIGSERIAL PRIMARY KEY,
                stream_id VARCHAR(255) NOT NULL,
                tipo_evento VARCHAR(255) NOT NULL,
                version INTEGER NOT NULL,
                datos JSONB NOT NULL,
                metadata JSONB,
                created_at TIMESTAMPTZ DEFAULT NOW(),
                UNIQUE (stream_id, version)
            );
            CREATE INDEX IF NOT EXISTS idx_eventos_stream ON eventos (stream_id, version);
        `);
    }

    async agregarEvento(streamId, tipoEvento, datos, versionEsperada) {
        try {
            const resultado = await this.db.query(
                `INSERT INTO eventos (stream_id, tipo_evento, version, datos)
                 VALUES ($1, $2, (
                     SELECT COALESCE(MAX(version), 0) + 1
                     FROM eventos WHERE stream_id = $1
                 ), $3)
                 RETURNING *`,
                [streamId, tipoEvento, datos]
            );
            return resultado.rows[0];
        } catch (error) {
            if (error.code === '23505') {  // Violación de unique constraint
                throw new Error('Conflicto de versión: concurrencia optimista');
            }
            throw error;
        }
    }

    async obtenerEventos(streamId, desdeVersion = 0) {
        const resultado = await this.db.query(
            `SELECT * FROM eventos 
             WHERE stream_id = $1 AND version > $2
             ORDER BY version ASC`,
            [streamId, desdeVersion]
        );
        return resultado.rows;
    }

    async reconstruirAgregado(streamId, clase) {
        const eventos = await this.obtenerEventos(streamId);
        const agregado = new clase(streamId);

        for (const evento of eventos) {
            agregado.aplicarEvento(evento.tipo_evento, evento.datos);
        }

        return agregado;
    }
}

API Gateway

# Configuración de Nginx como API Gateway
cat > /etc/nginx/sites-available/api-gateway << 'EOF'
# Rate limiting global
limit_req_zone $binary_remote_addr zone=gateway:10m rate=100r/s;

# Upstreams para microservicios internos
upstream usuarios_svc {
    server 127.0.0.1:3001;
    server 127.0.0.1:3002;  # Múltiples instancias
}

upstream pedidos_svc {
    server 127.0.0.1:3003;
}

upstream inventario_svc {
    server 127.0.0.1:3004;
}

upstream pagos_svc {
    server 127.0.0.1:3005;
    keepalive 32;
}

server {
    listen 443 ssl http2;
    server_name api.midominio.com;

    ssl_certificate /etc/letsencrypt/live/api.midominio.com/fullchain.pem;
    ssl_certificate_key /etc/letsencrypt/live/api.midominio.com/privkey.pem;

    # Aplicar rate limiting global
    limit_req zone=gateway burst=200 nodelay;

    # Routing a microservicios por prefijo de URL
    location /api/v1/usuarios/ {
        proxy_pass http://usuarios_svc;
        proxy_set_header X-Service "usuarios";
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header Authorization $http_authorization;
    }

    location /api/v1/pedidos/ {
        proxy_pass http://pedidos_svc;
        proxy_set_header X-Service "pedidos";
        proxy_set_header X-Real-IP $remote_addr;
    }

    location /api/v1/inventario/ {
        proxy_pass http://inventario_svc;
    }

    location /api/v1/pagos/ {
        # Servicio de pagos: mayor seguridad
        limit_req zone=gateway burst=20 nodelay;
        proxy_pass http://pagos_svc;
        proxy_ssl_verify on;
    }

    # Endpoint de health del gateway
    location /health {
        default_type application/json;
        return 200 '{"status":"ok","service":"api-gateway"}';
    }
}
EOF

nginx -t && systemctl reload nginx

Service Mesh y Descubrimiento de Servicios

# Registro de servicios con Consul
docker run -d \
    --name consul \
    -p 8500:8500 \
    -p 8600:8600/udp \
    consul agent -dev -ui -client=0.0.0.0

# Registrar un microservicio en Consul
curl -X PUT http://localhost:8500/v1/agent/service/register \
    -d '{
        "ID": "usuarios-1",
        "Name": "usuarios",
        "Tags": ["v1"],
        "Address": "127.0.0.1",
        "Port": 3001,
        "Check": {
            "HTTP": "http://127.0.0.1:3001/health",
            "Interval": "10s",
            "Timeout": "3s"
        }
    }'

# Descubrir instancias de un servicio
curl "http://localhost:8500/v1/catalog/service/usuarios" | \
    python3 -m json.tool

Resiliencia: Circuit Breaker y Retry

# Docker Compose para stack de microservicios con resiliencia
cat > /opt/microservicios/docker-compose.yml << 'EOF'
version: '3.8'

services:
  api-gateway:
    image: nginx:alpine
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf
    ports:
      - "80:80"
    depends_on:
      - usuarios-svc
      - pedidos-svc

  usuarios-svc:
    image: mi-empresa/usuarios:latest
    environment:
      - PORT=3000
      - DB_URL=postgres://user:pass@postgres:5432/usuarios
    restart: unless-stopped
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:3000/health"]
      interval: 10s
      timeout: 3s
      retries: 3

  pedidos-svc:
    image: mi-empresa/pedidos:latest
    environment:
      - PORT=3000
      - RABBITMQ_URL=amqp://admin:clave@rabbitmq
    restart: unless-stopped
    depends_on:
      rabbitmq:
        condition: service_healthy

  rabbitmq:
    image: rabbitmq:3-management
    environment:
      RABBITMQ_DEFAULT_USER: admin
      RABBITMQ_DEFAULT_PASS: clave-segura
    volumes:
      - rabbitmq_data:/var/lib/rabbitmq
    healthcheck:
      test: ["CMD", "rabbitmq-diagnostics", "ping"]
      interval: 30s
      timeout: 10s
      retries: 3

volumes:
  rabbitmq_data:
EOF

docker compose -f /opt/microservicios/docker-compose.yml up -d

Solución de Problemas

Mensajes perdidos en RabbitMQ:

# Verificar que las colas son durables
rabbitmqctl list_queues name durable messages
# Verificar los mensajes en la Dead Letter Queue
rabbitmqctl list_queues name messages | grep dlx
# Ver mensajes sin confirmar (unacked)
rabbitmqctl list_queues name messages_unacknowledged

Saga quedó en estado inconsistente:

# Ver el estado actual de la saga en Redis/DB
redis-cli get "saga:SAGA_ID"
# Ejecutar compensaciones manualmente si es necesario
# Revisar logs para identificar qué paso falló
journalctl -u pedidos-svc.service | grep "saga"

Alta latencia en comunicación entre servicios:

# Medir latencia de red entre servicios
ping -c 10 direccion-servicio
# Verificar que los servicios están en la misma red Docker
docker network inspect microservicios_default
# Usar gRPC en lugar de REST para comunicación interna (más eficiente)

Conclusión

No existe un único patrón de comunicación correcto para todos los casos: la comunicación síncrona (REST/gRPC) es apropiada cuando necesitas una respuesta inmediata, mientras que la mensajería asíncrona es superior para operaciones que pueden tolerar latencia y requieren alta resiliencia. Las sagas son la solución para transacciones distribuidas donde no es posible usar una transacción de base de datos única. Combinar estos patrones con circuit breakers, reintentos con backoff exponencial y un API Gateway centralizado es la base de una arquitectura de microservicios robusta y operacionalmente manejable.