27 min read
Practical Automation for Optical Network Engineers
Step-by-Step Implementation Guide with Real Code Examples, Best Practices, and Production-Ready Solutions
Your Automation Journey Starts Here
This comprehensive technical guide is designed to take you from automation basics to production-ready implementations. Every section includes hands-on code examples, detailed explanations, and practical advice based on real-world deployments.
What Makes This Guide Different
Practical Focus: Every concept is accompanied by working code you can run immediately
Real-World Context: Examples drawn from actual optical network automation scenarios
Progressive Learning: Builds from fundamentals to advanced topics systematically
Production-Ready: Includes error handling, security considerations, and best practices
Understanding Our Target Network Topology
Before diving into automation, let's understand the network infrastructure we'll be automating. This guide focuses on a realistic multi-vendor optical network spanning multiple data centers with DWDM transport, OTN switching, and IP/MPLS routing layers.
Sample Network Architecture
Scale: 3 Data Centers interconnected via DWDM optical network across India
Device Count: ~50 network elements (ROADMs, OTN switches, IP routers, optical amplifiers)
Vendors: Multi-vendor environment (representing real-world complexity)
Technologies: DWDM C-band (96 channels), 100G/400G coherent optics, OTN switching, IP/MPLS
Geographic Spread: Mumbai (West), Hyderabad (South), Delhi (North) - covering major Indian regions
Network Topology: Pan-India DWDM Optical Network
Our Automation Goals: The Complete Vision
By the end of this guide, you'll build a comprehensive automation framework that transforms how you manage this optical network. Here are the specific, measurable goals we're working toward:
Current State: Manual configuration takes 2-4 hours per device, error rate ~15%
Target State: Automated deployment in under 5 minutes per device, error rate <1%
Key Capabilities:
- Template-based configuration generation using Jinja2
- Pre-deployment validation and syntax checking
- Automatic rollback on failure
- Configuration versioning and change tracking
Current State: Manual Excel spreadsheets, often outdated, inventory audits take days
Target State: Real-time automated discovery, always-current inventory database
Key Capabilities:
- Automatic device discovery via LLDP/CDP
- Hardware and software inventory collection
- Interface and circuit mapping
- Optical channel and wavelength assignment tracking
- Integration with NetBox as source of truth
Current State: Reactive troubleshooting, 30-60 minute mean time to detect issues
Target State: Real-time monitoring with predictive alerts, <5 minute detection
Key Capabilities:
- Continuous optical power level monitoring (Tx/Rx)
- OSNR tracking and threshold alerting
- Pre-FEC and Post-FEC BER monitoring
- Chromatic dispersion and PMD tracking
- Temperature and component health monitoring
- Automated correlation of alarms across network layers
Current State: 3-5 day service turn-up time, manual path calculation
Target State: 1-hour automated provisioning, optimal path selection
Key Capabilities:
- Automated wavelength assignment and ROADM configuration
- End-to-end path computation considering OSNR budget
- OTN cross-connect automation
- IP/MPLS configuration for new circuits
- Pre and post-service validation testing
- Automatic service documentation generation
Current State: Quarterly manual audits, configuration drift undetected
Target State: Continuous compliance monitoring, instant drift detection
Key Capabilities:
- Automated configuration backup (daily)
- Configuration drift detection against golden configs
- Security policy compliance checking
- Unauthorized change detection and alerting
- Comprehensive audit logging
Current State: Separate management systems per vendor, no unified view
Target State: Single pane of glass, vendor-agnostic automation
Key Capabilities:
- Unified API layer using NETCONF/YANG with OpenConfig models
- Vendor-agnostic data collection using NAPALM
- Standardized configuration templates across vendors
- Cross-vendor service provisioning workflows
- Normalized telemetry data collection
Complete Automation Workflow: From Intent to Deployment
Measurable Success Metrics
Throughout this guide, we'll build toward achieving these specific, measurable improvements:
| Metric | Current (Manual) | Target (Automated) | Improvement |
|---|---|---|---|
| Service Provisioning Time | 3-5 days | 1-2 hours | 95% reduction |
| Configuration Errors | ~15% error rate | <1% error rate | 93% improvement |
| Mean Time to Detect (MTTD) | 30-60 minutes | <5 minutes | 90% faster |
| Configuration Backup | Weekly manual | Daily automated | 7x frequency |
| Inventory Accuracy | ~70% (outdated) | 99% (real-time) | 29% improvement |
| Compliance Audit Time | 3-5 days quarterly | Real-time continuous | 100% time savings |
| Engineer Productivity | ~30% manual tasks | ~90% value-add work | 3x productivity |
Part 1: Setting Up Your Development Environment
Step-by-Step Environment Configuration
For Linux (Ubuntu/Debian):
# Update package list sudo apt update # Install Python 3.11 (recommended version) sudo apt install python3.11 python3.11-venv python3-pip # Verify installation python3 --version pip3 --version # Install development tools sudo apt install git vim curl wget
For macOS:
# Install Homebrew if not already installed /bin/bash -c "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/HEAD/install.sh)" # Install Python brew install python@3.11 # Install Git brew install git # Verify installation python3 --version pip3 --version
For Windows:
# Download Python from python.org # Select "Add Python to PATH" during installation # Open PowerShell or Command Prompt # Verify installation python --version pip --version # Install Git for Windows # Download from git-scm.com
Virtual environments isolate project dependencies and prevent conflicts between different automation projects.
# Create a directory for your automation project mkdir ~/network-automation cd ~/network-automation # Create virtual environment python3 -m venv venv # Activate virtual environment # On Linux/macOS: source venv/bin/activate # On Windows: .\venv\Scripts\activate # Your prompt should now show (venv) # Install wheel for better package installation pip install --upgrade pip wheel setuptools
With your virtual environment activated, install the core automation libraries:
# Core automation libraries paramiko==3.3.1 netmiko==4.3.0 napalm==4.1.0 nornir==3.4.1 nornir-netmiko==1.0.1 nornir-napalm==0.5.0 # Configuration templating jinja2==3.1.2 # NETCONF/YANG libraries ncclient==0.6.15 xmltodict==0.13.0 # Data manipulation pyyaml==6.0.1 pandas==2.1.3 openpyxl==3.1.2 # API interactions requests==2.31.0 # Network testing scapy==2.5.0 # Visualization matplotlib==3.8.2 # SNMP pysnmp==4.4.12 # CLI parsing textfsm==1.1.3 ttp==0.9.5 # Version control gitpython==3.1.40
Install all packages:
# Save the above content to requirements.txt
# Then install:
pip install -r requirements.txt
# Verify installations
python -c "import netmiko; print(f'Netmiko version: {netmiko.__version__}')"
python -c "import napalm; print('NAPALM installed successfully')"
python -c "import nornir; print(f'Nornir version: {nornir.__version__}')"
Visual Studio Code Setup (Recommended)
# Install VS Code from https://code.visualstudio.com/ # Essential Extensions to Install: # 1. Python (by Microsoft) # 2. Pylance (by Microsoft) # 3. GitLens # 4. YAML # 5. Jinja # 6. Better Comments # 7. Code Spell Checker # Install via command line: code --install-extension ms-python.python code --install-extension ms-python.vscode-pylance code --install-extension eamodio.gitlens code --install-extension redhat.vscode-yaml code --install-extension wholroyd.jinja
PyCharm Setup (Alternative)
Download PyCharm Community Edition from jetbrains.com/pycharm. It includes built-in support for Python, Git, and debugging.
Organize your automation project with this recommended structure:
network-automation/ │ ├── venv/ # Virtual environment (don't commit to git) ├── inventory/ # Device inventory files │ ├── hosts.yaml │ └── groups.yaml │ ├── configs/ # Generated configurations │ └── backups/ │ ├── templates/ # Jinja2 templates │ ├── bgp_config.j2 │ ├── interface_config.j2 │ └── ospf_config.j2 │ ├── scripts/ # Automation scripts │ ├── backup_configs.py │ ├── deploy_configs.py │ └── health_check.py │ ├── tests/ # Test files │ ├── test_connectivity.py │ └── test_configurations.py │ ├── logs/ # Log files │ ├── docs/ # Documentation │ ├── .gitignore # Git ignore file ├── requirements.txt # Python dependencies ├── README.md # Project documentation └── config.yaml # Global configuration
Create this structure:
#!/bin/bash # create_project_structure.sh mkdir -p inventory configs/backups templates scripts tests logs docs # Create .gitignore cat > .gitignore << 'EOF' venv/ __pycache__/ *.pyc *.pyo *.log configs/backups/* .env *.swp .DS_Store .vscode/ .idea/ EOF # Create README cat > README.md << 'EOF' # Network Automation Project ## Setup 1. Create virtual environment: `python3 -m venv venv` 2. Activate: `source venv/bin/activate` 3. Install dependencies: `pip install -r requirements.txt` ## Usage See docs/ for detailed instructions. EOF echo "Project structure created successfully!"
Part 2: Python Fundamentals for Network Engineers
Essential Python Concepts with Network Examples
Variables and Data Types
Understanding data types is crucial for handling network information effectively:
# String - for device names, IP addresses
device_name = "ROADM-01"
management_ip = "192.168.1.100"
# Integer - for port numbers, VLAN IDs
ssh_port = 22
vlan_id = 100
# Float - for optical power levels, OSNR values
optical_power = -3.5 # dBm
osnr_value = 22.8 # dB
# Boolean - for status checks
port_status = True
is_reachable = False
# List - for multiple similar items (ordered, mutable)
interfaces = ['GigabitEthernet0/1', 'GigabitEthernet0/2', 'GigabitEthernet0/3']
wavelengths = [1550.12, 1550.92, 1551.72, 1552.52] # nm
# Dictionary - for structured device information (key-value pairs)
device = {
'hostname': 'ROADM-01',
'ip': '192.168.1.100',
'vendor': 'Generic',
'model': 'CDC-ROADM',
'location': 'DC1-Floor2-Rack15'
}
# Tuple - for immutable data (ordered, cannot be changed)
coordinates = (37.7749, -122.4194) # latitude, longitude
# Print examples
print(f"Device: {device_name}")
print(f"Optical Power: {optical_power} dBm")
print(f"Device Info: {device}")
print(f"First interface: {interfaces[0]}")
# Access dictionary values
print(f"Device hostname: {device['hostname']}")
print(f"Device location: {device['location']}")
Control Flow: Loops and Conditionals
# If-elif-else statements for decision making
def check_optical_power(power_dbm):
"""Check if optical power level is within acceptable range"""
if power_dbm > 5:
return "WARNING: Power too high! Risk of receiver damage"
elif power_dbm >= -10:
return "OK: Power within normal range"
elif power_dbm >= -20:
return "CAUTION: Power low, monitor link"
else:
return "CRITICAL: Power too low, link may fail"
# Test the function
test_powers = [8, -3, -15, -25]
for power in test_powers:
status = check_optical_power(power)
print(f"Power: {power} dBm -> {status}")
print("\n" + "="*50 + "\n")
# For loops - iterate through devices
devices = [
{'name': 'ROADM-01', 'ip': '192.168.1.100'},
{'name': 'ROADM-02', 'ip': '192.168.1.101'},
{'name': 'ROADM-03', 'ip': '192.168.1.102'}
]
print("Checking device reachability:")
for device in devices:
# In real scenario, you'd use ping or socket check
print(f"Connecting to {device['name']} ({device['ip']})...")
print("\n" + "="*50 + "\n")
# While loops - useful for polling
def wait_for_interface_up(max_attempts=5):
"""Simulate waiting for interface to come up"""
attempt = 0
interface_status = "down"
while attempt < max_attempts and interface_status != "up":
attempt += 1
print(f"Attempt {attempt}: Checking interface status...")
# Simulate status check
if attempt >= 3: # Interface comes up on 3rd attempt
interface_status = "up"
return interface_status == "up"
# Test the function
success = wait_for_interface_up()
print(f"Interface is {'UP' if success else 'DOWN'}")
print("\n" + "="*50 + "\n")
# List comprehension - powerful one-liner for creating lists
# Filter wavelengths in C-band (1530-1565 nm)
all_wavelengths = [1520.5, 1535.8, 1545.3, 1555.7, 1570.2, 1580.4]
c_band_wavelengths = [wl for wl in all_wavelengths if 1530 <= wl <= 1565]
print(f"C-band wavelengths: {c_band_wavelengths}")
# Create list of interface names
interface_list = [f"TenGigE0/0/{i}" for i in range(1, 9)]
print(f"Interfaces: {interface_list}")
Functions for Reusable Code
import math
def calculate_osnr(ptx, gtotal, ltotal, namp, nfavg):
"""
Calculate Optical Signal-to-Noise Ratio
Formula: OSNR = PTX + Gtotal - Ltotal - 10×log(Namp) - NFavg + 58
Parameters:
ptx (float): Transmit power in dBm
gtotal (float): Total gain in dB
ltotal (float): Total loss in dB
namp (int): Number of amplifiers
nfavg (float): Average noise figure in dB
Returns:
float: OSNR in dB
"""
osnr = ptx + gtotal - ltotal - 10 * math.log10(namp) - nfavg + 58
return round(osnr, 2)
def db_to_mw(power_dbm):
"""Convert power from dBm to milliwatts"""
return 10 ** (power_dbm / 10)
def mw_to_db(power_mw):
"""Convert power from milliwatts to dBm"""
return 10 * math.log10(power_mw)
def parse_interface_name(interface_full):
"""
Parse interface name into components
Example: 'TenGigE0/0/0/15' -> ('TenGigE', '0/0/0/15')
"""
parts = interface_full.split('0', 1)
if len(parts) == 2:
return parts[0].strip(), '0' + parts[1]
return interface_full, ''
def calculate_link_budget(tx_power, rx_sensitivity, margin=3):
"""
Calculate link budget for optical link
Returns:
dict: Link budget components
"""
available_budget = tx_power - rx_sensitivity
usable_budget = available_budget - margin
return {
'tx_power_dbm': tx_power,
'rx_sensitivity_dbm': rx_sensitivity,
'margin_db': margin,
'available_budget_db': available_budget,
'usable_budget_db': usable_budget,
'status': 'OK' if usable_budget > 0 else 'INSUFFICIENT'
}
# Example usage
if __name__ == "__main__":
# Calculate OSNR
osnr = calculate_osnr(
ptx=0, # 0 dBm launch power
gtotal=80, # 80 dB total gain
ltotal=82, # 82 dB total loss
namp=4, # 4 amplifiers
nfavg=5.5 # 5.5 dB average NF
)
print(f"Calculated OSNR: {osnr} dB")
# Power conversions
power_dbm = 10
power_mw = db_to_mw(power_dbm)
print(f"{power_dbm} dBm = {power_mw:.2f} mW")
# Link budget calculation
budget = calculate_link_budget(
tx_power=5, # +5 dBm
rx_sensitivity=-28, # -28 dBm
margin=3 # 3 dB margin
)
print(f"\nLink Budget Analysis:")
for key, value in budget.items():
print(f" {key}: {value}")
# Interface parsing
interface = "TenGigE0/0/0/15"
int_type, int_number = parse_interface_name(interface)
print(f"\nInterface: {interface}")
print(f" Type: {int_type}")
print(f" Number: {int_number}")
Key Python Concepts to Master
Data Types: Strings, integers, floats, booleans, lists, dictionaries, tuples
Control Flow: if/elif/else, for loops, while loops, break/continue
Functions: Defining functions, parameters, return values, docstrings
Error Handling: try/except blocks for robust code
File I/O: Reading and writing files (configs, logs, data)
Modules & Imports: Organizing code and using libraries
Working with Files
import os
import json
import yaml
import csv
from datetime import datetime
def read_device_inventory(filename='inventory/devices.yaml'):
"""Read device inventory from YAML file"""
try:
with open(filename, 'r') as file:
inventory = yaml.safe_load(file)
return inventory
except FileNotFoundError:
print(f"Error: {filename} not found")
return None
except yaml.YAMLError as e:
print(f"Error parsing YAML: {e}")
return None
def write_json_report(data, filename):
"""Write data to JSON file with timestamp"""
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
output_file = f"logs/{filename}_{timestamp}.json"
try:
with open(output_file, 'w') as file:
json.dump(data, file, indent=2)
print(f"Report saved to: {output_file}")
return output_file
except IOError as e:
print(f"Error writing file: {e}")
return None
def append_to_log(message, logfile='logs/automation.log'):
"""Append message to log file with timestamp"""
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
log_entry = f"[{timestamp}] {message}\n"
# Create logs directory if it doesn't exist
os.makedirs('logs', exist_ok=True)
with open(logfile, 'a') as file:
file.write(log_entry)
def export_to_csv(data_list, filename, headers):
"""Export list of dictionaries to CSV"""
try:
with open(filename, 'w', newline='') as csvfile:
writer = csv.DictWriter(csvfile, fieldnames=headers)
writer.writeheader()
writer.writerows(data_list)
print(f"Data exported to: {filename}")
except IOError as e:
print(f"Error writing CSV: {e}")
# Example usage
if __name__ == "__main__":
# Sample device inventory structure
sample_inventory = {
'optical_devices': [
{
'hostname': 'ROADM-01',
'ip': '192.168.1.100',
'location': 'DC1',
'type': 'roadm'
},
{
'hostname': 'OTN-SWITCH-01',
'ip': '192.168.1.101',
'location': 'DC1',
'type': 'otn_switch'
}
]
}
# Write sample inventory
os.makedirs('inventory', exist_ok=True)
with open('inventory/devices.yaml', 'w') as f:
yaml.dump(sample_inventory, f, default_flow_style=False)
# Read inventory
inventory = read_device_inventory()
if inventory:
print("Device Inventory:")
print(json.dumps(inventory, indent=2))
# Write JSON report
report_data = {
'report_type': 'health_check',
'timestamp': datetime.now().isoformat(),
'devices_checked': 2,
'status': 'completed'
}
write_json_report(report_data, 'health_check_report')
# Append to log
append_to_log("Automation script started")
append_to_log("Inventory loaded successfully")
# Export to CSV
device_status = [
{'device': 'ROADM-01', 'status': 'up', 'uptime_hours': 720},
{'device': 'OTN-SWITCH-01', 'status': 'up', 'uptime_hours': 168}
]
export_to_csv(device_status, 'logs/device_status.csv',
['device', 'status', 'uptime_hours'])
Error Handling
import sys
import logging
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('logs/automation.log'),
logging.StreamHandler(sys.stdout)
]
)
def connect_to_device(device_ip, timeout=10):
"""
Connect to network device with proper error handling
"""
try:
logging.info(f"Attempting to connect to {device_ip}")
# Simulated connection (replace with actual netmiko connection)
if not device_ip:
raise ValueError("Device IP cannot be empty")
# Simulate connection logic
logging.info(f"Successfully connected to {device_ip}")
return True
except ValueError as e:
logging.error(f"Invalid input: {e}")
return False
except ConnectionError as e:
logging.error(f"Connection failed to {device_ip}: {e}")
return False
except Exception as e:
logging.error(f"Unexpected error connecting to {device_ip}: {e}")
return False
def safe_divide(a, b):
"""Safely divide two numbers with error handling"""
try:
result = a / b
return result
except ZeroDivisionError:
logging.error("Cannot divide by zero")
return None
except TypeError:
logging.error("Invalid types for division")
return None
def validate_optical_power(power_dbm):
"""Validate optical power reading with specific exceptions"""
try:
# Convert to float if string
power = float(power_dbm)
# Check if value is reasonable
if power < -40:
raise ValueError(f"Power too low: {power} dBm (< -40 dBm)")
if power > 20:
raise ValueError(f"Power too high: {power} dBm (> +20 dBm)")
return power
except ValueError as e:
logging.warning(f"Invalid power value: {e}")
return None
except Exception as e:
logging.error(f"Unexpected error validating power: {e}")
return None
# Example usage with multiple exception handling
def process_device_list(device_file):
"""Process device list with comprehensive error handling"""
try:
with open(device_file, 'r') as f:
devices = f.readlines()
results = []
for device in devices:
device = device.strip()
if device: # Skip empty lines
try:
success = connect_to_device(device)
results.append({'device': device, 'status': success})
except Exception as e:
logging.error(f"Error processing {device}: {e}")
results.append({'device': device, 'status': False})
return results
except FileNotFoundError:
logging.error(f"Device file not found: {device_file}")
return []
except IOError as e:
logging.error(f"Error reading file {device_file}: {e}")
return []
finally:
logging.info("Device processing completed")
if __name__ == "__main__":
# Test error handling
logging.info("Starting error handling examples")
# Test connection
connect_to_device("192.168.1.100")
connect_to_device("") # Will raise ValueError
# Test division
result = safe_divide(10, 2)
logging.info(f"10 / 2 = {result}")
result = safe_divide(10, 0) # Will handle ZeroDivisionError
# Test power validation
validate_optical_power(-5.5) # Valid
validate_optical_power(-50) # Too low
validate_optical_power(25) # Too high
validate_optical_power("abc") # Invalid type
Part 3: Mastering Netmiko for Device Automation
Netmiko is the most widely used Python library for CLI-based device automation. It provides a consistent interface across multiple vendor platforms.
Understanding Netmiko Basics
What is Netmiko?
Netmiko is a multi-vendor SSH library built on top of Paramiko. It simplifies network device interaction by handling:
Automatic Device Type Detection: Identifies vendor/platform automatically
Prompt Handling: Deals with different CLI prompts and pagination
Enable Mode: Automatically enters privileged mode when needed
Configuration Mode: Handles configuration commands properly
Basic Netmiko Connection
from netmiko import ConnectHandler
import getpass
# Device connection parameters
device = {
'device_type': 'cisco_ios', # Or 'juniper_junos', 'arista_eos', etc.
'host': '192.168.1.100',
'username': 'admin',
'password': getpass.getpass('Enter password: '), # Secure password input
'port': 22,
'secret': '', # Enable secret if needed
'verbose': False,
}
try:
# Establish connection
print(f"Connecting to {device['host']}...")
connection = ConnectHandler(**device)
# Send a command and get output
output = connection.send_command('show version')
print("\n=== Show Version Output ===")
print(output)
# Get hostname
hostname = connection.find_prompt()
print(f"\nDevice Hostname: {hostname}")
# Disconnect
connection.disconnect()
print("\nDisconnected successfully")
except Exception as e:
print(f"Error: {e}")
Executing Multiple Commands
from netmiko import ConnectHandler
import logging
# Enable logging for debugging
logging.basicConfig(level=logging.DEBUG)
device = {
'device_type': 'cisco_ios',
'host': '192.168.1.100',
'username': 'admin',
'password': 'password',
}
# List of show commands to execute
show_commands = [
'show version',
'show interfaces status',
'show ip interface brief',
'show running-config',
'show inventory',
]
def collect_device_information(device_params, commands):
"""
Collect information from device using multiple show commands
Returns:
dict: Command outputs keyed by command
"""
results = {}
try:
# Connect to device
print(f"Connecting to {device_params['host']}...")
connection = ConnectHandler(**device_params)
# Execute each command
for cmd in commands:
print(f"Executing: {cmd}")
output = connection.send_command(cmd, read_timeout=30)
results[cmd] = output
# Disconnect
connection.disconnect()
print("Collection completed successfully")
except Exception as e:
print(f"Error during collection: {e}")
return None
return results
# Execute collection
if __name__ == "__main__":
outputs = collect_device_information(device, show_commands)
if outputs:
# Save outputs to files
import os
os.makedirs('collected_data', exist_ok=True)
for command, output in outputs.items():
# Create filename from command
filename = command.replace(' ', '_') + '.txt'
filepath = os.path.join('collected_data', filename)
with open(filepath, 'w') as f:
f.write(output)
print(f"Saved: {filepath}")
Configuration Management with Netmiko
from netmiko import ConnectHandler
from datetime import datetime
import os
def backup_configuration(device_params, backup_dir='configs/backups'):
"""
Backup device running configuration
Returns:
str: Path to backup file or None on error
"""
try:
# Create backup directory if it doesn't exist
os.makedirs(backup_dir, exist_ok=True)
# Connect to device
print(f"Connecting to {device_params['host']}...")
connection = ConnectHandler(**device_params)
# Get running config
print("Retrieving running configuration...")
config = connection.send_command('show running-config')
# Generate filename with timestamp
hostname = connection.find_prompt().strip('#>')
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"{hostname}_{timestamp}.cfg"
filepath = os.path.join(backup_dir, filename)
# Save configuration
with open(filepath, 'w') as f:
f.write(config)
# Disconnect
connection.disconnect()
print(f"Configuration backed up to: {filepath}")
return filepath
except Exception as e:
print(f"Error backing up configuration: {e}")
return None
def deploy_configuration(device_params, config_commands):
"""
Deploy configuration commands to device
Args:
device_params (dict): Device connection parameters
config_commands (list): List of configuration commands
Returns:
bool: True if successful, False otherwise
"""
try:
# Connect to device
print(f"Connecting to {device_params['host']}...")
connection = ConnectHandler(**device_params)
# Backup current config before making changes
print("Creating pre-change backup...")
backup_configuration(device_params)
# Enter configuration mode and send commands
print("Deploying configuration...")
output = connection.send_config_set(config_commands)
print(output)
# Save configuration
print("Saving configuration...")
save_output = connection.save_config()
print(save_output)
# Disconnect
connection.disconnect()
print("Configuration deployed successfully")
return True
except Exception as e:
print(f"Error deploying configuration: {e}")
return False
def configure_interface(device_params, interface, description, ip_address=None):
"""
Configure interface with description and optional IP address
"""
config_commands = [
f'interface {interface}',
f'description {description}',
]
if ip_address:
config_commands.extend([
f'ip address {ip_address["ip"]} {ip_address["mask"]}',
'no shutdown'
])
return deploy_configuration(device_params, config_commands)
# Example usage
if __name__ == "__main__":
device = {
'device_type': 'cisco_ios',
'host': '192.168.1.100',
'username': 'admin',
'password': 'password',
}
# Backup configuration
backup_configuration(device)
# Deploy sample configuration
sample_config = [
'interface Loopback100',
'description Management Loopback',
'ip address 10.255.255.1 255.255.255.255',
'no shutdown',
]
# Uncomment to actually deploy
# deploy_configuration(device, sample_config)
# Configure specific interface
# configure_interface(
# device,
# interface='GigabitEthernet0/1',
# description='Link to Core Switch',
# ip_address={'ip': '10.1.1.1', 'mask': '255.255.255.252'}
# )
- Always backup configurations before making changes
- Test commands in lab environment first
- Use connection timeouts to prevent hanging
- Implement try-except blocks for error handling
- Log all actions for audit trail
- Consider implementing change windows and approval workflows
Part 4: NAPALM for Vendor-Agnostic Automation
NAPALM (Network Automation and Programmability Abstraction Layer with Multivendor support) provides a unified API across different network vendors.
NAPALM Benefits Over Raw CLI
Why Use NAPALM?
Structured Data: Returns Python dictionaries instead of raw text
Vendor Agnostic: Same code works across Cisco, Juniper, Arista, etc.
Atomic Operations: Configuration changes with automatic rollback
Built-in Validation: Compare and verify configurations before applying
NAPALM Basic Operations
from napalm import get_network_driver
import json
# Device parameters
device_params = {
'hostname': '192.168.1.100',
'username': 'admin',
'password': 'password',
'optional_args': {'port': 22}
}
# Get the appropriate driver
driver = get_network_driver('ios') # or 'junos', 'eos', 'nxos'
device = driver(**device_params)
try:
# Open connection
print("Connecting to device...")
device.open()
# Get device facts
print("\n=== Device Facts ===")
facts = device.get_facts()
print(json.dumps(facts, indent=2))
# Get interface information
print("\n=== Interfaces ===")
interfaces = device.get_interfaces()
for interface, details in interfaces.items():
print(f"{interface}:")
print(f" Status: {'up' if details['is_up'] else 'down'}")
print(f" Speed: {details['speed']} Mbps")
print(f" MAC: {details['mac_address']}")
# Get interface IP addresses
print("\n=== Interface IP Addresses ===")
ips = device.get_interfaces_ip()
for interface, ip_data in ips.items():
if ip_data:
print(f"{interface}:")
for ip, details in ip_data.items():
print(f" {ip}: {details}")
# Get ARP table
print("\n=== ARP Table ===")
arp_table = device.get_arp_table()
for entry in arp_table[:5]: # Show first 5 entries
print(f"IP: {entry['ip']}, MAC: {entry['mac']}, Interface: {entry['interface']}")
# Get LLDP neighbors
print("\n=== LLDP Neighbors ===")
lldp = device.get_lldp_neighbors()
for local_port, neighbors in lldp.items():
for neighbor in neighbors:
print(f"{local_port} -> {neighbor['hostname']} ({neighbor['port']})")
finally:
# Always close connection
device.close()
print("\nConnection closed")
Configuration Management with NAPALM
from napalm import get_network_driver
from datetime import datetime
import difflib
def safe_config_deployment(device_params, config_file):
"""
Safely deploy configuration with automatic rollback capability
NAPALM provides atomic configuration deployment:
1. Load configuration into candidate config
2. Compare with running config
3. Review changes
4. Commit or discard
"""
driver = get_network_driver(device_params['driver'])
device = driver(
hostname=device_params['hostname'],
username=device_params['username'],
password=device_params['password']
)
try:
# Open connection
print(f"Connecting to {device_params['hostname']}...")
device.open()
# Load configuration (doesn't apply it yet)
print(f"Loading configuration from {config_file}...")
device.load_merge_candidate(filename=config_file)
# Get the diff between running and candidate config
print("\n=== Configuration Diff ===")
diff = device.compare_config()
if diff:
print(diff)
# Ask for confirmation
confirm = input("\nApply this configuration? (yes/no): ")
if confirm.lower() == 'yes':
print("Committing configuration...")
device.commit_config()
print("Configuration applied successfully")
else:
print("Discarding changes...")
device.discard_config()
print("Changes discarded")
else:
print("No configuration changes detected")
device.discard_config()
except Exception as e:
print(f"Error during configuration deployment: {e}")
print("Discarding changes and rolling back...")
device.discard_config()
finally:
device.close()
def rollback_configuration(device_params, rollback_steps=1):
"""
Rollback to previous configuration
Note: Rollback availability depends on platform
"""
driver = get_network_driver(device_params['driver'])
device = driver(
hostname=device_params['hostname'],
username=device_params['username'],
password=device_params['password']
)
try:
device.open()
print(f"Rolling back {rollback_steps} configuration(s)...")
device.rollback()
print("Rollback completed")
except Exception as e:
print(f"Error during rollback: {e}")
finally:
device.close()
def validate_configuration_syntax(config_file):
"""
Validate configuration file syntax before deployment
"""
try:
with open(config_file, 'r') as f:
config = f.read()
# Basic validation checks
errors = []
# Check for common syntax errors
if 'interface' in config and 'no shutdown' not in config:
errors.append("Warning: No 'no shutdown' command found")
if errors:
print("Validation warnings:")
for error in errors:
print(f" - {error}")
return False
print("Configuration syntax validation passed")
return True
except Exception as e:
print(f"Error validating configuration: {e}")
return False
# Example usage
if __name__ == "__main__":
device_params = {
'driver': 'ios',
'hostname': '192.168.1.100',
'username': 'admin',
'password': 'password'
}
# Create sample configuration file
sample_config = """
interface Loopback200
description NAPALM Test Loopback
ip address 10.200.200.1 255.255.255.255
no shutdown
"""
with open('sample_config.txt', 'w') as f:
f.write(sample_config)
# Validate and deploy
if validate_configuration_syntax('sample_config.txt'):
# Uncomment to actually deploy
# safe_config_deployment(device_params, 'sample_config.txt')
pass
Part 5: Configuration Templating with Jinja2
Jinja2 allows you to create dynamic configuration templates that adapt to different devices and scenarios, eliminating repetitive configuration work.
Jinja2 Template Basics
Simple Template Example
interface {{ interface_name }}
description {{ description }}
{% if ip_address %}
ip address {{ ip_address }} {{ subnet_mask }}
{% endif %}
{% if vlan_id %}
switchport mode access
switchport access vlan {{ vlan_id }}
{% endif %}
no shutdown
!
from jinja2 import Environment, FileSystemLoader
import os
# Set up Jinja2 environment
template_dir = 'templates'
env = Environment(loader=FileSystemLoader(template_dir))
# Load template
template = env.get_template('interface_config.j2')
# Data for rendering
interface_data = {
'interface_name': 'GigabitEthernet0/1',
'description': 'Link to Core Router',
'ip_address': '10.1.1.1',
'subnet_mask': '255.255.255.252',
}
# Render template
config = template.render(interface_data)
print(config)
Advanced Jinja2 Features
!
! Configuration for {{ hostname }}
! Generated: {{ generation_time }}
! Location: {{ location }}
!
hostname {{ hostname }}
!
{% if dns_servers %}
{% for dns in dns_servers %}
ip name-server {{ dns }}
{% endfor %}
{% endif %}
!
{% if ntp_servers %}
{% for ntp in ntp_servers %}
ntp server {{ ntp }}
{% endfor %}
{% endif %}
!
! Optical Interfaces Configuration
{% for port in optical_ports %}
interface {{ port.name }}
description {{ port.description }}
{% if port.wavelength %}
wavelength {{ port.wavelength }} nm
{% endif %}
{% if port.tx_power %}
optical-power transmit {{ port.tx_power }} dBm
{% endif %}
{% if port.fec_mode %}
fec-mode {{ port.fec_mode }}
{% endif %}
no shutdown
!
{% endfor %}
!
{% if bgp_config %}
router bgp {{ bgp_config.asn }}
bgp log-neighbor-changes
{% for neighbor in bgp_config.neighbors %}
neighbor {{ neighbor.ip }} remote-as {{ neighbor.remote_asn }}
neighbor {{ neighbor.ip }} description {{ neighbor.description }}
{% endfor %}
!
address-family ipv4
{% for neighbor in bgp_config.neighbors %}
neighbor {{ neighbor.ip }} activate
{% endfor %}
exit-address-family
{% endif %}
!
end
from jinja2 import Environment, FileSystemLoader
from datetime import datetime
import yaml
def generate_device_config(device_data, template_name):
"""
Generate device configuration from template and data
Args:
device_data (dict): Device-specific data
template_name (str): Name of Jinja2 template file
Returns:
str: Rendered configuration
"""
# Set up Jinja2 environment
env = Environment(
loader=FileSystemLoader('templates'),
trim_blocks=True,
lstrip_blocks=True
)
# Add generation timestamp
device_data['generation_time'] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
# Load and render template
template = env.get_template(template_name)
config = template.render(device_data)
return config
# Example device data
device_data = {
'hostname': 'ROADM-DC1-01',
'location': 'DataCenter-1 Floor-2 Rack-15',
'dns_servers': ['8.8.8.8', '8.8.4.4'],
'ntp_servers': ['0.pool.ntp.org', '1.pool.ntp.org'],
'optical_ports': [
{
'name': 'OpticalEthernet0/0/0/1',
'description': 'C-band wavelength to ROADM-DC2-01',
'wavelength': 1550.12,
'tx_power': 0,
'fec_mode': 'enhanced'
},
{
'name': 'OpticalEthernet0/0/0/2',
'description': 'C-band wavelength to ROADM-DC3-01',
'wavelength': 1550.92,
'tx_power': 0,
'fec_mode': 'enhanced'
}
],
'bgp_config': {
'asn': 65001,
'neighbors': [
{
'ip': '10.1.1.2',
'remote_asn': 65002,
'description': 'BGP peer to DC2'
},
{
'ip': '10.1.1.6',
'remote_asn': 65003,
'description': 'BGP peer to DC3'
}
]
}
}
# Generate configuration
if __name__ == "__main__":
config = generate_device_config(device_data, 'optical_device_config.j2')
print(config)
# Save to file
output_file = f"configs/{device_data['hostname']}_config.txt"
with open(output_file, 'w') as f:
f.write(config)
print(f"\nConfiguration saved to: {output_file}")
Jinja2 Filters and Tests
!
! Using Jinja2 Filters
!
hostname {{ hostname | upper }}
!
{% for interface in interfaces %}
! Interface {{ loop.index }} of {{ loop.length }}
interface {{ interface.name }}
description {{ interface.description | default('No description') }}
{% if interface.ip_address is defined %}
ip address {{ interface.ip_address }} {{ interface.mask }}
{% endif %}
{% if interface.bandwidth %}
bandwidth {{ interface.bandwidth | int }}
{% endif %}
!
{% endfor %}
!
! Conditional Logic
{% if optical_power < -20 %}
! WARNING: Optical power critically low!
{% elif optical_power < -10 %}
! CAUTION: Optical power below optimal range
{% else %}
! Optical power within acceptable range
{% endif %}
!
! Loop with Conditionals
{% for wavelength in wavelengths %}
{% if wavelength >= 1530 and wavelength <= 1565 %}
! Wavelength {{ wavelength }} nm is in C-band
{% endif %}
{% endfor %}
!
! Using Math Filters
! Total interfaces: {{ interfaces | length }}
! Average bandwidth: {{ (interfaces | sum(attribute='bandwidth')) / (interfaces | length) }}
!
from jinja2 import Environment, FileSystemLoader
# Custom filter function
def dbm_to_mw(power_dbm):
"""Convert dBm to milliwatts"""
return round(10 ** (power_dbm / 10), 2)
# Set up environment with custom filter
env = Environment(loader=FileSystemLoader('templates'))
env.filters['dbm_to_mw'] = dbm_to_mw
# Template with custom filter
template_str = """
Optical Power Readings:
{% for reading in power_readings %}
Port {{ reading.port }}: {{ reading.power_dbm }} dBm ({{ reading.power_dbm | dbm_to_mw }} mW)
{% endfor %}
"""
template = env.from_string(template_str)
# Data
data = {
'power_readings': [
{'port': 1, 'power_dbm': 0},
{'port': 2, 'power_dbm': -3},
{'port': 3, 'power_dbm': -10},
]
}
# Render
output = template.render(data)
print(output)
Part 7: Model-Driven Automation with NETCONF/YANG
NETCONF and YANG represent the future of network automation, providing structured, standardized interfaces for multi-vendor environments.
Understanding NETCONF and YANG
What are NETCONF and YANG?
YANG: Data modeling language that defines the structure of configuration and operational data
NETCONF: Network configuration protocol that provides operations to retrieve and edit configuration using YANG models
Benefits: Structured data (XML/JSON), transactional operations, candidate configuration, automatic rollback
NETCONF Basic Operations
from ncclient import manager
import xml.dom.minidom as minidom
import logging
# Enable logging for debugging
logging.basicConfig(level=logging.INFO)
def connect_netconf(host, username, password, port=830):
"""
Establish NETCONF connection
Returns:
manager: NETCONF manager object
"""
return manager.connect(
host=host,
port=port,
username=username,
password=password,
hostkey_verify=False,
device_params={'name': 'default'},
timeout=30
)
def get_capabilities(connection):
"""Get NETCONF capabilities supported by device"""
print("=== Device Capabilities ===")
for capability in connection.server_capabilities:
print(capability)
def get_running_config(connection, filter=None):
"""
Retrieve running configuration
Args:
connection: NETCONF manager object
filter: Optional XML filter to retrieve specific data
"""
if filter:
config = connection.get_config(source='running', filter=filter)
else:
config = connection.get_config(source='running')
# Pretty print XML
xml_str = minidom.parseString(str(config)).toprettyxml(indent=" ")
return xml_str
def get_interface_statistics(connection):
"""
Get interface statistics using NETCONF
Example for OpenConfig model
"""
filter_xml = """
"""
try:
result = connection.get(filter=filter_xml)
return result
except Exception as e:
print(f"Error retrieving interface statistics: {e}")
return None
# Example usage
if __name__ == "__main__":
device = {
'host': '192.168.1.100',
'username': 'admin',
'password': 'password',
'port': 830
}
try:
print("Connecting via NETCONF...")
conn = connect_netconf(**device)
# Get capabilities
get_capabilities(conn)
# Get running config (be careful, can be large!)
# config = get_running_config(conn)
# print(config)
print("\nNETCONF session established successfully")
except Exception as e:
print(f"Error: {e}")
finally:
if 'conn' in locals():
conn.close_session()
print("NETCONF session closed")
NETCONF Configuration Management
from ncclient import manager
from ncclient.operations import RPCError
import xml.etree.ElementTree as ET
def configure_interface_netconf(connection, interface_name, ip_address, mask):
"""
Configure interface using NETCONF
This example uses OpenConfig YANG models
"""
config_xml = f"""
{interface_name}
{interface_name}
ianaift:ethernetCsmacd
true
0
{ip_address}
{ip_address}
{mask}
"""
try:
# Load configuration into candidate
print("Loading configuration...")
conn.edit_config(target='candidate', config=config_xml)
# Validate configuration
print("Validating configuration...")
conn.validate(source='candidate')
# Commit configuration
print("Committing configuration...")
conn.commit()
print("Configuration applied successfully")
return True
except RPCError as e:
print(f"RPC Error: {e}")
# Discard changes on error
conn.discard_changes()
return False
except Exception as e:
print(f"Error: {e}")
return False
def configure_optical_channel(connection, channel_config):
"""
Configure optical channel parameters
Using OpenConfig terminal-device model
"""
config_xml = f"""
{channel_config['index']}
{channel_config['index']}
{channel_config['description']}
ENABLED
{channel_config['frequency']}
{channel_config['power']}
{channel_config['mode']}
"""
try:
conn.edit_config(target='candidate', config=config_xml)
conn.commit()
print(f"Optical channel {channel_config['index']} configured successfully")
return True
except Exception as e:
print(f"Error configuring optical channel: {e}")
conn.discard_changes()
return False
def get_optical_power_monitoring(connection):
"""
Retrieve optical power levels using NETCONF
"""
filter_xml = """
"""
try:
result = connection.get(filter=filter_xml)
return result.data_xml
except Exception as e:
print(f"Error retrieving optical power: {e}")
return None
# Example usage
if __name__ == "__main__":
device = {
'host': '192.168.1.100',
'username': 'admin',
'password': 'password',
'port': 830
}
try:
conn = manager.connect(
host=device['host'],
port=device['port'],
username=device['username'],
password=device['password'],
hostkey_verify=False,
device_params={'name': 'default'}
)
# Configure optical channel
channel_config = {
'index': 1,
'description': 'DWDM Channel to Remote Site',
'frequency': 193100000, # 193.1 THz (1550.12 nm)
'power': 0, # 0 dBm target output power
'mode': '100' # 100G operational mode
}
# Uncomment to actually configure
# configure_optical_channel(conn, channel_config)
# Get optical power monitoring
# power_data = get_optical_power_monitoring(conn)
# print(power_data)
except Exception as e:
print(f"Error: {e}")
finally:
if 'conn' in locals():
conn.close_session()
Part 9: Monitoring and Streaming Telemetry
SNMP-Based Monitoring
from pysnmp.hlapi import *
from datetime import datetime
import time
def snmp_get(device_ip, community, oid):
"""
Perform SNMP GET operation
Args:
device_ip (str): IP address of device
community (str): SNMP community string
oid (str): OID to query
Returns:
Value retrieved from device
"""
iterator = getCmd(
SnmpEngine(),
CommunityData(community),
UdpTransportTarget((device_ip, 161)),
ContextData(),
ObjectType(ObjectIdentity(oid))
)
errorIndication, errorStatus, errorIndex, varBinds = next(iterator)
if errorIndication:
print(f"Error: {errorIndication}")
return None
elif errorStatus:
print(f"Error: {errorStatus.prettyPrint()}")
return None
else:
for varBind in varBinds:
return varBind[1]
def monitor_optical_power_snmp(device_ip, community, port_index):
"""
Monitor optical power levels via SNMP
Common OIDs for optical monitoring:
- Optical TX power: .1.3.6.1.4.1.9.9.719.1.7.1.1.3 (Cisco)
- Optical RX power: .1.3.6.1.4.1.9.9.719.1.7.1.1.4 (Cisco)
"""
# Example OID (vendor-specific)
tx_power_oid = f'.1.3.6.1.4.1.9.9.719.1.7.1.1.3.{port_index}'
rx_power_oid = f'.1.3.6.1.4.1.9.9.719.1.7.1.1.4.{port_index}'
tx_power = snmp_get(device_ip, community, tx_power_oid)
rx_power = snmp_get(device_ip, community, rx_power_oid)
if tx_power and rx_power:
# Convert to dBm (values usually in tenths of dBm)
tx_power_dbm = int(tx_power) / 10.0
rx_power_dbm = int(rx_power) / 10.0
return {
'timestamp': datetime.now().isoformat(),
'port_index': port_index,
'tx_power_dbm': tx_power_dbm,
'rx_power_dbm': rx_power_dbm
}
return None
def continuous_monitoring(device_ip, community, port_indices, interval=60):
"""
Continuously monitor optical power levels
Args:
device_ip (str): Device IP address
community (str): SNMP community
port_indices (list): List of port indices to monitor
interval (int): Polling interval in seconds
"""
print(f"Starting continuous monitoring (interval: {interval}s)")
print("Press Ctrl+C to stop\n")
try:
while True:
print(f"=== Polling at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} ===")
for port_index in port_indices:
data = monitor_optical_power_snmp(device_ip, community, port_index)
if data:
print(f"Port {port_index}:")
print(f" TX Power: {data['tx_power_dbm']:.2f} dBm")
print(f" RX Power: {data['rx_power_dbm']:.2f} dBm")
# Check thresholds
if data['rx_power_dbm'] < -20:
print(f" WARNING: RX power below threshold!")
# Log to file
with open('logs/optical_power_monitor.log', 'a') as f:
f.write(f"{data['timestamp']},{port_index},"
f"{data['tx_power_dbm']},{data['rx_power_dbm']}\n")
print()
time.sleep(interval)
except KeyboardInterrupt:
print("\nMonitoring stopped")
def get_interface_status(device_ip, community):
"""
Get interface operational status via SNMP
ifOperStatus OID: 1.3.6.1.2.1.2.2.1.8
Values: 1=up, 2=down, 3=testing, 4=unknown, 5=dormant, 6=notPresent, 7=lowerLayerDown
"""
# Walk the interface table
interfaces = {}
for (errorIndication,
errorStatus,
errorIndex,
varBinds) in nextCmd(SnmpEngine(),
CommunityData(community),
UdpTransportTarget((device_ip, 161)),
ContextData(),
ObjectType(ObjectIdentity('1.3.6.1.2.1.2.2.1.8')),
lexicographicMode=False):
if errorIndication:
print(f"Error: {errorIndication}")
break
elif errorStatus:
print(f"Error: {errorStatus.prettyPrint()}")
break
else:
for varBind in varBinds:
oid_str = str(varBind[0])
index = oid_str.split('.')[-1]
status_code = int(varBind[1])
status_map = {
1: 'up',
2: 'down',
3: 'testing',
4: 'unknown',
5: 'dormant',
6: 'notPresent',
7: 'lowerLayerDown'
}
interfaces[index] = status_map.get(status_code, 'unknown')
return interfaces
# Example usage
if __name__ == "__main__":
device = '192.168.1.100'
community = 'public'
# Get interface status
print("=== Interface Status ===")
interfaces = get_interface_status(device, community)
for index, status in interfaces.items():
print(f"Interface {index}: {status}")
# Monitor optical power for specific ports
# Uncomment to run continuous monitoring
# continuous_monitoring(device, community, port_indices=[1, 2, 3], interval=60)
Streaming Telemetry with gNMI
Why Streaming Telemetry?
Real-Time Data: Continuous streaming vs periodic polling
Higher Frequency: Sub-second updates for critical metrics
Lower Overhead: More efficient than SNMP polling
Structured Data: JSON-encoded, schema-defined data
"""
gNMI Streaming Telemetry Example
Note: This requires additional libraries:
pip install grpcio grpcio-tools protobuf
"""
# Conceptual example - actual implementation requires gNMI proto files
def gnmi_subscribe_optical_power(device_ip, port, path):
"""
Subscribe to optical power telemetry stream
Args:
device_ip: Device IP address
port: gNMI port (typically 57400)
path: YANG path to subscribe to
"""
# This is a simplified conceptual example
# Full implementation requires gNMI stub and proto compilation
subscription_path = {
'prefix': {
'origin': 'openconfig',
'target': device_ip
},
'subscription': [
{
'path': {
'elem': [
{'name': 'interfaces'},
{'name': 'interface', 'key': {'name': 'Ethernet1/1'}},
{'name': 'state'},
{'name': 'optical-channel'},
{'name': 'output-power'}
]
},
'mode': 'ON_CHANGE', # or 'SAMPLE' with sample_interval
'sample_interval': 10000000000 # 10 seconds in nanoseconds
}
],
'mode': 'STREAM',
'encoding': 'JSON_IETF'
}
# Connect and subscribe (pseudo-code)
# for update in subscribe_stream(subscription_path):
# process_telemetry_update(update)
print("gNMI telemetry subscription configured")
print(f"Path: {path}")
print(f"Mode: ON_CHANGE")
# For actual implementation, use libraries like:
# - cisco-gnmi (Cisco devices)
# - pygnmi (generic gNMI client)
print("gNMI telemetry requires additional setup and proto compilation")
print("Refer to vendor documentation for specific implementation")
Part 12: Production Deployment Best Practices
Security Best Practices
import os
from getpass import getpass
from cryptography.fernet import Fernet
import keyring
import json
class SecureCredentialManager:
"""
Secure credential management for automation scripts
"""
def __init__(self, service_name='network_automation'):
self.service_name = service_name
def store_credential(self, username, password):
"""
Store credential securely using system keyring
"""
keyring.set_password(self.service_name, username, password)
print(f"Credential stored securely for {username}")
def get_credential(self, username):
"""
Retrieve credential from system keyring
"""
password = keyring.get_password(self.service_name, username)
if password:
return {'username': username, 'password': password}
return None
def delete_credential(self, username):
"""
Delete stored credential
"""
try:
keyring.delete_password(self.service_name, username)
print(f"Credential deleted for {username}")
except keyring.errors.PasswordDeleteError:
print(f"No credential found for {username}")
def use_environment_variables():
"""
Use environment variables for credentials
Recommended for CI/CD pipelines
"""
credentials = {
'username': os.getenv('NETWORK_USERNAME'),
'password': os.getenv('NETWORK_PASSWORD'),
'enable_secret': os.getenv('NETWORK_ENABLE_SECRET')
}
# Validate that all required variables are set
missing = [k for k, v in credentials.items() if v is None]
if missing:
raise EnvironmentError(f"Missing environment variables: {', '.join(missing)}")
return credentials
def encrypt_config_file(config_data, key_file='encryption.key'):
"""
Encrypt sensitive configuration data
"""
# Generate or load encryption key
if os.path.exists(key_file):
with open(key_file, 'rb') as f:
key = f.read()
else:
key = Fernet.generate_key()
with open(key_file, 'wb') as f:
f.write(key)
os.chmod(key_file, 0o600) # Restrict permissions
# Encrypt data
fernet = Fernet(key)
encrypted_data = fernet.encrypt(json.dumps(config_data).encode())
return encrypted_data
def decrypt_config_file(encrypted_data, key_file='encryption.key'):
"""
Decrypt sensitive configuration data
"""
with open(key_file, 'rb') as f:
key = f.read()
fernet = Fernet(key)
decrypted_data = fernet.decrypt(encrypted_data)
return json.loads(decrypted_data.decode())
# Example usage
if __name__ == "__main__":
# Method 1: System keyring
cred_manager = SecureCredentialManager()
# Store credential (do this once)
# username = input("Username: ")
# password = getpass("Password: ")
# cred_manager.store_credential(username, password)
# Retrieve credential
# creds = cred_manager.get_credential('admin')
# print(f"Retrieved credentials for: {creds['username']}")
# Method 2: Environment variables (recommended for production)
# Set environment variables first:
# export NETWORK_USERNAME='admin'
# export NETWORK_PASSWORD='secure_password'
# creds = use_environment_variables()
# Method 3: Encrypted configuration file
config = {
'devices': {
'router1': {'username': 'admin', 'password': 'password123'},
'router2': {'username': 'admin', 'password': 'password456'}
}
}
# Encrypt
encrypted = encrypt_config_file(config)
with open('config.encrypted', 'wb') as f:
f.write(encrypted)
# Decrypt
with open('config.encrypted', 'rb') as f:
encrypted_data = f.read()
decrypted = decrypt_config_file(encrypted_data)
print("Decrypted config:", decrypted)
Logging and Audit Trail
import logging
import logging.handlers
from datetime import datetime
import json
import sys
class AutomationLogger:
"""
Comprehensive logging for network automation
"""
def __init__(self, log_name='network_automation'):
self.logger = logging.getLogger(log_name)
self.logger.setLevel(logging.DEBUG)
# Create formatters
detailed_formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s'
)
simple_formatter = logging.Formatter(
'%(asctime)s - %(levelname)s - %(message)s'
)
# Console handler (INFO and above)
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setLevel(logging.INFO)
console_handler.setFormatter(simple_formatter)
self.logger.addHandler(console_handler)
# File handler (DEBUG and above)
file_handler = logging.handlers.RotatingFileHandler(
'logs/automation_detailed.log',
maxBytes=10*1024*1024, # 10MB
backupCount=5
)
file_handler.setLevel(logging.DEBUG)
file_handler.setFormatter(detailed_formatter)
self.logger.addHandler(file_handler)
# Audit log handler (INFO and above)
audit_handler = logging.FileHandler('logs/audit_trail.log')
audit_handler.setLevel(logging.INFO)
audit_handler.setFormatter(detailed_formatter)
self.logger.addHandler(audit_handler)
def log_device_action(self, device, action, status, details=None):
"""
Log device-specific action with structured data
"""
log_entry = {
'timestamp': datetime.now().isoformat(),
'device': device,
'action': action,
'status': status,
'details': details or {}
}
if status == 'success':
self.logger.info(f"Device Action: {json.dumps(log_entry)}")
elif status == 'failed':
self.logger.error(f"Device Action Failed: {json.dumps(log_entry)}")
else:
self.logger.warning(f"Device Action Warning: {json.dumps(log_entry)}")
def log_config_change(self, device, change_type, config_before, config_after):
"""
Log configuration changes with before/after states
"""
change_log = {
'timestamp': datetime.now().isoformat(),
'device': device,
'change_type': change_type,
'config_before': config_before,
'config_after': config_after
}
# Save to dedicated config change log
with open('logs/config_changes.jsonl', 'a') as f:
f.write(json.dumps(change_log) + '\n')
self.logger.info(f"Configuration changed on {device}: {change_type}")
# Example usage
if __name__ == "__main__":
logger = AutomationLogger()
# Log device actions
logger.log_device_action(
device='192.168.1.100',
action='backup_configuration',
status='success',
details={'backup_file': '/backups/router1_20250110.cfg'}
)
logger.log_device_action(
device='192.168.1.101',
action='deploy_configuration',
status='failed',
details={'error': 'Connection timeout after 30 seconds'}
)
# Log configuration changes
logger.log_config_change(
device='192.168.1.100',
change_type='interface_configuration',
config_before='interface GigabitEthernet0/1\n shutdown',
config_after='interface GigabitEthernet0/1\n no shutdown\n description Uplink'
)
Error Handling and Rollback
import traceback
from datetime import datetime
import sys
class AutomationError(Exception):
"""Base exception for automation errors"""
pass
class DeviceConnectionError(AutomationError):
"""Device connection failed"""
pass
class ConfigurationError(AutomationError):
"""Configuration deployment failed"""
pass
class ValidationError(AutomationError):
"""Pre/post validation failed"""
pass
def safe_automation_wrapper(func):
"""
Decorator for safe automation execution with rollback
"""
def wrapper(*args, **kwargs):
start_time = datetime.now()
try:
print(f"Starting: {func.__name__}")
result = func(*args, **kwargs)
duration = (datetime.now() - start_time).total_seconds()
print(f"Completed successfully in {duration:.2f} seconds")
return result
except DeviceConnectionError as e:
print(f"Connection Error: {e}")
print("Action: Check device reachability and credentials")
return None
except ConfigurationError as e:
print(f"Configuration Error: {e}")
print("Action: Rolling back changes...")
# Implement rollback logic here
return None
except ValidationError as e:
print(f"Validation Error: {e}")
print("Action: Discarding changes")
return None
except Exception as e:
print(f"Unexpected Error: {e}")
print("Stack trace:")
traceback.print_exc()
return None
return wrapper
@safe_automation_wrapper
def deploy_configuration_safely(device, config):
"""
Deploy configuration with comprehensive error handling
"""
# Pre-deployment validation
print("Step 1: Pre-deployment validation")
if not validate_configuration(config):
raise ValidationError("Configuration validation failed")
# Backup current configuration
print("Step 2: Backing up current configuration")
backup = backup_configuration(device)
if not backup:
raise ConfigurationError("Backup failed, aborting deployment")
# Deploy configuration
print("Step 3: Deploying configuration")
success = deploy_config(device, config)
if not success:
print("Deployment failed, restoring backup...")
restore_configuration(device, backup)
raise ConfigurationError("Configuration deployment failed")
# Post-deployment validation
print("Step 4: Post-deployment validation")
if not post_deployment_validation(device):
print("Post-deployment validation failed, rolling back...")
restore_configuration(device, backup)
raise ValidationError("Post-deployment validation failed")
print("Configuration deployed successfully")
return True
def validate_configuration(config):
"""Validate configuration before deployment"""
# Implement validation logic
return True
def backup_configuration(device):
"""Backup current configuration"""
# Implement backup logic
return "backup_data"
def deploy_config(device, config):
"""Deploy configuration to device"""
# Implement deployment logic
return True
def restore_configuration(device, backup):
"""Restore configuration from backup"""
# Implement restore logic
print(f"Configuration restored from backup")
return True
def post_deployment_validation(device):
"""Validate device after configuration"""
# Implement validation logic
return True
# Example usage
if __name__ == "__main__":
device = {'host': '192.168.1.100', 'username': 'admin', 'password': 'password'}
config = ['interface GigabitEthernet0/1', 'no shutdown']
result = deploy_configuration_safely(device, config)
All the Best with your Journey Ahead!!!
This comprehensive technical guide has covered the essential practical skills for optical network automation, from basic Python programming to advanced NETCONF/YANG implementations. The key to success is consistent practice and gradual skill building.
Additional Resources
- Documentation: Official docs for Netmiko, NAPALM, Nornir, ncclient
- GitHub: Study open-source network automation projects
- Community: Join Network to Code Slack, Reddit r/networking
- Practice: Use GNS3/EVE-NG for lab environments
- Certifications: Cisco DevNet, Red Hat Ansible, vendor-specific certs
- MapYourTech Book: Automation for Network Engineers Using Python and Jinja2 - Comprehensive guide covering Python, Jinja2, and network automation fundamentals
Important: All code examples in this guide are provided for educational purposes. Always test automation scripts in a lab environment before deploying to production networks. Follow your organization's change management procedures and security policies.
Developed by MapYourTech Team
Complete Technical Implementation Guide for Optical Network Automation
Unlock Premium Content
Join over 400K+ optical network professionals worldwide. Access premium courses, advanced engineering tools, and exclusive industry insights.
Already have an account? Log in here