Automation with Python: Useful Scripts for DevOps and System Administration

Introduction

Python has become the de facto language for DevOps automation, infrastructure management, and system administration tasks. Its readability, extensive library ecosystem, and cross-platform compatibility make it ideal for building robust automation tools. While Bash excels at simple scripts, Python shines when you need complex logic, data processing, API integrations, or maintainable codebases.

This comprehensive guide presents production-ready Python scripts for common DevOps tasks, covering server management, cloud automation, monitoring, and infrastructure orchestration.

Prerequisites

  • Python 3.7+ installed
  • Basic Python programming knowledge
  • Understanding of Linux/Unix systems
  • Familiarity with command-line tools
  • pip for package management

Essential Python Setup

Virtual Environment Setup

# Create virtual environment
python3 -m venv ~/automation-env
source ~/automation-env/bin/activate

# Install common libraries
pip install requests paramiko boto3 fabric psutil schedule

Required Libraries

# requirements.txt
requests>=2.31.0
paramiko>=3.3.1
boto3>=1.28.0
fabric>=3.2.0
psutil>=5.9.5
schedule>=1.2.0
python-dotenv>=1.0.0
pyyaml>=6.0.1
jinja2>=3.1.2

Server Management Scripts

1. System Health Monitor

#!/usr/bin/env python3
"""
system_monitor.py - Comprehensive system health monitoring
"""

import psutil
import smtplib
from email.mime.text import MIMEText
from datetime import datetime

class SystemMonitor:
    def __init__(self, cpu_threshold=80, memory_threshold=85, disk_threshold=90):
        self.cpu_threshold = cpu_threshold
        self.memory_threshold = memory_threshold
        self.disk_threshold = disk_threshold
        self.alerts = []

    def check_cpu(self):
        cpu_percent = psutil.cpu_percent(interval=1)
        if cpu_percent > self.cpu_threshold:
            self.alerts.append(f"CPU usage is {cpu_percent}%")
        return cpu_percent

    def check_memory(self):
        memory = psutil.virtual_memory()
        if memory.percent > self.memory_threshold:
            self.alerts.append(f"Memory usage is {memory.percent}%")
        return memory.percent

    def check_disk(self):
        disk = psutil.disk_usage('/')
        if disk.percent > self.disk_threshold:
            self.alerts.append(f"Disk usage is {disk.percent}%")
        return disk.percent

    def get_top_processes(self, n=5):
        processes = []
        for proc in psutil.process_iter(['pid', 'name', 'cpu_percent', 'memory_percent']):
            try:
                processes.append(proc.info)
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                pass
        # cpu_percent can be None for processes we are not allowed to inspect
        return sorted(processes, key=lambda x: x['cpu_percent'] or 0, reverse=True)[:n]

    def generate_report(self):
        report = f"""
System Health Report - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
================================================================

Resource Usage:
- CPU: {self.check_cpu()}%
- Memory: {self.check_memory()}%
- Disk: {self.check_disk()}%

Top Processes by CPU:
"""
        for proc in self.get_top_processes():
            report += f"- {proc['name']} (PID {proc['pid']}): {proc['cpu_percent']}%\n"

        if self.alerts:
            report += "\n⚠️  ALERTS:\n"
            for alert in self.alerts:
                report += f"- {alert}\n"

        return report

    def send_alert(self, email_to, email_from, smtp_server):
        if not self.alerts:
            return

        msg = MIMEText(self.generate_report())
        msg['Subject'] = 'System Health Alert'
        msg['From'] = email_from
        msg['To'] = email_to

        with smtplib.SMTP(smtp_server) as server:
            server.send_message(msg)

if __name__ == "__main__":
    monitor = SystemMonitor()
    print(monitor.generate_report())

    if monitor.alerts:
        # monitor.send_alert('[email protected]', '[email protected]', 'localhost')
        pass
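
The schedule library from requirements.txt pairs naturally with this class for periodic checks. A minimal sketch (the addresses and SMTP server are placeholders to replace with your own):

#!/usr/bin/env python3
"""
run_monitor.py - Run the health check on a fixed interval
"""

import time
import schedule

from system_monitor import SystemMonitor

def run_check():
    monitor = SystemMonitor(cpu_threshold=80, memory_threshold=85, disk_threshold=90)
    print(monitor.generate_report())
    if monitor.alerts:
        # Placeholder SMTP details - adjust for your environment
        monitor.send_alert('[email protected]', '[email protected]', 'localhost')

# Run every 5 minutes
schedule.every(5).minutes.do(run_check)

while True:
    schedule.run_pending()
    time.sleep(1)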

2. Automated Backup Script

#!/usr/bin/env python3
"""
backup_manager.py - Comprehensive backup solution
"""

import tarfile
import gzip
import shutil
from pathlib import Path
from datetime import datetime, timedelta
import subprocess
import boto3

class BackupManager:
    def __init__(self, backup_dir="/backup", retention_days=7):
        self.backup_dir = Path(backup_dir)
        self.retention_days = retention_days
        self.timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        self.backup_dir.mkdir(parents=True, exist_ok=True)

    def backup_directory(self, source_dir, name=None):
        """Create compressed backup of directory"""
        source = Path(source_dir)
        if not source.exists():
            raise FileNotFoundError(f"Source directory not found: {source_dir}")

        backup_name = name or source.name
        archive_path = self.backup_dir / f"{backup_name}_{self.timestamp}.tar.gz"

        print(f"Backing up {source_dir} to {archive_path}")

        with tarfile.open(archive_path, "w:gz") as tar:
            tar.add(source_dir, arcname=backup_name)

        return archive_path

    def backup_mysql(self, database=None, user="root", password=None):
        """Backup MySQL database"""
        backup_file = self.backup_dir / f"mysql_{database or 'all'}_{self.timestamp}.sql.gz"

        cmd = ["mysqldump"]
        if user:
            cmd.extend(["-u", user])
        if password:
            # Passing --password on the command line exposes it in the process
            # list; prefer an option file such as ~/.my.cnf in production
            cmd.extend([f"--password={password}"])

        if database:
            cmd.append(database)
        else:
            cmd.append("--all-databases")

        print(f"Backing up MySQL database(s) to {backup_file}")

        # subprocess writes to the raw file descriptor, bypassing GzipFile,
        # so pipe the dump through Python for the compression to apply
        with gzip.open(backup_file, 'wb') as f:
            proc = subprocess.Popen(cmd, stdout=subprocess.PIPE)
            shutil.copyfileobj(proc.stdout, f)
            if proc.wait() != 0:
                raise subprocess.CalledProcessError(proc.returncode, cmd)

        return backup_file

    def backup_postgresql(self, database=None):
        """Backup PostgreSQL database"""
        backup_file = self.backup_dir / f"postgres_{database or 'all'}_{self.timestamp}.sql.gz"

        cmd = ["sudo", "-u", "postgres", "pg_dump" if database else "pg_dumpall"]
        if database:
            cmd.append(database)

        print(f"Backing up PostgreSQL to {backup_file}")

        # Pipe through Python so the output is actually gzip-compressed
        with gzip.open(backup_file, 'wb') as f:
            proc = subprocess.Popen(cmd, stdout=subprocess.PIPE)
            shutil.copyfileobj(proc.stdout, f)
            if proc.wait() != 0:
                raise subprocess.CalledProcessError(proc.returncode, cmd)

        return backup_file

    def upload_to_s3(self, file_path, bucket, prefix=""):
        """Upload backup to S3"""
        s3 = boto3.client('s3')
        key = f"{prefix}/{Path(file_path).name}" if prefix else Path(file_path).name

        print(f"Uploading {file_path} to s3://{bucket}/{key}")
        s3.upload_file(str(file_path), bucket, key)

    def cleanup_old_backups(self):
        """Remove backups older than retention period"""
        cutoff_date = datetime.now() - timedelta(days=self.retention_days)

        for backup_file in self.backup_dir.glob("*"):
            if backup_file.is_file() and backup_file.stat().st_mtime < cutoff_date.timestamp():
                print(f"Removing old backup: {backup_file}")
                backup_file.unlink()

    def run_full_backup(self, config):
        """Execute full backup based on configuration"""
        backup_files = []

        # Backup directories
        for directory in config.get('directories', []):
            backup_files.append(self.backup_directory(directory))

        # Backup MySQL databases
        for db_config in config.get('mysql', []):
            backup_files.append(self.backup_mysql(**db_config))

        # Backup PostgreSQL databases
        for db in config.get('postgresql', []):
            backup_files.append(self.backup_postgresql(db))

        # Upload to S3 if configured
        if 's3' in config:
            for backup_file in backup_files:
                self.upload_to_s3(backup_file, **config['s3'])

        # Cleanup old backups
        self.cleanup_old_backups()

        return backup_files

if __name__ == "__main__":
    backup_config = {
        'directories': ['/var/www', '/etc'],
        'mysql': [
            {'database': 'production_db', 'user': 'backup', 'password': 'secret'}
        ],
        'postgresql': ['app_database'],
        's3': {
            'bucket': 'company-backups',
            'prefix': 'daily'
        }
    }

    manager = BackupManager()
    manager.run_full_backup(backup_config)
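
Since pyyaml is already in requirements.txt, the same configuration can live in a version-controlled YAML file instead of being hard-coded. A sketch, assuming a backup.yml whose keys mirror the dict above:

# backup.yml mirrors the dict above:
#   directories:
#     - /var/www
#     - /etc
#   postgresql:
#     - app_database
#   s3:
#     bucket: company-backups
#     prefix: daily

import yaml

with open('backup.yml') as f:
    backup_config = yaml.safe_load(f)

manager = BackupManager(retention_days=14)
manager.run_full_backup(backup_config)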

3. Log Analyzer

#!/usr/bin/env python3
"""
log_analyzer.py - Analyze and report on log files
"""

import re
from collections import Counter
from pathlib import Path
from datetime import datetime

class LogAnalyzer:
    def __init__(self, log_file):
        self.log_file = Path(log_file)
        self.entries = []

    def parse_nginx_log(self):
        """Parse Nginx access log"""
        pattern = r'(\S+) - - \[(.*?)\] "(.*?)" (\d+) (\d+) "(.*?)" "(.*?)"'

        with open(self.log_file) as f:
            for line in f:
                match = re.match(pattern, line)
                if match:
                    self.entries.append({
                        'ip': match.group(1),
                        'timestamp': match.group(2),
                        'request': match.group(3),
                        'status': int(match.group(4)),
                        'bytes': int(match.group(5)),
                        'referrer': match.group(6),
                        'user_agent': match.group(7)
                    })

    def get_top_ips(self, n=10):
        """Get top N IP addresses by request count"""
        ips = [entry['ip'] for entry in self.entries]
        return Counter(ips).most_common(n)

    def get_status_distribution(self):
        """Get HTTP status code distribution"""
        statuses = [entry['status'] for entry in self.entries]
        return dict(Counter(statuses))

    def get_error_requests(self):
        """Get all requests with 4xx or 5xx status codes"""
        return [entry for entry in self.entries if entry['status'] >= 400]

    def get_bandwidth_usage(self):
        """Calculate total bandwidth used"""
        total_bytes = sum(entry['bytes'] for entry in self.entries)
        return {
            'bytes': total_bytes,
            'kb': total_bytes / 1024,
            'mb': total_bytes / (1024 ** 2),
            'gb': total_bytes / (1024 ** 3)
        }

    def generate_report(self):
        """Generate comprehensive log analysis report"""
        report = f"""
Log Analysis Report - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
File: {self.log_file}
Total Requests: {len(self.entries)}
================================================================

Top 10 IP Addresses:
"""
        for ip, count in self.get_top_ips():
            report += f"  {ip}: {count} requests\n"

        report += "\nHTTP Status Distribution:\n"
        for status, count in sorted(self.get_status_distribution().items()):
            report += f"  {status}: {count}\n"

        bandwidth = self.get_bandwidth_usage()
        report += f"\nBandwidth Usage:\n"
        report += f"  Total: {bandwidth['gb']:.2f} GB ({bandwidth['mb']:.2f} MB)\n"

        errors = self.get_error_requests()
        report += f"\nError Requests: {len(errors)}\n"
        if errors:
            report += "Sample errors:\n"
            for error in errors[:5]:
                report += f"  [{error['status']}] {error['request']}\n"

        return report

if __name__ == "__main__":
    analyzer = LogAnalyzer('/var/log/nginx/access.log')
    analyzer.parse_nginx_log()
    print(analyzer.generate_report())
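
The parsed entries support more targeted analysis as well. For example, a small helper (a sketch, using the analyzer built above) can flag IPs whose traffic is mostly errors, which is a quick way to spot scanners and brute-force attempts:

from collections import Counter

def suspicious_ips(analyzer, min_requests=20, error_ratio=0.5):
    """Flag IPs whose share of 4xx/5xx responses exceeds error_ratio."""
    totals = Counter(e['ip'] for e in analyzer.entries)
    errors = Counter(e['ip'] for e in analyzer.entries if e['status'] >= 400)

    return [
        (ip, errors[ip], total)
        for ip, total in totals.items()
        if total >= min_requests and errors[ip] / total >= error_ratio
    ]

for ip, error_count, total in suspicious_ips(analyzer):
    print(f"  {ip}: {error_count}/{total} requests were errors")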

Cloud Automation Scripts

4. AWS EC2 Manager

#!/usr/bin/env python3
"""
aws_manager.py - Manage AWS EC2 instances
"""

import boto3
from botocore.exceptions import ClientError

class AWSManager:
    def __init__(self, region='us-east-1'):
        self.ec2 = boto3.resource('ec2', region_name=region)
        self.client = boto3.client('ec2', region_name=region)

    def list_instances(self, filters=None):
        """List all EC2 instances"""
        instances = []
        for instance in self.ec2.instances.filter(Filters=filters or []):
            instances.append({
                'id': instance.id,
                'type': instance.instance_type,
                'state': instance.state['Name'],
                'public_ip': instance.public_ip_address,
                'private_ip': instance.private_ip_address,
                'name': next((tag['Value'] for tag in instance.tags or [] if tag['Key'] == 'Name'), None)
            })
        return instances

    def start_instances(self, instance_ids):
        """Start EC2 instances"""
        try:
            self.client.start_instances(InstanceIds=instance_ids)
            print(f"Started instances: {', '.join(instance_ids)}")
        except ClientError as e:
            print(f"Error starting instances: {e}")

    def stop_instances(self, instance_ids):
        """Stop EC2 instances"""
        try:
            self.client.stop_instances(InstanceIds=instance_ids)
            print(f"Stopped instances: {', '.join(instance_ids)}")
        except ClientError as e:
            print(f"Error stopping instances: {e}")

    def create_snapshot(self, volume_id, description):
        """Create EBS snapshot"""
        try:
            snapshot = self.client.create_snapshot(
                VolumeId=volume_id,
                Description=description
            )
            print(f"Created snapshot: {snapshot['SnapshotId']}")
            return snapshot['SnapshotId']
        except ClientError as e:
            print(f"Error creating snapshot: {e}")

    def cleanup_old_snapshots(self, days=30):
        """Delete snapshots older than specified days"""
        from datetime import datetime, timedelta, timezone

        cutoff_date = datetime.now(timezone.utc) - timedelta(days=days)

        snapshots = self.client.describe_snapshots(OwnerIds=['self'])['Snapshots']

        for snapshot in snapshots:
            # StartTime is timezone-aware UTC, so compare against an aware cutoff
            if snapshot['StartTime'] < cutoff_date:
                try:
                    self.client.delete_snapshot(SnapshotId=snapshot['SnapshotId'])
                    print(f"Deleted old snapshot: {snapshot['SnapshotId']}")
                except ClientError as e:
                    print(f"Error deleting snapshot: {e}")

if __name__ == "__main__":
    aws = AWSManager()

    # List running instances
    print("Running Instances:")
    for instance in aws.list_instances([{'Name': 'instance-state-name', 'Values': ['running']}]):
        print(f"  {instance['name']}: {instance['id']} ({instance['public_ip']})")

Deployment Automation

5. Application Deployer

#!/usr/bin/env python3
"""
app_deployer.py - Automated application deployment
"""

from fabric import Connection, Config
from pathlib import Path
import sys

class AppDeployer:
    def __init__(self, hosts, app_name, app_dir="/opt/app"):
        self.hosts = hosts
        self.app_name = app_name
        self.app_dir = app_dir

    def deploy_to_host(self, host, repo_url, branch="main"):
        """Deploy application to a single host"""
        try:
            conn = Connection(host)

            print(f"Deploying to {host}...")

            # Backup current version
            conn.run(f"tar -czf /tmp/{self.app_name}-backup-$(date +%Y%m%d_%H%M%S).tar.gz {self.app_dir} || true")

            # Pull latest code, falling back to a fresh clone
            with conn.cd(self.app_dir):
                result = conn.run(f"git pull origin {branch}", warn=True)

            if result.failed:
                print("Git pull failed, cloning repository...")
                conn.run(f"rm -rf {self.app_dir}")
                conn.run(f"git clone -b {branch} {repo_url} {self.app_dir}")

            # Install dependencies
            with conn.cd(self.app_dir):
                conn.run("npm install --production")

            # Restart service
            conn.sudo(f"systemctl restart {self.app_name}")

            # Verify deployment
            result = conn.run(f"systemctl is-active {self.app_name}", warn=True)

            if result.ok:
                print(f"✓ Deployment to {host} successful")
                return True
            else:
                print(f"✗ Deployment to {host} failed")
                return False

        except Exception as e:
            print(f"Error deploying to {host}: {e}")
            return False

    def deploy_all(self, repo_url, branch="main"):
        """Deploy to all hosts"""
        results = {}
        for host in self.hosts:
            results[host] = self.deploy_to_host(host, repo_url, branch)

        return results

if __name__ == "__main__":
    deployer = AppDeployer(
        hosts=['web1.example.com', 'web2.example.com'],
        app_name='myapp'
    )

    results = deployer.deploy_all('https://github.com/company/app.git')

    if all(results.values()):
        print("\n✓ Deployment successful on all hosts")
        sys.exit(0)
    else:
        print("\n✗ Deployment failed on some hosts")
        sys.exit(1)
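
Deploying hosts sequentially keeps the logic easy to follow, but each per-host deployment is independent, so larger fleets can be handled concurrently. A sketch of a parallel variant using the standard library's thread pool:

from concurrent.futures import ThreadPoolExecutor

def deploy_all_parallel(deployer, repo_url, branch="main", max_workers=4):
    """Deploy to all hosts concurrently; returns {host: success} like deploy_all."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {
            host: pool.submit(deployer.deploy_to_host, host, repo_url, branch)
            for host in deployer.hosts
        }
        return {host: future.result() for host, future in futures.items()}

Note that the per-host print output will interleave; for production use you would collect per-host logs instead of printing.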

Monitoring and Alerting

6. Service Monitor

#!/usr/bin/env python3
"""
service_monitor.py - Monitor services and send alerts
"""

import subprocess
import requests
import time
from datetime import datetime

class ServiceMonitor:
    def __init__(self):
        self.failures = {}

    def check_systemd_service(self, service_name):
        """Check if systemd service is running"""
        try:
            result = subprocess.run(
                ['systemctl', 'is-active', service_name],
                capture_output=True,
                text=True
            )
            return result.returncode == 0
        except Exception as e:
            print(f"Error checking {service_name}: {e}")
            return False

    def check_http_endpoint(self, url, expected_status=200, timeout=10):
        """Check if HTTP endpoint is responding"""
        try:
            response = requests.get(url, timeout=timeout)
            return response.status_code == expected_status
        except Exception as e:
            print(f"Error checking {url}: {e}")
            return False

    def check_port(self, host, port, timeout=5):
        """Check if port is open"""
        import socket
        try:
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sock.settimeout(timeout)
            result = sock.connect_ex((host, port))
            sock.close()
            return result == 0
        except Exception as e:
            print(f"Error checking {host}:{port}: {e}")
            return False

    def send_alert(self, service, message):
        """Send alert notification"""
        # Slack webhook example
        webhook_url = "https://hooks.slack.com/services/YOUR/WEBHOOK/URL"

        payload = {
            "text": f"🚨 Service Alert: {service}",
            "blocks": [
                {
                    "type": "section",
                    "text": {"type": "mrkdwn", "text": message}
                }
            ]
        }

        try:
            requests.post(webhook_url, json=payload, timeout=10)
        except Exception as e:
            print(f"Error sending alert: {e}")

    def monitor_services(self, checks, interval=60):
        """Continuously monitor services"""
        while True:
            for check_name, check_config in checks.items():
                status = False

                if check_config['type'] == 'systemd':
                    status = self.check_systemd_service(check_config['service'])
                elif check_config['type'] == 'http':
                    status = self.check_http_endpoint(check_config['url'])
                elif check_config['type'] == 'port':
                    status = self.check_port(check_config['host'], check_config['port'])

                timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

                if not status:
                    if check_name not in self.failures:
                        self.failures[check_name] = timestamp
                        self.send_alert(check_name, f"{check_name} is down since {timestamp}")
                    print(f"[{timestamp}] ✗ {check_name} - FAILED")
                else:
                    if check_name in self.failures:
                        downtime = self.failures.pop(check_name)
                        self.send_alert(check_name, f"{check_name} is back up (was down since {downtime})")
                    print(f"[{timestamp}] ✓ {check_name} - OK")

            time.sleep(interval)

if __name__ == "__main__":
    monitor = ServiceMonitor()

    checks = {
        'nginx': {'type': 'systemd', 'service': 'nginx'},
        'website': {'type': 'http', 'url': 'https://example.com'},
        'database': {'type': 'port', 'host': 'localhost', 'port': 5432}
    }

    monitor.monitor_services(checks)
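
Alerting is often paired with a simple self-healing step. As a sketch, a subclass of the monitor above could attempt one restart before escalating (this assumes the monitoring user has sudo rights for systemctl):

import subprocess

class SelfHealingMonitor(ServiceMonitor):
    def try_restart(self, service_name):
        """Attempt a single restart; assumes sudo rights for systemctl."""
        result = subprocess.run(
            ['sudo', 'systemctl', 'restart', service_name],
            capture_output=True,
            text=True
        )
        return result.returncode == 0

# Usage: restart nginx once before escalating to an alert
healer = SelfHealingMonitor()
if not healer.check_systemd_service('nginx'):
    if healer.try_restart('nginx') and healer.check_systemd_service('nginx'):
        print("nginx restarted successfully")
    else:
        healer.send_alert('nginx', 'nginx is down and an automatic restart failed')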

Best Practices

Error Handling

import logging

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('/var/log/automation.log'),
        logging.StreamHandler()
    ]
)

logger = logging.getLogger(__name__)

try:
    # Your code
    pass
except Exception as e:
    logger.error(f"Error occurred: {e}", exc_info=True)
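
Transient failures such as network blips and API throttling are routine in automation, so a retry decorator with exponential backoff complements the logging above. A minimal sketch that reuses the logger configured above:

import time
import functools

def retry(attempts=3, delay=1, backoff=2):
    """Retry a function with exponential backoff, logging each failure."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            wait = delay
            for attempt in range(1, attempts + 1):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    if attempt == attempts:
                        raise
                    logger.warning(f"{func.__name__} failed (attempt {attempt}): {e}; retrying in {wait}s")
                    time.sleep(wait)
                    wait *= backoff
        return wrapper
    return decorator

@retry(attempts=3, delay=2)
def fetch_metrics():
    ...  # any flaky network call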

Configuration Management

import os
from dotenv import load_dotenv

load_dotenv()

config = {
    'api_key': os.getenv('API_KEY'),
    'db_host': os.getenv('DB_HOST', 'localhost'),
    'db_port': int(os.getenv('DB_PORT', 5432))
}

Type Hints

from typing import List, Dict, Optional

def process_servers(servers: List[str], config: Dict[str, str]) -> Optional[bool]:
    """Process list of servers with given configuration"""
    pass

Conclusion

Python is an essential tool for modern DevOps automation. These practical examples demonstrate how to automate server management, cloud operations, monitoring, and deployment tasks using Python's powerful libraries and features.

Key takeaways:

  • Use virtual environments for dependency isolation
  • Implement comprehensive error handling and logging
  • Leverage Python's extensive library ecosystem
  • Follow PEP 8 style guidelines
  • Write testable, maintainable code
  • Use type hints for better code documentation
  • Implement proper security practices

Continue building your Python automation skills by adapting these scripts to your specific infrastructure needs and exploring additional libraries like Celery for task queuing, Flask/FastAPI for web APIs, and more cloud provider SDKs.