Automation with Python: Useful Scripts for DevOps and System Administration
Introduction
Python has become the de facto language for DevOps automation, infrastructure management, and system administration tasks. Its readability, extensive library ecosystem, and cross-platform support make it well suited to building robust automation tools. While Bash excels at simple scripts, Python shines when you need complex logic, data processing, API integrations, or maintainable codebases.
This comprehensive guide presents production-ready Python scripts for common DevOps tasks, covering server management, cloud automation, monitoring, and infrastructure orchestration.
Prerequisites
- Python 3.7+ installed
- Basic Python programming knowledge
- An understanding of Linux/Unix systems
- Familiarity with command-line tools
- pip for package management
Essential Python Setup
Virtual Environment Setup
# Create a virtual environment
python3 -m venv ~/automation-env
source ~/automation-env/bin/activate
# Install common libraries
pip install requests paramiko boto3 fabric psutil schedule
Required Libraries
# requirements.txt
requests>=2.31.0
paramiko>=3.3.1
boto3>=1.28.0
fabric>=3.2.0
psutil>=5.9.5
schedule>=1.2.0
python-dotenv>=1.0.0
pyyaml>=6.0.1
jinja2>=3.1.2
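With the requirements file in place, everything can be installed in one step:

pip install -r requirements.txt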
Server Management Scripts
1. System Health Monitor
#!/usr/bin/env python3
"""
system_monitor.py - Comprehensive system health monitoring
"""
import psutil
import smtplib
from email.mime.text import MIMEText
from datetime import datetime
class SystemMonitor:
def __init__(self, cpu_threshold=80, memory_threshold=85, disk_threshold=90):
self.cpu_threshold = cpu_threshold
self.memory_threshold = memory_threshold
self.disk_threshold = disk_threshold
self.alerts = []
def check_cpu(self):
cpu_percent = psutil.cpu_percent(interval=1)
if cpu_percent > self.cpu_threshold:
self.alerts.append(f"CPU usage is {cpu_percent}%")
return cpu_percent
def check_memory(self):
memory = psutil.virtual_memory()
if memory.percent > self.memory_threshold:
self.alerts.append(f"Memory usage is {memory.percent}%")
return memory.percent
def check_disk(self):
disk = psutil.disk_usage('/')
if disk.percent > self.disk_threshold:
self.alerts.append(f"Disk usage is {disk.percent}%")
return disk.percent
def get_top_processes(self, n=5):
processes = []
for proc in psutil.process_iter(['pid', 'name', 'cpu_percent', 'memory_percent']):
try:
processes.append(proc.info)
except (psutil.NoSuchProcess, psutil.AccessDenied):
pass
        # cpu_percent can be None when access to a process is denied; treat it as 0
        return sorted(processes, key=lambda x: x['cpu_percent'] or 0, reverse=True)[:n]
def generate_report(self):
report = f"""
System Health Report - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
================================================================
Resource Usage:
- CPU: {self.check_cpu()}%
- Memory: {self.check_memory()}%
- Disk: {self.check_disk()}%
Top Processes by CPU:
"""
for proc in self.get_top_processes():
report += f"- {proc['name']} (PID {proc['pid']}): {proc['cpu_percent']}%\n"
if self.alerts:
report += "\n⚠️ ALERTS:\n"
for alert in self.alerts:
report += f"- {alert}\n"
return report
def send_alert(self, email_to, email_from, smtp_server):
if not self.alerts:
return
msg = MIMEText(self.generate_report())
msg['Subject'] = 'System Health Alert'
msg['From'] = email_from
msg['To'] = email_to
with smtplib.SMTP(smtp_server) as server:
server.send_message(msg)
if __name__ == "__main__":
monitor = SystemMonitor()
print(monitor.generate_report())
if monitor.alerts:
# monitor.send_alert('[email protected]', '[email protected]', 'localhost')
pass
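To run these checks on a schedule rather than once, you can pair the class above with the schedule library from the requirements list. Below is a minimal sketch, assuming the script above is saved as system_monitor.py; the five-minute interval and the commented-out email settings are placeholder choices, not values from the original script.

#!/usr/bin/env python3
"""
monitor_scheduler.py - Run SystemMonitor checks periodically (illustrative sketch)
"""
import time
import schedule
from system_monitor import SystemMonitor  # assumes the monitor above lives in system_monitor.py

def run_checks():
    # A fresh instance per run so alerts from earlier cycles are not repeated
    monitor = SystemMonitor(cpu_threshold=80, memory_threshold=85, disk_threshold=90)
    print(monitor.generate_report())
    # if monitor.alerts:
    #     monitor.send_alert('[email protected]', '[email protected]', 'localhost')

schedule.every(5).minutes.do(run_checks)  # placeholder interval

if __name__ == "__main__":
    run_checks()  # run once at startup
    while True:
        schedule.run_pending()
        time.sleep(1)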
2. Automated Backup Script
#!/usr/bin/env python3
"""
backup_manager.py - Comprehensive backup solution
"""
import os
import tarfile
import gzip
import shutil
from pathlib import Path
from datetime import datetime, timedelta
import subprocess
import boto3
class BackupManager:
def __init__(self, backup_dir="/backup", retention_days=7):
self.backup_dir = Path(backup_dir)
self.retention_days = retention_days
self.timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
self.backup_dir.mkdir(parents=True, exist_ok=True)
def backup_directory(self, source_dir, name=None):
"""Create compressed backup of directory"""
source = Path(source_dir)
if not source.exists():
raise FileNotFoundError(f"Source directory not found: {source_dir}")
backup_name = name or source.name
archive_path = self.backup_dir / f"{backup_name}_{self.timestamp}.tar.gz"
print(f"Backing up {source_dir} to {archive_path}")
with tarfile.open(archive_path, "w:gz") as tar:
tar.add(source_dir, arcname=backup_name)
return archive_path
def backup_mysql(self, database=None, user="root", password=None):
"""Backup MySQL database"""
backup_file = self.backup_dir / f"mysql_{database or 'all'}_{self.timestamp}.sql.gz"
cmd = ["mysqldump"]
if user:
cmd.extend(["-u", user])
if password:
cmd.extend([f"--password={password}"])
if database:
cmd.append(database)
else:
cmd.append("--all-databases")
print(f"Backing up MySQL database(s) to {backup_file}")
with gzip.open(backup_file, 'wb') as f:
subprocess.run(cmd, stdout=f, check=True)
return backup_file
def backup_postgresql(self, database=None):
"""Backup PostgreSQL database"""
backup_file = self.backup_dir / f"postgres_{database or 'all'}_{self.timestamp}.sql.gz"
cmd = ["sudo", "-u", "postgres", "pg_dump" if database else "pg_dumpall"]
if database:
cmd.append(database)
print(f"Backing up PostgreSQL to {backup_file}")
with gzip.open(backup_file, 'wb') as f:
subprocess.run(cmd, stdout=f, check=True)
return backup_file
def upload_to_s3(self, file_path, bucket, prefix=""):
"""Upload backup to S3"""
s3 = boto3.client('s3')
key = f"{prefix}/{Path(file_path).name}" if prefix else Path(file_path).name
print(f"Uploading {file_path} to s3://{bucket}/{key}")
s3.upload_file(str(file_path), bucket, key)
def cleanup_old_backups(self):
"""Remove backups older than retention period"""
cutoff_date = datetime.now() - timedelta(days=self.retention_days)
for backup_file in self.backup_dir.glob("*"):
if backup_file.stat().st_mtime < cutoff_date.timestamp():
print(f"Removing old backup: {backup_file}")
backup_file.unlink()
def run_full_backup(self, config):
"""Execute full backup based on configuration"""
backup_files = []
# Backup directories
for directory in config.get('directories', []):
backup_files.append(self.backup_directory(directory))
# Backup MySQL databases
for db_config in config.get('mysql', []):
backup_files.append(self.backup_mysql(**db_config))
# Backup PostgreSQL databases
for db in config.get('postgresql', []):
backup_files.append(self.backup_postgresql(db))
# Upload to S3 if configured
if 's3' in config:
for backup_file in backup_files:
self.upload_to_s3(backup_file, **config['s3'])
# Cleanup old backups
self.cleanup_old_backups()
return backup_files
if __name__ == "__main__":
backup_config = {
'directories': ['/var/www', '/etc'],
'mysql': [
{'database': 'production_db', 'user': 'backup', 'password': 'secret'}
],
'postgresql': ['app_database'],
's3': {
'bucket': 'company-backups',
'prefix': 'daily'
}
}
manager = BackupManager()
manager.run_full_backup(backup_config)
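In practice the backup configuration usually belongs in a file rather than in code. A minimal sketch using pyyaml from the requirements list; the file name backup_config.yaml is an assumed convention, and its keys simply mirror the dict above:

# run_backup.py - load the same configuration from YAML (illustrative)
import yaml
from backup_manager import BackupManager  # assumes the script above is backup_manager.py

# backup_config.yaml would contain, for example:
#   directories:
#     - /var/www
#     - /etc
#   postgresql:
#     - app_database
#   s3:
#     bucket: company-backups
#     prefix: daily
with open("backup_config.yaml") as f:
    config = yaml.safe_load(f)

BackupManager(backup_dir="/backup", retention_days=7).run_full_backup(config)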
3. Log Analyzer
#!/usr/bin/env python3
"""
log_analyzer.py - Analyze and report on log files
"""
import re
from collections import Counter, defaultdict
from pathlib import Path
from datetime import datetime
class LogAnalyzer:
def __init__(self, log_file):
self.log_file = Path(log_file)
self.entries = []
def parse_nginx_log(self):
"""Parse Nginx access log"""
        # Nginx combined log format; the remote-user field may be a real name rather
        # than "-", and the body-size field may be "-" for responses without a body
        pattern = r'(\S+) \S+ (\S+) \[(.*?)\] "(.*?)" (\d{3}) (\d+|-) "(.*?)" "(.*?)"'
        with open(self.log_file) as f:
            for line in f:
                match = re.match(pattern, line)
                if match:
                    self.entries.append({
                        'ip': match.group(1),
                        'user': match.group(2),
                        'timestamp': match.group(3),
                        'request': match.group(4),
                        'status': int(match.group(5)),
                        'bytes': int(match.group(6)) if match.group(6) != '-' else 0,
                        'referrer': match.group(7),
                        'user_agent': match.group(8)
                    })
def get_top_ips(self, n=10):
"""Get top N IP addresses by request count"""
ips = [entry['ip'] for entry in self.entries]
return Counter(ips).most_common(n)
def get_status_distribution(self):
"""Get HTTP status code distribution"""
statuses = [entry['status'] for entry in self.entries]
return dict(Counter(statuses))
def get_error_requests(self):
"""Get all requests with 4xx or 5xx status codes"""
return [entry for entry in self.entries if entry['status'] >= 400]
def get_bandwidth_usage(self):
"""Calculate total bandwidth used"""
total_bytes = sum(entry['bytes'] for entry in self.entries)
return {
'bytes': total_bytes,
'kb': total_bytes / 1024,
'mb': total_bytes / (1024 ** 2),
'gb': total_bytes / (1024 ** 3)
}
def generate_report(self):
"""Generate comprehensive log analysis report"""
report = f"""
Log Analysis Report - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
File: {self.log_file}
Total Requests: {len(self.entries)}
================================================================
Top 10 IP Addresses:
"""
for ip, count in self.get_top_ips():
report += f" {ip}: {count} requests\n"
report += "\nHTTP Status Distribution:\n"
for status, count in sorted(self.get_status_distribution().items()):
report += f" {status}: {count}\n"
bandwidth = self.get_bandwidth_usage()
report += f"\nBandwidth Usage:\n"
report += f" Total: {bandwidth['gb']:.2f} GB ({bandwidth['mb']:.2f} MB)\n"
errors = self.get_error_requests()
report += f"\nError Requests: {len(errors)}\n"
if errors:
report += "Sample errors:\n"
for error in errors[:5]:
report += f" [{error['status']}] {error['request']}\n"
return report
if __name__ == "__main__":
analyzer = LogAnalyzer('/var/log/nginx/access.log')
analyzer.parse_nginx_log()
print(analyzer.generate_report())
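The Counter-based helpers generalize easily; for instance, the defaultdict imported above can back an hourly traffic histogram. A sketch of one extra method you could add to LogAnalyzer, assuming the standard Nginx timestamp format (e.g. 10/Oct/2023:13:55:36 +0000):

    def get_hourly_distribution(self):
        """Count requests per hour of day (0-23)"""
        hours = defaultdict(int)
        for entry in self.entries:
            ts = datetime.strptime(entry['timestamp'], '%d/%b/%Y:%H:%M:%S %z')
            hours[ts.hour] += 1
        return dict(sorted(hours.items()))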
Cloud Automation Scripts
4. AWS EC2 Manager
#!/usr/bin/env python3
"""
aws_manager.py - Manage AWS EC2 instances
"""
import boto3
from botocore.exceptions import ClientError
class AWSManager:
def __init__(self, region='us-east-1'):
self.ec2 = boto3.resource('ec2', region_name=region)
self.client = boto3.client('ec2', region_name=region)
def list_instances(self, filters=None):
"""List all EC2 instances"""
instances = []
for instance in self.ec2.instances.filter(Filters=filters or []):
instances.append({
'id': instance.id,
'type': instance.instance_type,
'state': instance.state['Name'],
'public_ip': instance.public_ip_address,
'private_ip': instance.private_ip_address,
'name': next((tag['Value'] for tag in instance.tags or [] if tag['Key'] == 'Name'), None)
})
return instances
def start_instances(self, instance_ids):
"""Start EC2 instances"""
try:
self.client.start_instances(InstanceIds=instance_ids)
print(f"Started instances: {', '.join(instance_ids)}")
except ClientError as e:
print(f"Error starting instances: {e}")
def stop_instances(self, instance_ids):
"""Stop EC2 instances"""
try:
self.client.stop_instances(InstanceIds=instance_ids)
print(f"Stopped instances: {', '.join(instance_ids)}")
except ClientError as e:
print(f"Error stopping instances: {e}")
def create_snapshot(self, volume_id, description):
"""Create EBS snapshot"""
try:
snapshot = self.client.create_snapshot(
VolumeId=volume_id,
Description=description
)
print(f"Created snapshot: {snapshot['SnapshotId']}")
return snapshot['SnapshotId']
except ClientError as e:
print(f"Error creating snapshot: {e}")
    def cleanup_old_snapshots(self, days=30):
        """Delete snapshots older than the specified number of days"""
        from datetime import datetime, timedelta, timezone
        # StartTime is timezone-aware UTC, so compare against an aware cutoff
        cutoff_date = datetime.now(timezone.utc) - timedelta(days=days)
        snapshots = self.client.describe_snapshots(OwnerIds=['self'])['Snapshots']
        for snapshot in snapshots:
            if snapshot['StartTime'] < cutoff_date:
try:
self.client.delete_snapshot(SnapshotId=snapshot['SnapshotId'])
print(f"Deleted old snapshot: {snapshot['SnapshotId']}")
except ClientError as e:
print(f"Error deleting snapshot: {e}")
if __name__ == "__main__":
aws = AWSManager()
# List running instances
print("Running Instances:")
for instance in aws.list_instances([{'Name': 'instance-state-name', 'Values': ['running']}]):
print(f" {instance['name']}: {instance['id']} ({instance['public_ip']})")
Deployment Automation
5. Application Deployer
#!/usr/bin/env python3
"""
app_deployer.py - Automated application deployment
"""
from fabric import Connection
import sys
class AppDeployer:
def __init__(self, hosts, app_name, app_dir="/opt/app"):
self.hosts = hosts
self.app_name = app_name
self.app_dir = app_dir
def deploy_to_host(self, host, repo_url, branch="main"):
"""Deploy application to a single host"""
try:
conn = Connection(host)
print(f"Deploying to {host}...")
# Backup current version
conn.run(f"tar -czf /tmp/{self.app_name}-backup-$(date +%Y%m%d_%H%M%S).tar.gz {self.app_dir} || true")
# Pull latest code
with conn.cd(self.app_dir):
result = conn.run("git pull origin {branch}", warn=True)
if result.failed:
print(f"Git pull failed, cloning repository...")
conn.run(f"rm -rf {self.app_dir}")
conn.run(f"git clone -b {branch} {repo_url} {self.app_dir}")
# Install dependencies
with conn.cd(self.app_dir):
conn.run("npm install --production")
# Restart service
conn.sudo(f"systemctl restart {self.app_name}")
# Verify deployment
result = conn.run(f"systemctl is-active {self.app_name}", warn=True)
if result.ok:
print(f"✓ Deployment to {host} successful")
return True
else:
print(f"✗ Deployment to {host} failed")
return False
except Exception as e:
print(f"Error deploying to {host}: {e}")
return False
def deploy_all(self, repo_url, branch="main"):
"""Deploy to all hosts"""
results = {}
for host in self.hosts:
results[host] = self.deploy_to_host(host, repo_url, branch)
return results
if __name__ == "__main__":
deployer = AppDeployer(
hosts=['web1.example.com', 'web2.example.com'],
app_name='myapp'
)
results = deployer.deploy_all('https://github.com/company/app.git')
if all(results.values()):
print("\n✓ Deployment successful on all hosts")
sys.exit(0)
else:
print("\n✗ Deployment failed on some hosts")
sys.exit(1)
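deploy_all works through hosts sequentially; for larger fleets you may want to deploy in parallel. A minimal sketch using only the standard library with the same AppDeployer as above; max_workers=4 is an arbitrary placeholder:

from concurrent.futures import ThreadPoolExecutor

def deploy_parallel(deployer, repo_url, branch="main", max_workers=4):
    """Deploy to all hosts concurrently; returns {host: success_bool}"""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {
            host: pool.submit(deployer.deploy_to_host, host, repo_url, branch)
            for host in deployer.hosts
        }
        return {host: future.result() for host, future in futures.items()}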
Monitoring and Alerting
6. Service Monitor
#!/usr/bin/env python3
"""
service_monitor.py - Monitor services and send alerts
"""
import subprocess
import requests
import time
from datetime import datetime
class ServiceMonitor:
def __init__(self):
self.failures = {}
def check_systemd_service(self, service_name):
"""Check if systemd service is running"""
try:
result = subprocess.run(
['systemctl', 'is-active', service_name],
capture_output=True,
text=True
)
return result.returncode == 0
except Exception as e:
print(f"Error checking {service_name}: {e}")
return False
def check_http_endpoint(self, url, expected_status=200, timeout=10):
"""Check if HTTP endpoint is responding"""
try:
response = requests.get(url, timeout=timeout)
return response.status_code == expected_status
except Exception as e:
print(f"Error checking {url}: {e}")
return False
def check_port(self, host, port, timeout=5):
"""Check if port is open"""
import socket
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(timeout)
result = sock.connect_ex((host, port))
sock.close()
return result == 0
except Exception as e:
print(f"Error checking {host}:{port}: {e}")
return False
def send_alert(self, service, message):
"""Send alert notification"""
# Slack webhook example
webhook_url = "https://hooks.slack.com/services/YOUR/WEBHOOK/URL"
payload = {
"text": f"🚨 Service Alert: {service}",
"blocks": [
{
"type": "section",
"text": {"type": "mrkdwn", "text": message}
}
]
}
try:
            requests.post(webhook_url, json=payload, timeout=10)
except Exception as e:
print(f"Error sending alert: {e}")
def monitor_services(self, checks, interval=60):
"""Continuously monitor services"""
while True:
for check_name, check_config in checks.items():
status = False
if check_config['type'] == 'systemd':
status = self.check_systemd_service(check_config['service'])
elif check_config['type'] == 'http':
status = self.check_http_endpoint(check_config['url'])
elif check_config['type'] == 'port':
status = self.check_port(check_config['host'], check_config['port'])
timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
if not status:
if check_name not in self.failures:
self.failures[check_name] = timestamp
self.send_alert(check_name, f"{check_name} is down since {timestamp}")
print(f"[{timestamp}] ✗ {check_name} - FAILED")
else:
if check_name in self.failures:
downtime = self.failures.pop(check_name)
self.send_alert(check_name, f"{check_name} is back up (was down since {downtime})")
print(f"[{timestamp}] ✓ {check_name} - OK")
time.sleep(interval)
if __name__ == "__main__":
monitor = ServiceMonitor()
checks = {
'nginx': {'type': 'systemd', 'service': 'nginx'},
'website': {'type': 'http', 'url': 'https://example.com'},
'database': {'type': 'port', 'host': 'localhost', 'port': 5432}
}
monitor.monitor_services(checks)
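A single failed probe is often just a transient blip. One way to cut alert noise is to require several consecutive failures before notifying; a sketch of that idea as a subclass (the threshold of 3 is an arbitrary choice, and you would call record_result from your own check loop):

class DebouncedMonitor(ServiceMonitor):
    def __init__(self, failure_threshold=3):
        super().__init__()
        self.failure_threshold = failure_threshold
        self.consecutive_failures = {}

    def record_result(self, check_name, ok):
        """Return True exactly when a check crosses the failure threshold"""
        if ok:
            self.consecutive_failures[check_name] = 0
            return False
        count = self.consecutive_failures.get(check_name, 0) + 1
        self.consecutive_failures[check_name] = count
        return count == self.failure_threshold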
Best Practices
Error Handling
import logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('/var/log/automation.log'),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
try:
# Your code
pass
except Exception as e:
logger.error(f"Error occurred: {e}", exc_info=True)
Configuration Management
import os
from dotenv import load_dotenv
load_dotenv()
config = {
'api_key': os.getenv('API_KEY'),
'db_host': os.getenv('DB_HOST', 'localhost'),
    'db_port': int(os.getenv('DB_PORT', '5432'))
}
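The requirements list also includes jinja2, which pairs naturally with this pattern for generating configuration files from templates. A short sketch; the template contents and output path are purely illustrative:

from jinja2 import Template

nginx_template = Template("""
server {
    listen 80;
    server_name {{ server_name }};
    location / {
        proxy_pass http://{{ upstream_host }}:{{ upstream_port }};
    }
}
""")

rendered = nginx_template.render(
    server_name='example.com',
    upstream_host='127.0.0.1',
    upstream_port=8080,
)
with open('/tmp/example.conf', 'w') as f:
    f.write(rendered)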
Type Hints
from typing import List, Dict, Optional
def process_servers(servers: List[str], config: Dict[str, str]) -> Optional[bool]:
"""Process list of servers with given configuration"""
pass
Conclusion
Python is an essential tool for modern DevOps automation. These practical examples demonstrate how to automate server management, cloud operations, monitoring, and deployment tasks using Python's powerful libraries and features.
Key takeaways:
- Use virtual environments for dependency isolation
- Implement comprehensive error handling and logging
- Leverage Python's extensive library ecosystem
- Follow the PEP 8 style guide
- Write testable, maintainable code
- Use type hints for better code documentation
- Apply sound security practices
Keep developing your Python automation skills by adapting these scripts to your specific infrastructure needs and by exploring additional libraries such as Celery for task queues, Flask/FastAPI for web APIs, and further cloud provider SDKs.


