Patrones de Comunicación de Microservicios
La comunicación entre microservicios es uno de los aspectos más críticos de una arquitectura distribuida, afectando directamente la resiliencia, el rendimiento y la mantenibilidad del sistema. Los patrones de comunicación se dividen principalmente en dos categorías: síncrona (REST, gRPC) donde el llamante espera la respuesta, y asíncrona (mensajería, eventos) donde el llamante no bloquea. Esta guía cubre los patrones principales de comunicación entre microservicios, su implementación práctica en Linux y cómo combinarlos en una arquitectura coherente.
Requisitos Previos
- Linux con Docker y Docker Compose
- Node.js 18+ o Go 1.20+
- RabbitMQ o Apache Kafka instalado (para mensajería asíncrona)
- Nginx instalado (para API Gateway básico)
# Instalar Docker y Docker Compose
apt update && apt install -y docker.io docker-compose-plugin
systemctl enable --now docker
# Instalar RabbitMQ para mensajería
docker run -d \
--name rabbitmq \
-p 5672:5672 -p 15672:15672 \
-e RABBITMQ_DEFAULT_USER=admin \
-e RABBITMQ_DEFAULT_PASS=clave-segura \
rabbitmq:3-management
Comunicación Síncrona con REST y gRPC
REST con Patrones de Resiliencia
// cliente-http.js - Cliente HTTP con reintentos y circuit breaker
const axios = require('axios');
class ClienteServicio {
constructor(baseURL, options = {}) {
this.cliente = axios.create({
baseURL,
timeout: options.timeout || 5000,
headers: { 'Content-Type': 'application/json' }
});
// Estado del circuit breaker
this.circuitState = 'CLOSED'; // CLOSED, OPEN, HALF_OPEN
this.failureCount = 0;
this.failureThreshold = options.failureThreshold || 5;
this.resetTimeout = options.resetTimeout || 30000; // 30s
}
async llamar(metodo, ruta, datos = null, reintentos = 3) {
// Circuit breaker: rechazar si está abierto
if (this.circuitState === 'OPEN') {
throw new Error('Circuito abierto: servicio no disponible');
}
for (let intento = 1; intento <= reintentos; intento++) {
try {
const response = await this.cliente.request({
method: metodo,
url: ruta,
data: datos
});
// Éxito: resetear contador de fallos
this.falloResuelto();
return response.data;
} catch (error) {
const esUltimoIntento = intento === reintentos;
const esReintentable = [429, 502, 503, 504].includes(error.response?.status);
if (esUltimoIntento || !esReintentable) {
this.registrarFallo();
throw error;
}
// Espera exponencial entre reintentos
const espera = Math.min(1000 * Math.pow(2, intento - 1), 10000);
console.log(`Reintento ${intento}/${reintentos} en ${espera}ms...`);
await new Promise(r => setTimeout(r, espera));
}
}
}
registrarFallo() {
this.failureCount++;
if (this.failureCount >= this.failureThreshold) {
this.circuitState = 'OPEN';
console.warn('Circuit breaker ABIERTO');
// Intentar restablecer después de resetTimeout
setTimeout(() => {
this.circuitState = 'HALF_OPEN';
console.log('Circuit breaker en HALF_OPEN, probando reconexión...');
}, this.resetTimeout);
}
}
falloResuelto() {
if (this.circuitState === 'HALF_OPEN') {
this.circuitState = 'CLOSED';
this.failureCount = 0;
console.log('Circuit breaker CERRADO, servicio restaurado');
}
this.failureCount = Math.max(0, this.failureCount - 1);
}
}
module.exports = { ClienteServicio };
Comunicación Asíncrona con Mensajería
Publicador/Subscriptor con RabbitMQ
// mensajeria.js - Patrón publish/subscribe con RabbitMQ
const amqp = require('amqplib');
class ServicioMensajeria {
constructor(url = 'amqp://admin:clave@localhost') {
this.url = url;
this.connection = null;
this.channel = null;
}
async conectar() {
this.connection = await amqp.connect(this.url);
this.channel = await this.connection.createChannel();
// Reconfigurar en caso de reconexión
this.connection.on('error', async (err) => {
console.error('Error de conexión RabbitMQ:', err);
await this.reconectar();
});
}
async publicar(exchange, tipoEvento, datos) {
// Asegurar que el exchange existe
await this.channel.assertExchange(exchange, 'topic', { durable: true });
const mensaje = {
eventType: tipoEvento,
timestamp: new Date().toISOString(),
data: datos,
correlationId: require('crypto').randomUUID()
};
const publicado = this.channel.publish(
exchange,
tipoEvento, // Routing key
Buffer.from(JSON.stringify(mensaje)),
{
persistent: true, // Persistir en disco
contentType: 'application/json'
}
);
if (publicado) {
console.log(`Evento publicado: ${tipoEvento}`);
}
return mensaje.correlationId;
}
async suscribir(exchange, patron, manejador) {
await this.channel.assertExchange(exchange, 'topic', { durable: true });
// Cola durable para no perder mensajes si el servicio se reinicia
const { queue } = await this.channel.assertQueue('', {
durable: true,
deadLetterExchange: `${exchange}.dlx` // Cola de mensajes fallidos
});
// Vincular la cola al exchange con el patrón de routing key
await this.channel.bindQueue(queue, exchange, patron);
// Prefetch: procesar de 1 en 1 para evitar sobrecarga
await this.channel.prefetch(10);
this.channel.consume(queue, async (msg) => {
if (!msg) return;
try {
const mensaje = JSON.parse(msg.content.toString());
await manejador(mensaje);
this.channel.ack(msg); // Confirmar procesamiento
} catch (error) {
console.error('Error procesando mensaje:', error);
// Rechazar y reencolar una vez; después va a la DLQ
const reencolar = msg.fields.redelivered === false;
this.channel.nack(msg, false, reencolar);
}
});
console.log(`Suscrito a ${exchange} con patrón: ${patron}`);
}
}
// Uso en servicio de pedidos
const mensajeria = new ServicioMensajeria();
await mensajeria.conectar();
// Publicar evento cuando se crea un pedido
await mensajeria.publicar('pedidos', 'pedido.creado', {
pedidoId: '12345',
usuarioId: 'user-1',
total: 150.00,
items: [{ id: 'prod-1', cantidad: 2 }]
});
// Suscribirse a eventos de pedidos en el servicio de inventario
await mensajeria.suscribir('pedidos', 'pedido.*', async (mensaje) => {
if (mensaje.eventType === 'pedido.creado') {
console.log('Reservando inventario para pedido:', mensaje.data.pedidoId);
// Actualizar stock
}
});
Patrón Saga para Transacciones Distribuidas
El patrón Saga coordina transacciones que abarcan múltiples microservicios:
// saga-orquestador.js - Saga para proceso de pago de pedido
class SagaPago {
constructor(mensajeria, estado) {
this.mensajeria = mensajeria;
this.estado = estado; // Store persistente (Redis, DB)
}
async iniciar(datosPedido) {
const sagaId = require('crypto').randomUUID();
// Guardar estado inicial de la saga
await this.estado.set(`saga:${sagaId}`, {
sagaId,
pedidoId: datosPedido.pedidoId,
pasoActual: 'INICIO',
historial: [],
datos: datosPedido
});
try {
// Paso 1: Reservar inventario
await this.reservarInventario(sagaId, datosPedido);
// Paso 2: Procesar pago
await this.procesarPago(sagaId, datosPedido);
// Paso 3: Confirmar pedido
await this.confirmarPedido(sagaId, datosPedido);
// Saga completada exitosamente
await this.actualizarEstado(sagaId, 'COMPLETADA');
console.log(`Saga ${sagaId} completada exitosamente`);
} catch (error) {
console.error(`Saga ${sagaId} falló en paso:`, error.message);
// Ejecutar compensaciones en orden inverso
await this.compensar(sagaId);
}
}
async reservarInventario(sagaId, datos) {
await this.mensajeria.publicar('inventario', 'inventario.reservar', {
sagaId,
items: datos.items
});
await this.actualizarEstado(sagaId, 'INVENTARIO_RESERVADO');
}
async procesarPago(sagaId, datos) {
await this.mensajeria.publicar('pagos', 'pago.procesar', {
sagaId,
total: datos.total,
metodoPago: datos.metodoPago
});
await this.actualizarEstado(sagaId, 'PAGO_PROCESADO');
}
async compensar(sagaId) {
const estado = await this.estado.get(`saga:${sagaId}`);
// Compensaciones en orden inverso según el historial
const pasos = estado.historial.reverse();
for (const paso of pasos) {
try {
switch (paso) {
case 'PAGO_PROCESADO':
await this.mensajeria.publicar('pagos', 'pago.reembolsar', {
sagaId,
razon: 'saga_fallida'
});
break;
case 'INVENTARIO_RESERVADO':
await this.mensajeria.publicar('inventario', 'inventario.liberar', {
sagaId
});
break;
}
console.log(`Compensación ejecutada: ${paso}`);
} catch (errComp) {
console.error(`Error en compensación ${paso}:`, errComp);
// Registrar para revisión manual
}
}
await this.actualizarEstado(sagaId, 'COMPENSADA');
}
}
Event Sourcing
// event-store.js - Store de eventos simple con PostgreSQL
class EventStore {
constructor(db) {
this.db = db;
}
async inicializar() {
// Crear tabla de eventos si no existe
await this.db.query(`
CREATE TABLE IF NOT EXISTS eventos (
id BIGSERIAL PRIMARY KEY,
stream_id VARCHAR(255) NOT NULL,
tipo_evento VARCHAR(255) NOT NULL,
version INTEGER NOT NULL,
datos JSONB NOT NULL,
metadata JSONB,
created_at TIMESTAMPTZ DEFAULT NOW(),
UNIQUE (stream_id, version)
);
CREATE INDEX IF NOT EXISTS idx_eventos_stream ON eventos (stream_id, version);
`);
}
async agregarEvento(streamId, tipoEvento, datos, versionEsperada) {
try {
const resultado = await this.db.query(
`INSERT INTO eventos (stream_id, tipo_evento, version, datos)
VALUES ($1, $2, (
SELECT COALESCE(MAX(version), 0) + 1
FROM eventos WHERE stream_id = $1
), $3)
RETURNING *`,
[streamId, tipoEvento, datos]
);
return resultado.rows[0];
} catch (error) {
if (error.code === '23505') { // Violación de unique constraint
throw new Error('Conflicto de versión: concurrencia optimista');
}
throw error;
}
}
async obtenerEventos(streamId, desdeVersion = 0) {
const resultado = await this.db.query(
`SELECT * FROM eventos
WHERE stream_id = $1 AND version > $2
ORDER BY version ASC`,
[streamId, desdeVersion]
);
return resultado.rows;
}
async reconstruirAgregado(streamId, clase) {
const eventos = await this.obtenerEventos(streamId);
const agregado = new clase(streamId);
for (const evento of eventos) {
agregado.aplicarEvento(evento.tipo_evento, evento.datos);
}
return agregado;
}
}
API Gateway
# Configuración de Nginx como API Gateway
cat > /etc/nginx/sites-available/api-gateway << 'EOF'
# Rate limiting global
limit_req_zone $binary_remote_addr zone=gateway:10m rate=100r/s;
# Upstreams para microservicios internos
upstream usuarios_svc {
server 127.0.0.1:3001;
server 127.0.0.1:3002; # Múltiples instancias
}
upstream pedidos_svc {
server 127.0.0.1:3003;
}
upstream inventario_svc {
server 127.0.0.1:3004;
}
upstream pagos_svc {
server 127.0.0.1:3005;
keepalive 32;
}
server {
listen 443 ssl http2;
server_name api.midominio.com;
ssl_certificate /etc/letsencrypt/live/api.midominio.com/fullchain.pem;
ssl_certificate_key /etc/letsencrypt/live/api.midominio.com/privkey.pem;
# Aplicar rate limiting global
limit_req zone=gateway burst=200 nodelay;
# Routing a microservicios por prefijo de URL
location /api/v1/usuarios/ {
proxy_pass http://usuarios_svc;
proxy_set_header X-Service "usuarios";
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header Authorization $http_authorization;
}
location /api/v1/pedidos/ {
proxy_pass http://pedidos_svc;
proxy_set_header X-Service "pedidos";
proxy_set_header X-Real-IP $remote_addr;
}
location /api/v1/inventario/ {
proxy_pass http://inventario_svc;
}
location /api/v1/pagos/ {
# Servicio de pagos: mayor seguridad
limit_req zone=gateway burst=20 nodelay;
proxy_pass http://pagos_svc;
proxy_ssl_verify on;
}
# Endpoint de health del gateway
location /health {
default_type application/json;
return 200 '{"status":"ok","service":"api-gateway"}';
}
}
EOF
nginx -t && systemctl reload nginx
Service Mesh y Descubrimiento de Servicios
# Registro de servicios con Consul
docker run -d \
--name consul \
-p 8500:8500 \
-p 8600:8600/udp \
consul agent -dev -ui -client=0.0.0.0
# Registrar un microservicio en Consul
curl -X PUT http://localhost:8500/v1/agent/service/register \
-d '{
"ID": "usuarios-1",
"Name": "usuarios",
"Tags": ["v1"],
"Address": "127.0.0.1",
"Port": 3001,
"Check": {
"HTTP": "http://127.0.0.1:3001/health",
"Interval": "10s",
"Timeout": "3s"
}
}'
# Descubrir instancias de un servicio
curl "http://localhost:8500/v1/catalog/service/usuarios" | \
python3 -m json.tool
Resiliencia: Circuit Breaker y Retry
# Docker Compose para stack de microservicios con resiliencia
cat > /opt/microservicios/docker-compose.yml << 'EOF'
version: '3.8'
services:
api-gateway:
image: nginx:alpine
volumes:
- ./nginx.conf:/etc/nginx/nginx.conf
ports:
- "80:80"
depends_on:
- usuarios-svc
- pedidos-svc
usuarios-svc:
image: mi-empresa/usuarios:latest
environment:
- PORT=3000
- DB_URL=postgres://user:pass@postgres:5432/usuarios
restart: unless-stopped
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:3000/health"]
interval: 10s
timeout: 3s
retries: 3
pedidos-svc:
image: mi-empresa/pedidos:latest
environment:
- PORT=3000
- RABBITMQ_URL=amqp://admin:clave@rabbitmq
restart: unless-stopped
depends_on:
rabbitmq:
condition: service_healthy
rabbitmq:
image: rabbitmq:3-management
environment:
RABBITMQ_DEFAULT_USER: admin
RABBITMQ_DEFAULT_PASS: clave-segura
volumes:
- rabbitmq_data:/var/lib/rabbitmq
healthcheck:
test: ["CMD", "rabbitmq-diagnostics", "ping"]
interval: 30s
timeout: 10s
retries: 3
volumes:
rabbitmq_data:
EOF
docker compose -f /opt/microservicios/docker-compose.yml up -d
Solución de Problemas
Mensajes perdidos en RabbitMQ:
# Verificar que las colas son durables
rabbitmqctl list_queues name durable messages
# Verificar los mensajes en la Dead Letter Queue
rabbitmqctl list_queues name messages | grep dlx
# Ver mensajes sin confirmar (unacked)
rabbitmqctl list_queues name messages_unacknowledged
Saga quedó en estado inconsistente:
# Ver el estado actual de la saga en Redis/DB
redis-cli get "saga:SAGA_ID"
# Ejecutar compensaciones manualmente si es necesario
# Revisar logs para identificar qué paso falló
journalctl -u pedidos-svc.service | grep "saga"
Alta latencia en comunicación entre servicios:
# Medir latencia de red entre servicios
ping -c 10 direccion-servicio
# Verificar que los servicios están en la misma red Docker
docker network inspect microservicios_default
# Usar gRPC en lugar de REST para comunicación interna (más eficiente)
Conclusión
No existe un único patrón de comunicación correcto para todos los casos: la comunicación síncrona (REST/gRPC) es apropiada cuando necesitas una respuesta inmediata, mientras que la mensajería asíncrona es superior para operaciones que pueden tolerar latencia y requieren alta resiliencia. Las sagas son la solución para transacciones distribuidas donde no es posible usar una transacción de base de datos única. Combinar estos patrones con circuit breakers, reintentos con backoff exponencial y un API Gateway centralizado es la base de una arquitectura de microservicios robusta y operacionalmente manejable.


