Automation with Python: Useful Scripts for DevOps and Systems Administration

Introduction

Python has become the de facto language for DevOps automation, infrastructure management, and systems administration. Its readability, extensive library ecosystem, and cross-platform support make it ideal for building robust automation tools. While Bash excels at simple scripts, Python shines when you need complex logic, data processing, API integrations, or a maintainable codebase.

This guide presents production-ready Python scripts for common DevOps tasks, covering server management, cloud automation, monitoring, and infrastructure orchestration.

Prerequisites

  • Python 3.7+ installed
  • Basic Python programming knowledge
  • Understanding of Linux/Unix systems
  • Familiarity with command-line tools
  • pip for package management

Essential Python Setup

Virtual Environment Setup

# Create a virtual environment
python3 -m venv ~/automation-env
source ~/automation-env/bin/activate

# Install common libraries
pip install requests paramiko boto3 fabric psutil schedule

Required Libraries

# requirements.txt
requests>=2.31.0
paramiko>=3.3.1
boto3>=1.28.0
fabric>=3.2.0
psutil>=5.9.5
schedule>=1.2.0
python-dotenv>=1.0.0
pyyaml>=6.0.1
jinja2>=3.1.2

Server Management Scripts

1. System Health Monitor

#!/usr/bin/env python3
"""
system_monitor.py - Comprehensive system health monitoring
"""

import psutil
import smtplib
from email.mime.text import MIMEText
from datetime import datetime

class SystemMonitor:
    def __init__(self, cpu_threshold=80, memory_threshold=85, disk_threshold=90):
        self.cpu_threshold = cpu_threshold
        self.memory_threshold = memory_threshold
        self.disk_threshold = disk_threshold
        self.alerts = []

    def check_cpu(self):
        cpu_percent = psutil.cpu_percent(interval=1)
        if cpu_percent > self.cpu_threshold:
            self.alerts.append(f"CPU usage is {cpu_percent}%")
        return cpu_percent

    def check_memory(self):
        memory = psutil.virtual_memory()
        if memory.percent > self.memory_threshold:
            self.alerts.append(f"Memory usage is {memory.percent}%")
        return memory.percent

    def check_disk(self):
        disk = psutil.disk_usage('/')
        if disk.percent > self.disk_threshold:
            self.alerts.append(f"Disk usage is {disk.percent}%")
        return disk.percent

    def get_top_processes(self, n=5):
        processes = []
        for proc in psutil.process_iter(['pid', 'name', 'cpu_percent', 'memory_percent']):
            try:
                processes.append(proc.info)
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                pass
        # cpu_percent may be None for processes we cannot access
        return sorted(processes, key=lambda x: x['cpu_percent'] or 0, reverse=True)[:n]

    def generate_report(self):
        report = f"""
System Health Report - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
================================================================

Resource Usage:
- CPU: {self.check_cpu()}%
- Memory: {self.check_memory()}%
- Disk: {self.check_disk()}%

Top Processes by CPU:
"""
        for proc in self.get_top_processes():
            report += f"- {proc['name']} (PID {proc['pid']}): {proc['cpu_percent']}%\n"

        if self.alerts:
            report += "\n⚠️  ALERTS:\n"
            for alert in self.alerts:
                report += f"- {alert}\n"

        return report

    def send_alert(self, email_to, email_from, smtp_server):
        if not self.alerts:
            return

        msg = MIMEText(self.generate_report())
        msg['Subject'] = 'System Health Alert'
        msg['From'] = email_from
        msg['To'] = email_to

        with smtplib.SMTP(smtp_server) as server:
            server.send_message(msg)

if __name__ == "__main__":
    monitor = SystemMonitor()
    print(monitor.generate_report())

    if monitor.alerts:
        # monitor.send_alert('[email protected]', '[email protected]', 'localhost')
        pass
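
To run the check periodically instead of once, the schedule library installed earlier works well; a minimal sketch (the five-minute interval is illustrative):

import schedule
import time

def run_checks():
    # A fresh instance each run so the alert list resets
    monitor = SystemMonitor()
    print(monitor.generate_report())

schedule.every(5).minutes.do(run_checks)

while True:
    schedule.run_pending()
    time.sleep(1)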

2. Automated Backup Script

#!/usr/bin/env python3
"""
backup_manager.py - Comprehensive backup solution
"""

import os
import tarfile
import gzip
import shutil
from pathlib import Path
from datetime import datetime, timedelta
import subprocess
import boto3

class BackupManager:
    def __init__(self, backup_dir="/backup", retention_days=7):
        self.backup_dir = Path(backup_dir)
        self.retention_days = retention_days
        self.timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        self.backup_dir.mkdir(parents=True, exist_ok=True)

    def backup_directory(self, source_dir, name=None):
        """Create compressed backup of directory"""
        source = Path(source_dir)
        if not source.exists():
            raise FileNotFoundError(f"Source directory not found: {source_dir}")

        backup_name = name or source.name
        archive_path = self.backup_dir / f"{backup_name}_{self.timestamp}.tar.gz"

        print(f"Backing up {source_dir} to {archive_path}")

        with tarfile.open(archive_path, "w:gz") as tar:
            tar.add(source_dir, arcname=backup_name)

        return archive_path

    def backup_mysql(self, database=None, user="root", password=None):
        """Backup MySQL database"""
        backup_file = self.backup_dir / f"mysql_{database or 'all'}_{self.timestamp}.sql.gz"

        cmd = ["mysqldump"]
        if user:
            cmd.extend(["-u", user])
        if password:
            # Note: --password on the CLI is visible in process listings;
            # prefer an option file (~/.my.cnf) in production
            cmd.extend([f"--password={password}"])

        if database:
            cmd.append(database)
        else:
            cmd.append("--all-databases")

        print(f"Backing up MySQL database(s) to {backup_file}")

        # Stream the dump through gzip; passing the gzip handle directly as
        # stdout would bypass compression (subprocess writes to the raw fd)
        with gzip.open(backup_file, 'wb') as f:
            proc = subprocess.Popen(cmd, stdout=subprocess.PIPE)
            shutil.copyfileobj(proc.stdout, f)
            if proc.wait() != 0:
                raise subprocess.CalledProcessError(proc.returncode, cmd)

        return backup_file

    def backup_postgresql(self, database=None):
        """Backup PostgreSQL database"""
        backup_file = self.backup_dir / f"postgres_{database or 'all'}_{self.timestamp}.sql.gz"

        cmd = ["sudo", "-u", "postgres", "pg_dump" if database else "pg_dumpall"]
        if database:
            cmd.append(database)

        print(f"Backing up PostgreSQL to {backup_file}")

        # Same streaming pattern as backup_mysql to keep the output compressed
        with gzip.open(backup_file, 'wb') as f:
            proc = subprocess.Popen(cmd, stdout=subprocess.PIPE)
            shutil.copyfileobj(proc.stdout, f)
            if proc.wait() != 0:
                raise subprocess.CalledProcessError(proc.returncode, cmd)

        return backup_file

    def upload_to_s3(self, file_path, bucket, prefix=""):
        """Upload backup to S3"""
        s3 = boto3.client('s3')
        key = f"{prefix}/{Path(file_path).name}" if prefix else Path(file_path).name

        print(f"Uploading {file_path} to s3://{bucket}/{key}")
        s3.upload_file(str(file_path), bucket, key)

    def cleanup_old_backups(self):
        """Remove backups older than retention period"""
        cutoff_date = datetime.now() - timedelta(days=self.retention_days)

        for backup_file in self.backup_dir.glob("*"):
            if backup_file.stat().st_mtime < cutoff_date.timestamp():
                print(f"Removing old backup: {backup_file}")
                backup_file.unlink()

    def run_full_backup(self, config):
        """Execute full backup based on configuration"""
        backup_files = []

        # Backup directories
        for directory in config.get('directories', []):
            backup_files.append(self.backup_directory(directory))

        # Backup MySQL databases
        for db_config in config.get('mysql', []):
            backup_files.append(self.backup_mysql(**db_config))

        # Backup PostgreSQL databases
        for db in config.get('postgresql', []):
            backup_files.append(self.backup_postgresql(db))

        # Upload to S3 if configured
        if 's3' in config:
            for backup_file in backup_files:
                self.upload_to_s3(backup_file, **config['s3'])

        # Cleanup old backups
        self.cleanup_old_backups()

        return backup_files

if __name__ == "__main__":
    backup_config = {
        'directories': ['/var/www', '/etc'],
        'mysql': [
            {'database': 'production_db', 'user': 'backup', 'password': 'secret'}
        ],
        'postgresql': ['app_database'],
        's3': {
            'bucket': 'company-backups',
            'prefix': 'daily'
        }
    }

    manager = BackupManager()
    manager.run_full_backup(backup_config)
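
Since pyyaml is already listed in requirements.txt, the backup configuration can live in a YAML file instead of being hardcoded; a minimal sketch, assuming a hypothetical backup.yml with the same keys as backup_config above:

import yaml

# backup.yml is a hypothetical path; its structure mirrors backup_config
with open('backup.yml') as f:
    backup_config = yaml.safe_load(f)

BackupManager().run_full_backup(backup_config)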

3. Log Analyzer

#!/usr/bin/env python3
"""
log_analyzer.py - Analyze and report on log files
"""

import re
from collections import Counter, defaultdict
from pathlib import Path
from datetime import datetime

class LogAnalyzer:
    def __init__(self, log_file):
        self.log_file = Path(log_file)
        self.entries = []

    def parse_nginx_log(self):
        """Parse Nginx access log"""
        pattern = r'(\S+) - - \[(.*?)\] "(.*?)" (\d+) (\d+) "(.*?)" "(.*?)"'

        with open(self.log_file) as f:
            for line in f:
                match = re.match(pattern, line)
                if match:
                    self.entries.append({
                        'ip': match.group(1),
                        'timestamp': match.group(2),
                        'request': match.group(3),
                        'status': int(match.group(4)),
                        'bytes': int(match.group(5)),
                        'referrer': match.group(6),
                        'user_agent': match.group(7)
                    })

    def get_top_ips(self, n=10):
        """Get top N IP addresses by request count"""
        ips = [entry['ip'] for entry in self.entries]
        return Counter(ips).most_common(n)

    def get_status_distribution(self):
        """Get HTTP status code distribution"""
        statuses = [entry['status'] for entry in self.entries]
        return dict(Counter(statuses))

    def get_error_requests(self):
        """Get all requests with 4xx or 5xx status codes"""
        return [entry for entry in self.entries if entry['status'] >= 400]

    def get_bandwidth_usage(self):
        """Calculate total bandwidth used"""
        total_bytes = sum(entry['bytes'] for entry in self.entries)
        return {
            'bytes': total_bytes,
            'kb': total_bytes / 1024,
            'mb': total_bytes / (1024 ** 2),
            'gb': total_bytes / (1024 ** 3)
        }

    def generate_report(self):
        """Generate comprehensive log analysis report"""
        report = f"""
Log Analysis Report - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
File: {self.log_file}
Total Requests: {len(self.entries)}
================================================================

Top 10 IP Addresses:
"""
        for ip, count in self.get_top_ips():
            report += f"  {ip}: {count} requests\n"

        report += "\nHTTP Status Distribution:\n"
        for status, count in sorted(self.get_status_distribution().items()):
            report += f"  {status}: {count}\n"

        bandwidth = self.get_bandwidth_usage()
        report += f"\nBandwidth Usage:\n"
        report += f"  Total: {bandwidth['gb']:.2f} GB ({bandwidth['mb']:.2f} MB)\n"

        errors = self.get_error_requests()
        report += f"\nError Requests: {len(errors)}\n"
        if errors:
            report += "Sample errors:\n"
            for error in errors[:5]:
                report += f"  [{error['status']}] {error['request']}\n"

        return report

if __name__ == "__main__":
    analyzer = LogAnalyzer('/var/log/nginx/access.log')
    analyzer.parse_nginx_log()
    print(analyzer.generate_report())
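
The defaultdict imported above also supports time-bucketed statistics; here is a sketch of a method that could be added to LogAnalyzer to build an hourly request histogram, assuming nginx's default time_local format (%d/%b/%Y:%H:%M:%S %z):

    def get_hourly_counts(self):
        """Count requests per hour bucket (YYYY-MM-DD HH:00)."""
        counts = defaultdict(int)
        for entry in self.entries:
            # e.g. "10/Oct/2023:13:55:36 +0000"
            ts = datetime.strptime(entry['timestamp'], '%d/%b/%Y:%H:%M:%S %z')
            counts[ts.strftime('%Y-%m-%d %H:00')] += 1
        return dict(sorted(counts.items()))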

Cloud Automation Scripts

4. AWS EC2 Manager

#!/usr/bin/env python3
"""
aws_manager.py - Manage AWS EC2 instances
"""

import boto3
from botocore.exceptions import ClientError

class AWSManager:
    def __init__(self, region='us-east-1'):
        self.ec2 = boto3.resource('ec2', region_name=region)
        self.client = boto3.client('ec2', region_name=region)

    def list_instances(self, filters=None):
        """List all EC2 instances"""
        instances = []
        for instance in self.ec2.instances.filter(Filters=filters or []):
            instances.append({
                'id': instance.id,
                'type': instance.instance_type,
                'state': instance.state['Name'],
                'public_ip': instance.public_ip_address,
                'private_ip': instance.private_ip_address,
                'name': next((tag['Value'] for tag in instance.tags or [] if tag['Key'] == 'Name'), None)
            })
        return instances

    def start_instances(self, instance_ids):
        """Start EC2 instances"""
        try:
            self.client.start_instances(InstanceIds=instance_ids)
            print(f"Started instances: {', '.join(instance_ids)}")
        except ClientError as e:
            print(f"Error starting instances: {e}")

    def stop_instances(self, instance_ids):
        """Stop EC2 instances"""
        try:
            self.client.stop_instances(InstanceIds=instance_ids)
            print(f"Stopped instances: {', '.join(instance_ids)}")
        except ClientError as e:
            print(f"Error stopping instances: {e}")

    def create_snapshot(self, volume_id, description):
        """Create EBS snapshot"""
        try:
            snapshot = self.client.create_snapshot(
                VolumeId=volume_id,
                Description=description
            )
            print(f"Created snapshot: {snapshot['SnapshotId']}")
            return snapshot['SnapshotId']
        except ClientError as e:
            print(f"Error creating snapshot: {e}")

    def cleanup_old_snapshots(self, days=30):
        """Delete snapshots older than specified days"""
        from datetime import datetime, timedelta

        cutoff_date = datetime.now() - timedelta(days=days)

        # Paginate so accounts with more than one page of snapshots are covered
        paginator = self.client.get_paginator('describe_snapshots')
        for page in paginator.paginate(OwnerIds=['self']):
            for snapshot in page['Snapshots']:
                start_time = snapshot['StartTime'].replace(tzinfo=None)
                if start_time < cutoff_date:
                    try:
                        self.client.delete_snapshot(SnapshotId=snapshot['SnapshotId'])
                        print(f"Deleted old snapshot: {snapshot['SnapshotId']}")
                    except ClientError as e:
                        print(f"Error deleting snapshot: {e}")

if __name__ == "__main__":
    aws = AWSManager()

    # List running instances
    print("Running Instances:")
    for instance in aws.list_instances([{'Name': 'instance-state-name', 'Values': ['running']}]):
        print(f"  {instance['name']}: {instance['id']} ({instance['public_ip']})")

Deployment Automation

5. Application Deployer

#!/usr/bin/env python3
"""
app_deployer.py - Automated application deployment
"""

from fabric import Connection
from pathlib import Path
import sys

class AppDeployer:
    def __init__(self, hosts, app_name, app_dir="/opt/app"):
        self.hosts = hosts
        self.app_name = app_name
        self.app_dir = app_dir

    def deploy_to_host(self, host, repo_url, branch="main"):
        """Deploy application to a single host"""
        try:
            conn = Connection(host)

            print(f"Deploying to {host}...")

            # Backup current version
            conn.run(f"tar -czf /tmp/{self.app_name}-backup-$(date +%Y%m%d_%H%M%S).tar.gz {self.app_dir} || true")

            # Pull latest code
            with conn.cd(self.app_dir):
                result = conn.run(f"git pull origin {branch}", warn=True)

            if result.failed:
                print("Git pull failed, cloning repository...")
                # Recreate the checkout outside the cd() context, since the
                # directory is removed before cloning
                conn.run(f"rm -rf {self.app_dir}")
                conn.run(f"git clone -b {branch} {repo_url} {self.app_dir}")

            # Install dependencies
            with conn.cd(self.app_dir):
                conn.run("npm install --production")

            # Restart service
            conn.sudo(f"systemctl restart {self.app_name}")

            # Verify deployment
            result = conn.run(f"systemctl is-active {self.app_name}", warn=True)

            if result.ok:
                print(f"✓ Deployment to {host} successful")
                return True
            else:
                print(f"✗ Deployment to {host} failed")
                return False

        except Exception as e:
            print(f"Error deploying to {host}: {e}")
            return False

    def deploy_all(self, repo_url, branch="main"):
        """Deploy to all hosts"""
        results = {}
        for host in self.hosts:
            results[host] = self.deploy_to_host(host, repo_url, branch)

        return results

if __name__ == "__main__":
    deployer = AppDeployer(
        hosts=['web1.example.com', 'web2.example.com'],
        app_name='myapp'
    )

    results = deployer.deploy_all('https://github.com/company/app.git')

    if all(results.values()):
        print("\n✓ Deployment successful on all hosts")
        sys.exit(0)
    else:
        print("\n✗ Deployment failed on some hosts")
        sys.exit(1)
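
deploy_all works through hosts sequentially, which keeps output readable but scales poorly; for larger fleets, the same per-host method can be fanned out with the standard library's ThreadPoolExecutor. A sketch, not a drop-in replacement:

from concurrent.futures import ThreadPoolExecutor

def deploy_all_parallel(deployer, repo_url, branch="main", max_workers=4):
    """Run deploy_to_host concurrently; max_workers is illustrative."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {
            host: pool.submit(deployer.deploy_to_host, host, repo_url, branch)
            for host in deployer.hosts
        }
        return {host: f.result() for host, f in futures.items()}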

Monitoring and Alerting

6. Service Monitor

#!/usr/bin/env python3
"""
service_monitor.py - Monitor services and send alerts
"""

import subprocess
import requests
import socket
import time
from datetime import datetime

class ServiceMonitor:
    def __init__(self):
        self.failures = {}

    def check_systemd_service(self, service_name):
        """Check if systemd service is running"""
        try:
            result = subprocess.run(
                ['systemctl', 'is-active', service_name],
                capture_output=True,
                text=True
            )
            return result.returncode == 0
        except Exception as e:
            print(f"Error checking {service_name}: {e}")
            return False

    def check_http_endpoint(self, url, expected_status=200, timeout=10):
        """Check if HTTP endpoint is responding"""
        try:
            response = requests.get(url, timeout=timeout)
            return response.status_code == expected_status
        except Exception as e:
            print(f"Error checking {url}: {e}")
            return False

    def check_port(self, host, port, timeout=5):
        """Check if port is open"""
        try:
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sock.settimeout(timeout)
            result = sock.connect_ex((host, port))
            sock.close()
            return result == 0
        except Exception as e:
            print(f"Error checking {host}:{port}: {e}")
            return False

    def send_alert(self, service, message):
        """Send alert notification"""
        # Slack webhook example
        webhook_url = "https://hooks.slack.com/services/YOUR/WEBHOOK/URL"

        payload = {
            "text": f"🚨 Service Alert: {service}",
            "blocks": [
                {
                    "type": "section",
                    "text": {"type": "mrkdwn", "text": message}
                }
            ]
        }

        try:
            requests.post(webhook_url, json=payload)
        except Exception as e:
            print(f"Error sending alert: {e}")

    def monitor_services(self, checks, interval=60):
        """Continuously monitor services"""
        while True:
            for check_name, check_config in checks.items():
                status = False

                if check_config['type'] == 'systemd':
                    status = self.check_systemd_service(check_config['service'])
                elif check_config['type'] == 'http':
                    status = self.check_http_endpoint(check_config['url'])
                elif check_config['type'] == 'port':
                    status = self.check_port(check_config['host'], check_config['port'])

                timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

                if not status:
                    if check_name not in self.failures:
                        self.failures[check_name] = timestamp
                        self.send_alert(check_name, f"{check_name} is down since {timestamp}")
                    print(f"[{timestamp}] ✗ {check_name} - FAILED")
                else:
                    if check_name in self.failures:
                        downtime = self.failures.pop(check_name)
                        self.send_alert(check_name, f"{check_name} is back up (was down since {downtime})")
                    print(f"[{timestamp}] ✓ {check_name} - OK")

            time.sleep(interval)

if __name__ == "__main__":
    monitor = ServiceMonitor()

    checks = {
        'nginx': {'type': 'systemd', 'service': 'nginx'},
        'website': {'type': 'http', 'url': 'https://example.com'},
        'database': {'type': 'port', 'host': 'localhost', 'port': 5432}
    }

    monitor.monitor_services(checks)
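
The webhook URL in send_alert is a placeholder; following the configuration practices below, it can be loaded from the environment with python-dotenv instead of being hardcoded (SLACK_WEBHOOK_URL is an assumed variable name):

import os
from dotenv import load_dotenv

load_dotenv()
# Fail fast if the variable is missing rather than posting to a dummy URL
webhook_url = os.environ['SLACK_WEBHOOK_URL']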

Best Practices

Error Handling

import logging

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('/var/log/automation.log'),
        logging.StreamHandler()
    ]
)

logger = logging.getLogger(__name__)

try:
    # Your code
    pass
except Exception as e:
    logger.error(f"Error occurred: {e}", exc_info=True)
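
Transient failures (network blips, API throttling) are routine in automation, so wrapping flaky calls in a retry with exponential backoff is worth the few extra lines; a sketch, with illustrative attempt counts and delays:

import time

def retry(func, attempts=3, base_delay=2):
    """Call func(); retry with exponential backoff, re-raising on final failure."""
    for attempt in range(1, attempts + 1):
        try:
            return func()
        except Exception as e:
            if attempt == attempts:
                raise
            delay = base_delay ** attempt
            logger.warning(f"Attempt {attempt} failed ({e}); retrying in {delay}s")
            time.sleep(delay)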

Configuration Management

import os
from dotenv import load_dotenv

load_dotenv()

config = {
    'api_key': os.getenv('API_KEY'),
    'db_host': os.getenv('DB_HOST', 'localhost'),
    'db_port': int(os.getenv('DB_PORT', 5432))
}
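
A small guard that fails fast when a required variable is missing surfaces misconfiguration at startup instead of mid-run; a sketch building on the config pattern above:

required = ['API_KEY']
missing = [key for key in required if not os.getenv(key)]
if missing:
    raise RuntimeError(f"Missing required environment variables: {', '.join(missing)}")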

Type Hints

from typing import List, Dict, Optional

def process_servers(servers: List[str], config: Dict[str, str]) -> Optional[bool]:
    """Process list of servers with given configuration"""
    pass

Conclusion

Python is an essential tool for modern DevOps automation. These practical examples demonstrate how to automate server management, cloud operations, monitoring, and deployment tasks using Python's powerful libraries and features.

Key takeaways:

  • Use virtual environments for dependency isolation
  • Implement thorough error handling and logging
  • Leverage Python's extensive library ecosystem
  • Follow the PEP 8 style guide
  • Write testable, maintainable code
  • Use type hints for better code documentation
  • Apply sound security practices

Keep developing your Python automation skills by adapting these scripts to your specific infrastructure needs and exploring additional libraries such as Celery for task queues, Flask/FastAPI for web APIs, and the SDKs of other cloud providers.