Multi-Cloud Strategy: Avoiding Vendor Lock-in with Kubernetes
Jules Musoko
Principal Consultant
Vendor lock-in is one of the biggest concerns I hear from enterprise clients when planning their cloud strategy. After helping more than 50 organizations implement multi-cloud architectures, I've learned that the key isn't avoiding cloud-specific services entirely – it's making strategic choices that preserve your flexibility while leveraging each cloud's strengths.
This comprehensive guide shows you how to build a robust multi-cloud strategy using Kubernetes as your abstraction layer, enabling you to deploy across AWS, Azure, and Google Cloud Platform while maintaining operational consistency and avoiding catastrophic vendor lock-in.
The Multi-Cloud Reality Check
Before diving into implementation, let's address the elephant in the room: pure cloud-agnostic architecture is often suboptimal. The most value in cloud computing comes from leveraging cloud-native services, not just running generic workloads on virtual machines.
The goal isn't to build the same thing everywhere – it's to architect for strategic flexibility while optimizing for each cloud's strengths.
Multi-Cloud Business Drivers
Organizations pursue multi-cloud strategies for several reasons:
- Risk Mitigation: Avoiding single points of failure and vendor dependencies
- Regulatory Compliance: Data residency and sovereignty requirements
- Cost Optimization: Leveraging competitive pricing and avoiding egress costs
- Performance: Using edge locations and regional availability
- Best-of-Breed Services: Choosing optimal services from each provider
- Negotiating Power: Maintaining leverage in vendor relationships
Kubernetes as the Multi-Cloud Foundation
Kubernetes provides a practical abstraction layer for multi-cloud deployments. Here's our proven architecture pattern:
# multi-cloud-architecture.yaml - Kubernetes abstraction layer
apiVersion: v1
kind: ConfigMap
metadata:
name: cloud-config
namespace: kube-system
data:
primary-cloud: "aws"
regions: |
aws:
- us-east-1
- eu-west-1
- ap-southeast-1
azure:
- eastus
- westeurope
- southeastasia
gcp:
- us-central1
- europe-west1
- asia-southeast1
deployment-strategy: "primary-secondary"
failover-enabled: "true"
---
# Cloud-agnostic storage class
apiVersion: storage.k8s.io/v1
kind: StorageClass
metadata:
name: fast-ssd
annotations:
storageclass.kubernetes.io/is-default-class: "false"
provisioner: ${CLOUD_PROVISIONER} # AWS: ebs.csi.aws.com, Azure: disk.csi.azure.com, GCP: pd.csi.storage.gke.io
parameters:
type: ${DISK_TYPE} # AWS: gp3, Azure: Premium_LRS, GCP: pd-ssd
fsType: ext4
reclaimPolicy: Delete
allowVolumeExpansion: true
volumeBindingMode: WaitForFirstConsumer
---
# Multi-cloud ingress with provider abstraction
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
name: app-ingress
annotations:
# Cloud-agnostic annotations
kubernetes.io/ingress.class: "nginx"
cert-manager.io/cluster-issuer: "letsencrypt-prod"
# Cloud-specific optimization (applied conditionally)
nginx.ingress.kubernetes.io/ssl-redirect: "true"
nginx.ingress.kubernetes.io/force-ssl-redirect: "true"
spec:
tls:
- hosts:
- api.company.com
secretName: api-tls
rules:
- host: api.company.com
http:
paths:
- path: /
pathType: Prefix
backend:
service:
name: api-service
port:
number: 80
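The ${CLOUD_PROVISIONER} and ${DISK_TYPE} placeholders above are filled in at deploy time. As a minimal sketch of that substitution step (the mapping table simply restates the values from the comments in the StorageClass manifest; the template path is hypothetical), rendering could be done like this:
#!/usr/bin/env python3
# render-storageclass.py - fill cloud-specific placeholders in the StorageClass template (illustrative sketch)
from string import Template

# Values taken from the provisioner/type comments in the manifest above
CLOUD_SETTINGS = {
    "aws":   {"CLOUD_PROVISIONER": "ebs.csi.aws.com",       "DISK_TYPE": "gp3"},
    "azure": {"CLOUD_PROVISIONER": "disk.csi.azure.com",    "DISK_TYPE": "Premium_LRS"},
    "gcp":   {"CLOUD_PROVISIONER": "pd.csi.storage.gke.io", "DISK_TYPE": "pd-ssd"},
}

def render_storage_class(template_path: str, cloud: str) -> str:
    """Substitute ${CLOUD_PROVISIONER} / ${DISK_TYPE} for the chosen cloud."""
    with open(template_path) as f:
        template = Template(f.read())
    return template.safe_substitute(CLOUD_SETTINGS[cloud])

if __name__ == "__main__":
    # Example: print the AWS variant; pipe into `kubectl apply -f -` if desired
    print(render_storage_class("k8s/storageclass.template.yaml", "aws"))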
Cloud Provider Abstraction Layer
Create a robust abstraction layer that handles cloud-specific differences:
#!/usr/bin/env python3
# cloud-abstraction.py - Multi-cloud provider abstraction
import os
import json
import boto3
from azure.identity import DefaultAzureCredential
from azure.mgmt.compute import ComputeManagementClient
from google.cloud import compute_v1
from typing import Any, Dict, List, Optional
from dataclasses import dataclass
from abc import ABC, abstractmethod
@dataclass
class CloudResource:
"""Generic cloud resource representation"""
id: str
name: str
type: str
region: str
status: str
metadata: Dict[str, Any]
@dataclass
class ComputeInstance:
"""Generic compute instance representation"""
id: str
name: str
instance_type: str
region: str
status: str
public_ip: Optional[str]
private_ip: str
tags: Dict[str, str]
class CloudProvider(ABC):
"""Abstract base class for cloud providers"""
@abstractmethod
def list_instances(self, region: str = None) -> List[ComputeInstance]:
"""List compute instances"""
pass
@abstractmethod
def create_instance(self, config: Dict) -> ComputeInstance:
"""Create a new compute instance"""
pass
@abstractmethod
def get_available_regions(self) -> List[str]:
"""Get list of available regions"""
pass
@abstractmethod
def get_kubernetes_config(self, cluster_name: str, region: str) -> Dict:
"""Get Kubernetes cluster configuration"""
pass
class AWSProvider(CloudProvider):
"""AWS implementation of cloud provider abstraction"""
def __init__(self, profile: str = None):
self.session = boto3.Session(profile_name=profile)
self.ec2 = self.session.client('ec2')
self.eks = self.session.client('eks')
def list_instances(self, region: str = None) -> List[ComputeInstance]:
"""List EC2 instances"""
if region:
ec2_client = self.session.client('ec2', region_name=region)
else:
ec2_client = self.ec2
response = ec2_client.describe_instances()
instances = []
for reservation in response['Reservations']:
for instance in reservation['Instances']:
# Extract tags
tags = {tag['Key']: tag['Value'] for tag in instance.get('Tags', [])}
instances.append(ComputeInstance(
id=instance['InstanceId'],
name=tags.get('Name', instance['InstanceId']),
instance_type=instance['InstanceType'],
region=instance['Placement']['AvailabilityZone'][:-1],
status=instance['State']['Name'],
public_ip=instance.get('PublicIpAddress'),
private_ip=instance.get('PrivateIpAddress', ''),
tags=tags
))
return instances
def create_instance(self, config: Dict) -> ComputeInstance:
"""Create EC2 instance"""
response = self.ec2.run_instances(
ImageId=config['image_id'],
MinCount=1,
MaxCount=1,
InstanceType=config['instance_type'],
KeyName=config.get('key_name'),
SecurityGroupIds=config.get('security_groups', []),
SubnetId=config.get('subnet_id'),
TagSpecifications=[{
'ResourceType': 'instance',
'Tags': [{'Key': k, 'Value': v} for k, v in config.get('tags', {}).items()]
}]
)
instance = response['Instances'][0]
return ComputeInstance(
id=instance['InstanceId'],
name=config.get('tags', {}).get('Name', instance['InstanceId']),
instance_type=instance['InstanceType'],
region=instance['Placement']['AvailabilityZone'][:-1],
status=instance['State']['Name'],
public_ip=None, # Will be assigned later
private_ip=instance.get('PrivateIpAddress', ''),
tags=config.get('tags', {})
)
def get_available_regions(self) -> List[str]:
"""Get AWS regions"""
response = self.ec2.describe_regions()
return [region['RegionName'] for region in response['Regions']]
def get_kubernetes_config(self, cluster_name: str, region: str) -> Dict:
"""Get EKS cluster configuration"""
eks_client = self.session.client('eks', region_name=region)
cluster_info = eks_client.describe_cluster(name=cluster_name)
cluster = cluster_info['cluster']
return {
'cluster_name': cluster_name,
'endpoint': cluster['endpoint'],
'ca_data': cluster['certificateAuthority']['data'],
'region': region,
'cloud_provider': 'aws',
'node_groups': self._get_eks_node_groups(cluster_name, region),
'addons': self._get_eks_addons(cluster_name, region)
}
def _get_eks_node_groups(self, cluster_name: str, region: str) -> List[Dict]:
"""Get EKS node groups"""
eks_client = self.session.client('eks', region_name=region)
node_groups = eks_client.list_nodegroups(clusterName=cluster_name)
node_group_details = []
for ng_name in node_groups['nodegroups']:
ng_info = eks_client.describe_nodegroup(
clusterName=cluster_name,
nodegroupName=ng_name
)
node_group_details.append({
'name': ng_name,
'instance_types': ng_info['nodegroup']['instanceTypes'],
'scaling_config': ng_info['nodegroup']['scalingConfig'],
'status': ng_info['nodegroup']['status']
})
return node_group_details
def _get_eks_addons(self, cluster_name: str, region: str) -> List[Dict]:
"""Get EKS addons"""
eks_client = self.session.client('eks', region_name=region)
try:
addons = eks_client.list_addons(clusterName=cluster_name)
addon_details = []
for addon_name in addons['addons']:
addon_info = eks_client.describe_addon(
clusterName=cluster_name,
addonName=addon_name
)
addon_details.append({
'name': addon_name,
'version': addon_info['addon']['addonVersion'],
'status': addon_info['addon']['status']
})
return addon_details
except Exception:
return []
class AzureProvider(CloudProvider):
"""Azure implementation of cloud provider abstraction"""
def __init__(self, subscription_id: str = None):
self.credential = DefaultAzureCredential()
self.subscription_id = subscription_id or os.getenv('AZURE_SUBSCRIPTION_ID')
self.compute_client = ComputeManagementClient(
self.credential,
self.subscription_id
)
def list_instances(self, region: str = None) -> List[ComputeInstance]:
"""List Azure VMs"""
instances = []
for vm in self.compute_client.virtual_machines.list_all():
# Filter by region if specified
if region and vm.location != region:
continue
instances.append(ComputeInstance(
id=vm.vm_id or vm.name,
name=vm.name,
instance_type=vm.hardware_profile.vm_size,
region=vm.location,
status=self._get_vm_status(vm.name, vm.id.split('/')[4]), # Resource group
public_ip=self._get_vm_public_ip(vm.name, vm.id.split('/')[4]),
private_ip=self._get_vm_private_ip(vm.name, vm.id.split('/')[4]),
tags=vm.tags or {}
))
return instances
def create_instance(self, config: Dict) -> ComputeInstance:
"""Create Azure VM"""
# Implementation would go here
# This is a simplified version
raise NotImplementedError("Azure VM creation not implemented in this example")
def get_available_regions(self) -> List[str]:
"""Get Azure regions"""
from azure.mgmt.resource import SubscriptionClient
subscription_client = SubscriptionClient(self.credential)
locations = subscription_client.subscriptions.list_locations(self.subscription_id)
return [location.name for location in locations]
def get_kubernetes_config(self, cluster_name: str, region: str) -> Dict:
"""Get AKS cluster configuration"""
from azure.mgmt.containerservice import ContainerServiceClient
container_client = ContainerServiceClient(
self.credential,
self.subscription_id
)
# Find the cluster (simplified - in reality you'd specify resource group)
for resource_group in self._get_resource_groups():
try:
cluster = container_client.managed_clusters.get(
resource_group.name,
cluster_name
)
return {
'cluster_name': cluster_name,
'endpoint': cluster.fqdn,
'region': region,
'cloud_provider': 'azure',
'node_pools': [
{
'name': pool.name,
'vm_size': pool.vm_size,
'count': pool.count,
'status': pool.provisioning_state
}
for pool in cluster.agent_pool_profiles or []
]
}
except Exception:
continue
raise ValueError(f"Cluster {cluster_name} not found")
def _get_vm_status(self, vm_name: str, resource_group: str) -> str:
"""Get VM power state"""
try:
vm = self.compute_client.virtual_machines.get(
resource_group,
vm_name,
expand='instanceView'
)
for status in vm.instance_view.statuses:
if status.code.startswith('PowerState/'):
return status.code.replace('PowerState/', '')
return 'unknown'
except Exception:
return 'unknown'
def _get_vm_public_ip(self, vm_name: str, resource_group: str) -> Optional[str]:
"""Get VM public IP address"""
# Simplified implementation
return None
def _get_vm_private_ip(self, vm_name: str, resource_group: str) -> str:
"""Get VM private IP address"""
# Simplified implementation
return ""
def _get_resource_groups(self):
"""Get resource groups"""
from azure.mgmt.resource import ResourceManagementClient
resource_client = ResourceManagementClient(
self.credential,
self.subscription_id
)
return resource_client.resource_groups.list()
class GCPProvider(CloudProvider):
"""Google Cloud implementation of cloud provider abstraction"""
def __init__(self, project_id: str = None):
self.project_id = project_id or os.getenv('GOOGLE_CLOUD_PROJECT')
self.compute_client = compute_v1.InstancesClient()
self.zones_client = compute_v1.ZonesClient()
self.regions_client = compute_v1.RegionsClient()
def list_instances(self, region: str = None) -> List[ComputeInstance]:
"""List GCP compute instances"""
instances = []
# List instances across all zones or specific region
zones_to_check = []
if region:
# Get zones for specific region
zones_request = compute_v1.ListZonesRequest(
project=self.project_id,
filter=f"region eq .*{region}"
)
zones = self.zones_client.list(request=zones_request)
zones_to_check = [zone.name for zone in zones]
else:
# Get all zones
zones_request = compute_v1.ListZonesRequest(project=self.project_id)
zones = self.zones_client.list(request=zones_request)
zones_to_check = [zone.name for zone in zones]
for zone in zones_to_check:
try:
request = compute_v1.ListInstancesRequest(
project=self.project_id,
zone=zone
)
for instance in self.compute_client.list(request=request):
instances.append(ComputeInstance(
id=str(instance.id),
name=instance.name,
instance_type=instance.machine_type.split('/')[-1],
region=zone.rsplit('-', 1)[0], # Extract region from zone
status=instance.status.lower(),
public_ip=self._get_instance_public_ip(instance),
private_ip=self._get_instance_private_ip(instance),
tags=dict(instance.labels) if instance.labels else {}
))
except Exception as e:
print(f"Error listing instances in zone {zone}: {e}")
continue
return instances
def create_instance(self, config: Dict) -> ComputeInstance:
"""Create GCP compute instance"""
# Implementation would go here
raise NotImplementedError("GCP instance creation not implemented in this example")
def get_available_regions(self) -> List[str]:
"""Get GCP regions"""
request = compute_v1.ListRegionsRequest(project=self.project_id)
regions = self.regions_client.list(request=request)
return [region.name for region in regions]
def get_kubernetes_config(self, cluster_name: str, region: str) -> Dict:
"""Get GKE cluster configuration"""
from google.cloud import container_v1
container_client = container_v1.ClusterManagerClient()
# Construct the parent path
parent = f"projects/{self.project_id}/locations/{region}"
try:
cluster_path = f"{parent}/clusters/{cluster_name}"
cluster = container_client.get_cluster(name=cluster_path)
return {
'cluster_name': cluster_name,
'endpoint': f"https://{cluster.endpoint}",
'ca_data': cluster.master_auth.cluster_ca_certificate,
'region': region,
'cloud_provider': 'gcp',
'node_pools': [
{
'name': pool.name,
'machine_type': pool.config.machine_type,
'node_count': pool.initial_node_count,
'status': pool.status.name
}
for pool in cluster.node_pools
]
}
except Exception as e:
raise ValueError(f"Error getting cluster {cluster_name}: {e}")
def _get_instance_public_ip(self, instance) -> Optional[str]:
"""Extract public IP from instance"""
try:
for interface in instance.network_interfaces:
for access_config in interface.access_configs:
if access_config.nat_ip:
return access_config.nat_ip
except Exception:
pass
return None
def _get_instance_private_ip(self, instance) -> str:
"""Extract private IP from instance"""
try:
if instance.network_interfaces:
return instance.network_interfaces[0].network_ip or ""
except Exception:
pass
return ""
class MultiCloudManager:
"""Unified multi-cloud management interface"""
def __init__(self):
self.providers = {}
self._initialize_providers()
def _initialize_providers(self):
"""Initialize available cloud providers"""
try:
self.providers['aws'] = AWSProvider()
except Exception as e:
print(f"AWS provider not available: {e}")
try:
self.providers['azure'] = AzureProvider()
except Exception as e:
print(f"Azure provider not available: {e}")
try:
self.providers['gcp'] = GCPProvider()
except Exception as e:
print(f"GCP provider not available: {e}")
def get_all_instances(self) -> Dict[str, List[ComputeInstance]]:
"""Get instances from all available providers"""
all_instances = {}
for provider_name, provider in self.providers.items():
try:
instances = provider.list_instances()
all_instances[provider_name] = instances
print(f"Found {len(instances)} instances in {provider_name.upper()}")
except Exception as e:
print(f"Error listing instances from {provider_name}: {e}")
all_instances[provider_name] = []
return all_instances
def get_kubernetes_clusters(self) -> Dict[str, Dict]:
"""Get Kubernetes cluster information from all providers"""
clusters = {}
# This would need cluster names and regions as input
# Simplified for demonstration
cluster_configs = {
'aws': [('prod-cluster', 'us-east-1'), ('staging-cluster', 'us-west-2')],
'azure': [('prod-cluster', 'eastus')],
'gcp': [('prod-cluster', 'us-central1')]
}
for provider_name, provider in self.providers.items():
clusters[provider_name] = {}
if provider_name in cluster_configs:
for cluster_name, region in cluster_configs[provider_name]:
try:
config = provider.get_kubernetes_config(cluster_name, region)
clusters[provider_name][cluster_name] = config
except Exception as e:
print(f"Error getting {cluster_name} from {provider_name}: {e}")
return clusters
def health_check(self) -> Dict[str, bool]:
"""Check health of all cloud providers"""
health = {}
for provider_name, provider in self.providers.items():
try:
# Simple health check - try to list regions
regions = provider.get_available_regions()
health[provider_name] = len(regions) > 0
except Exception as e:
print(f"Health check failed for {provider_name}: {e}")
health[provider_name] = False
return health
# Example usage and testing
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser(description='Multi-cloud management CLI')
parser.add_argument('command', choices=['list-instances', 'health-check', 'clusters'],
help='Command to execute')
parser.add_argument('--provider', choices=['aws', 'azure', 'gcp'],
help='Specific provider to target')
args = parser.parse_args()
manager = MultiCloudManager()
if args.command == 'health-check':
health = manager.health_check()
print("\nProvider Health Status:")
for provider, is_healthy in health.items():
status = "✅ HEALTHY" if is_healthy else "❌ UNHEALTHY"
print(f" {provider.upper()}: {status}")
elif args.command == 'list-instances':
instances = manager.get_all_instances()
for provider, instance_list in instances.items():
if args.provider and provider != args.provider:
continue
print(f"\n{provider.upper()} Instances:")
if not instance_list:
print(" No instances found")
continue
for instance in instance_list:
print(f" {instance.name} ({instance.id})")
print(f" Type: {instance.instance_type}")
print(f" Region: {instance.region}")
print(f" Status: {instance.status}")
print(f" IPs: {instance.public_ip or 'N/A'} (public), {instance.private_ip} (private)")
if instance.tags:
print(f" Tags: {instance.tags}")
print()
elif args.command == 'clusters':
clusters = manager.get_kubernetes_clusters()
for provider, cluster_dict in clusters.items():
if args.provider and provider != args.provider:
continue
print(f"\n{provider.upper()} Kubernetes Clusters:")
if not cluster_dict:
print(" No clusters found")
continue
for cluster_name, config in cluster_dict.items():
print(f" {cluster_name}")
print(f" Endpoint: {config['endpoint']}")
print(f" Region: {config['region']}")
if 'node_pools' in config or 'node_groups' in config:
node_info = config.get('node_pools', config.get('node_groups', []))
print(f" Node Pools/Groups: {len(node_info)}")
print()
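One practical use of get_kubernetes_config() is generating kubeconfig entries so a single tooling layer can reach every cluster. The sketch below is illustrative only: it assumes the AWS/GCP shape of the returned dict (an endpoint plus base64 ca_data), assumes it sits in the same module as the classes above, and leaves authentication as a placeholder for a per-cloud exec plugin (aws eks get-token, gke-gcloud-auth-plugin, kubelogin).
# kubeconfig-sketch.py - illustrative: turn get_kubernetes_config() output into a kubeconfig entry
import yaml  # PyYAML

def build_kubeconfig(cluster_config: dict, user_name: str = "multi-cloud-admin") -> dict:
    """Build a minimal kubeconfig dict from the abstraction layer's cluster config.

    Authentication is intentionally left empty: wire in the appropriate exec plugin
    or client certificate for each cloud under the 'users' entry.
    """
    name = f"{cluster_config['cloud_provider']}-{cluster_config['cluster_name']}"
    return {
        "apiVersion": "v1",
        "kind": "Config",
        "clusters": [{
            "name": name,
            "cluster": {
                "server": cluster_config["endpoint"],
                "certificate-authority-data": cluster_config.get("ca_data", ""),
            },
        }],
        "users": [{"name": user_name, "user": {}}],  # fill in per-cloud auth here
        "contexts": [{
            "name": name,
            "context": {"cluster": name, "user": user_name},
        }],
        "current-context": name,
    }

if __name__ == "__main__":
    # Assumes this sketch is appended to cloud-abstraction.py so MultiCloudManager is in scope
    manager = MultiCloudManager()
    aws = manager.providers.get("aws")
    if aws:
        cluster = aws.get_kubernetes_config("prod-cluster", "us-east-1")  # names from the examples above
        print(yaml.safe_dump(build_kubeconfig(cluster)))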
Unified CI/CD Pipeline for Multi-Cloud
Create a deployment pipeline that can target any cloud provider:
# .gitlab-ci.yml - Multi-cloud deployment pipeline
stages:
- validate
- build
- deploy-dev
- test
- deploy-staging
- deploy-production

variables:
DOCKER_DRIVER: overlay2
DOCKER_TLS_CERTDIR: "/certs"
# Multi-cloud configuration
PRIMARY_CLOUD: "aws"
SECONDARY_CLOUD: "azure"
TERTIARY_CLOUD: "gcp"
# Cloud-specific variables
AWS_REGION: "us-east-1"
AZURE_REGION: "eastus"
GCP_REGION: "us-central1"
# Validate cloud configurations
validate-cloud-configs:
stage: validate
image: alpine:latest
before_script:
- apk add --no-cache curl jq
script:
# Validate Kubernetes manifests for each cloud
- |
for cloud in aws azure gcp; do
echo "Validating $cloud configuration..."
# Substitute cloud-specific values
envsubst < k8s/deployment.template.yaml > k8s/deployment-$cloud.yaml
# Basic YAML validation
python3 -c "import yaml; yaml.safe_load(open('k8s/deployment-$cloud.yaml'))"
echo "✅ $cloud configuration valid"
done
artifacts:
paths:
- k8s/deployment-*.yaml
expire_in: 1 hour

# Build multi-arch container images
build-container:
stage: build
image: docker:20.10.16
services:
- docker:20.10.16-dind
before_script:
- echo $CI_REGISTRY_PASSWORD | docker login -u $CI_REGISTRY_USER --password-stdin $CI_REGISTRY
- docker run --rm --privileged multiarch/qemu-user-static --reset -p yes
- docker buildx create --use --name multi-arch-builder
script:
# Build multi-architecture images for different cloud architectures
- |
docker buildx build \
--platform linux/amd64,linux/arm64 \
--build-arg BUILD_DATE=$(date -u +'%Y-%m-%dT%H:%M:%SZ') \
--build-arg VCS_REF=$CI_COMMIT_SHA \
--build-arg VERSION=$CI_COMMIT_TAG \
-t $CI_REGISTRY_IMAGE:latest \
-t $CI_REGISTRY_IMAGE:$CI_COMMIT_SHA \
--push .
# Deploy to development (AWS primary)
deploy-dev-aws:
stage: deploy-dev
image: bitnami/kubectl:latest
environment:
name: development
url: https://dev-api.company.com
before_script:
- echo "$AWS_KUBECONFIG" | base64 -d > kubeconfig
- export KUBECONFIG=kubeconfig
- kubectl config use-context dev-cluster
script:
- |
# Apply cloud-specific configuration
export CLOUD_PROVIDER="aws"
export DISK_TYPE="gp3"
export CLOUD_PROVISIONER="ebs.csi.aws.com"
export INGRESS_CLASS="alb"
# Deploy to AWS EKS
envsubst < k8s/deployment.template.yaml | kubectl apply -f -
kubectl set image deployment/api-server api-server=$CI_REGISTRY_IMAGE:$CI_COMMIT_SHA
kubectl rollout status deployment/api-server --timeout=600s
echo "✅ Deployed to AWS development cluster"
only:
- develop
- merge_requests

# Deploy to staging (multi-cloud)
deploy-staging-multicloud:
stage: deploy-staging
image: bitnami/kubectl:latest
environment:
name: staging
url: https://staging-api.company.com
parallel:
matrix:
- CLOUD: aws
KUBECONFIG_VAR: AWS_STAGING_KUBECONFIG
REGION: us-east-1
DISK_TYPE: gp3
PROVISIONER: ebs.csi.aws.com
INGRESS_CLASS: alb
- CLOUD: azure
KUBECONFIG_VAR: AZURE_STAGING_KUBECONFIG
REGION: eastus
DISK_TYPE: Premium_LRS
PROVISIONER: disk.csi.azure.com
INGRESS_CLASS: nginx
- CLOUD: gcp
KUBECONFIG_VAR: GCP_STAGING_KUBECONFIG
REGION: us-central1
DISK_TYPE: pd-ssd
PROVISIONER: pd.csi.storage.gke.io
INGRESS_CLASS: gce
before_script:
- echo "${!KUBECONFIG_VAR}" | base64 -d > kubeconfig-$CLOUD
- export KUBECONFIG=kubeconfig-$CLOUD
script:
- |
# Apply cloud-specific configuration
export CLOUD_PROVIDER=$CLOUD
export CLOUD_PROVISIONER=$PROVISIONER
# Deploy to specific cloud
envsubst < k8s/deployment.template.yaml | kubectl apply -f -
kubectl set image deployment/api-server api-server=$CI_REGISTRY_IMAGE:$CI_COMMIT_SHA
kubectl rollout status deployment/api-server --timeout=600s
# Verify deployment health
kubectl get pods -l app=api-server
kubectl get svc api-service
echo "✅ Deployed to $CLOUD staging cluster"
only:
- master

# Production deployment with traffic splitting
deploy-production:
stage: deploy-production
image: bitnami/kubectl:latest
environment:
name: production
url: https://api.company.com
when: manual
script:
- |
# Deploy to primary cloud (AWS) first
echo "Deploying to primary cloud (AWS)..."
echo "$AWS_PROD_KUBECONFIG" | base64 -d > kubeconfig-aws
export KUBECONFIG=kubeconfig-aws
# Blue-green deployment on AWS
export CLOUD_PROVIDER="aws"
export DISK_TYPE="gp3"
export CLOUD_PROVISIONER="ebs.csi.aws.com"
envsubst < k8s/deployment.template.yaml | kubectl apply -f -
kubectl set image deployment/api-server api-server=$CI_REGISTRY_IMAGE:$CI_COMMIT_SHA
kubectl rollout status deployment/api-server --timeout=900s
# Health check before proceeding
if ! kubectl exec -it deploy/api-server -- curl -f http://localhost:8080/health; then
echo "❌ Health check failed on AWS"
exit 1
fi
echo "✅ AWS deployment successful"
# Deploy to secondary cloud (Azure) with 30% traffic
echo "Deploying to secondary cloud (Azure)..."
echo "$AZURE_PROD_KUBECONFIG" | base64 -d > kubeconfig-azure
export KUBECONFIG=kubeconfig-azure
export CLOUD_PROVIDER="azure"
export DISK_TYPE="Premium_LRS"
export CLOUD_PROVISIONER="disk.csi.azure.com"
envsubst < k8s/deployment.template.yaml | kubectl apply -f -
kubectl set image deployment/api-server api-server=$CI_REGISTRY_IMAGE:$CI_COMMIT_SHA
kubectl rollout status deployment/api-server --timeout=900s
# Configure traffic split (using Istio or similar)
kubectl apply -f k8s/traffic-split-azure-30.yaml
echo "✅ Multi-cloud production deployment complete"
echo "Traffic split: 70% AWS, 30% Azure"
only:
- master
- tags

# Automated rollback capability
rollback-production:
stage: deploy-production
image: bitnami/kubectl:latest
when: manual
script:
- |
echo "Rolling back production deployment..."
# Rollback on all clouds
for cloud in aws azure; do
echo "Rolling back $cloud..."
kubeconfig_var="${cloud^^}_PROD_KUBECONFIG"
echo "${!kubeconfig_var}" | base64 -d > kubeconfig-$cloud
export KUBECONFIG=kubeconfig-$cloud
# Rollback to previous version
kubectl rollout undo deployment/api-server
kubectl rollout status deployment/api-server --timeout=600s
echo "✅ $cloud rollback complete"
done
echo "✅ Multi-cloud rollback complete"
only:
- master
- tags
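Every job in this pipeline depends on the same handful of substitution variables, so it pays to fail fast when one is missing or mismatched before envsubst renders an incomplete template. A minimal pre-flight check is sketched below; the script name is hypothetical, but the variable names and expected pairings come from the jobs and matrix above.
#!/usr/bin/env python3
# check-cloud-vars.py - illustrative pre-flight check for the deployment jobs above
import os
import sys

REQUIRED_VARS = ["CLOUD_PROVIDER", "CLOUD_PROVISIONER", "DISK_TYPE", "INGRESS_CLASS"]

# Expected pairings, taken from the staging matrix above
EXPECTED = {
    "aws":   {"CLOUD_PROVISIONER": "ebs.csi.aws.com",       "DISK_TYPE": "gp3"},
    "azure": {"CLOUD_PROVISIONER": "disk.csi.azure.com",    "DISK_TYPE": "Premium_LRS"},
    "gcp":   {"CLOUD_PROVISIONER": "pd.csi.storage.gke.io", "DISK_TYPE": "pd-ssd"},
}

def main() -> int:
    missing = [v for v in REQUIRED_VARS if not os.getenv(v)]
    if missing:
        print(f"Missing variables: {', '.join(missing)}")
        return 1
    cloud = os.environ["CLOUD_PROVIDER"]
    for key, expected in EXPECTED.get(cloud, {}).items():
        actual = os.environ[key]
        if actual != expected:
            print(f"{key}={actual} looks wrong for {cloud} (expected {expected})")
            return 1
    print(f"Cloud variable check passed for {cloud}")
    return 0

if __name__ == "__main__":
    sys.exit(main())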
Network Architecture for Multi-Cloud
Implement secure, efficient networking across clouds:
#!/usr/bin/env python3
# multi-cloud-networking.py - Cross-cloud network management
import json
import time
import subprocess
from typing import Dict, List, Optional
from dataclasses import dataclass
@dataclass
class NetworkPeering:
"""Network peering configuration"""
name: str
source_cloud: str
target_cloud: str
source_network: str
target_network: str
status: str
bandwidth_gbps: float
latency_ms: float
@dataclass
class NetworkRoute:
"""Network routing configuration"""
destination_cidr: str
next_hop: str
priority: int
cloud_provider: str
class MultiCloudNetworking:
"""Multi-cloud networking management"""
def __init__(self):
self.peering_connections = []
self.routing_table = []
self.vpn_connections = {}
def setup_cross_cloud_connectivity(self) -> Dict:
"""Setup connectivity between cloud providers"""
connectivity_map = {
'aws-azure': self._setup_aws_azure_peering(),
'aws-gcp': self._setup_aws_gcp_peering(),
'azure-gcp': self._setup_azure_gcp_peering()
}
return connectivity_map
def _setup_aws_azure_peering(self) -> Dict:
"""Setup AWS-Azure connectivity via VPN"""
# AWS VPC configuration
aws_config = {
'vpc_cidr': '10.1.0.0/16',
'public_subnets': ['10.1.1.0/24', '10.1.2.0/24'],
'private_subnets': ['10.1.10.0/24', '10.1.20.0/24'],
'vpn_gateway': {
'type': 'ipsec.1',
'routing': 'static',
'tunnel_cidrs': ['169.254.21.0/30', '169.254.22.0/30']
}
}
# Azure VNet configuration
azure_config = {
'vnet_cidr': '10.2.0.0/16',
'public_subnets': ['10.2.1.0/24', '10.2.2.0/24'],
'private_subnets': ['10.2.10.0/24', '10.2.20.0/24'],
'vpn_gateway': {
'type': 'RouteBased',
'sku': 'VpnGw1',
'tunnel_cidrs': ['169.254.21.0/30', '169.254.22.0/30']
}
}
# Create Terraform configuration for AWS-Azure connectivity
terraform_config = self._generate_aws_azure_terraform(aws_config, azure_config)
return {
'status': 'configured',
'aws_config': aws_config,
'azure_config': azure_config,
'terraform': terraform_config,
'estimated_latency_ms': 45,
'bandwidth_gbps': 1.25
}
def _setup_aws_gcp_peering(self) -> Dict:
"""Setup AWS-GCP connectivity"""
# Use Cloud Interconnect for high bandwidth
aws_config = {
'vpc_cidr': '10.1.0.0/16',
'transit_gateway': True,
'dedicated_connection': {
'location': 'Equinix SV1',
'bandwidth': '10Gbps',
'vlan_id': 100
}
}
gcp_config = {
'vpc_cidr': '10.3.0.0/16',
'cloud_router': True,
'interconnect': {
'type': 'DEDICATED',
'location': 'Equinix-SV1',
'bandwidth': '10Gbps',
'vlan_id': 100
}
}
return {
'status': 'configured',
'aws_config': aws_config,
'gcp_config': gcp_config,
'estimated_latency_ms': 25,
'bandwidth_gbps': 10.0
}
def _setup_azure_gcp_peering(self) -> Dict:
"""Setup Azure-GCP connectivity"""
azure_config = {
'vnet_cidr': '10.2.0.0/16',
'express_route': {
'circuit_sku': 'Standard',
'bandwidth': '1Gbps',
'peering_location': 'Silicon Valley'
}
}
gcp_config = {
'vpc_cidr': '10.3.0.0/16',
'partner_interconnect': {
'type': 'PARTNER',
'capacity': '1Gbps',
'region': 'us-west1'
}
}
return {
'status': 'configured',
'azure_config': azure_config,
'gcp_config': gcp_config,
'estimated_latency_ms': 35,
'bandwidth_gbps': 1.0
}
def _generate_aws_azure_terraform(self, aws_config: Dict, azure_config: Dict) -> str:
"""Generate Terraform for AWS-Azure connectivity"""
terraform_config = f"""
# AWS VPC and VPN Gateway
provider "aws" {{
region = "us-east-1"
}}

resource "aws_vpc" "main" {{
cidr_block = "{aws_config['vpc_cidr']}"
enable_dns_hostnames = true
enable_dns_support = true
tags = {{
Name = "multi-cloud-vpc"
Environment = "production"
}}
}}
# AWS Internet Gateway
resource "aws_internet_gateway" "main" {{
vpc_id = aws_vpc.main.id
tags = {{
Name = "multi-cloud-igw"
}}
}}

# AWS VPN Gateway
resource "aws_vpn_gateway" "main" {{
vpc_id = aws_vpc.main.id
tags = {{
Name = "multi-cloud-vpn-gw"
}}
}}

# Customer Gateway (Azure end)
resource "aws_customer_gateway" "azure" {{
bgp_asn = 65000
ip_address = azurerm_public_ip.vpn_gateway.ip_address
type = "ipsec.1"
tags = {{
Name = "azure-customer-gateway"
}}
}}

# VPN Connection
resource "aws_vpn_connection" "azure" {{
vpn_gateway_id = aws_vpn_gateway.main.id
customer_gateway_id = aws_customer_gateway.azure.id
type = "ipsec.1"
static_routes_only = true
tags = {{
Name = "aws-azure-vpn"
}}
}}

# VPN Connection Route
resource "aws_vpn_connection_route" "azure" {{
vpn_connection_id = aws_vpn_connection.azure.id
destination_cidr_block = "{azure_config['vnet_cidr']}"
}}

# Azure Provider
provider "azurerm" {{
features {{}}
}}

# Azure Resource Group
resource "azurerm_resource_group" "main" {{
name = "multi-cloud-rg"
location = "East US"
}}

# Azure Virtual Network
resource "azurerm_virtual_network" "main" {{
name = "multi-cloud-vnet"
address_space = ["{azure_config['vnet_cidr']}"]
location = azurerm_resource_group.main.location
resource_group_name = azurerm_resource_group.main.name
}}

# Azure Gateway Subnet
resource "azurerm_subnet" "gateway" {{
name = "GatewaySubnet"
resource_group_name = azurerm_resource_group.main.name
virtual_network_name = azurerm_virtual_network.main.name
address_prefixes = ["10.2.255.0/27"]
}}

# Azure Public IP for VPN Gateway
resource "azurerm_public_ip" "vpn_gateway" {{
name = "multi-cloud-vpn-gateway-ip"
location = azurerm_resource_group.main.location
resource_group_name = azurerm_resource_group.main.name
allocation_method = "Dynamic"
}}

# Azure VPN Gateway
resource "azurerm_virtual_network_gateway" "main" {{
name = "multi-cloud-vpn-gateway"
location = azurerm_resource_group.main.location
resource_group_name = azurerm_resource_group.main.name
type = "Vpn"
vpn_type = "RouteBased"
active_active = false
enable_bgp = false
sku = "VpnGw1"
ip_configuration {{
name = "vnetGatewayConfig"
public_ip_address_id = azurerm_public_ip.vpn_gateway.id
private_ip_address_allocation = "Dynamic"
subnet_id = azurerm_subnet.gateway.id
}}
}}

# Azure Local Network Gateway (AWS end)
resource "azurerm_local_network_gateway" "aws" {{
name = "aws-local-gateway"
resource_group_name = azurerm_resource_group.main.name
location = azurerm_resource_group.main.location
gateway_address = aws_vpn_connection.azure.tunnel1_address
address_space = ["{aws_config['vpc_cidr']}"]
}}

# Azure VPN Connection
resource "azurerm_virtual_network_gateway_connection" "aws" {{
name = "azure-aws-connection"
location = azurerm_resource_group.main.location
resource_group_name = azurerm_resource_group.main.name
type = "IPSec"
virtual_network_gateway_id = azurerm_virtual_network_gateway.main.id
local_network_gateway_id = azurerm_local_network_gateway.aws.id
shared_key = aws_vpn_connection.azure.tunnel1_preshared_key
}}

# Output connection information
output "aws_vpn_connection_id" {{
value = aws_vpn_connection.azure.id
}}output "azure_connection_id" {{
value = azurerm_virtual_network_gateway_connection.aws.id
}}
output "tunnel_ips" {{
value = {{
tunnel1 = aws_vpn_connection.azure.tunnel1_address
tunnel2 = aws_vpn_connection.azure.tunnel2_address
}}
}}
"""
return terraform_config
def monitor_network_performance(self) -> Dict:
"""Monitor cross-cloud network performance"""
performance_metrics = {}
# Test connectivity between clouds
test_endpoints = {
'aws-azure': {
'source': '10.1.10.10',
'target': '10.2.10.10',
'expected_latency_ms': 45
},
'aws-gcp': {
'source': '10.1.10.10',
'target': '10.3.10.10',
'expected_latency_ms': 25
},
'azure-gcp': {
'source': '10.2.10.10',
'target': '10.3.10.10',
'expected_latency_ms': 35
}
}
for connection, config in test_endpoints.items():
try:
latency = self._measure_latency(config['source'], config['target'])
bandwidth = self._measure_bandwidth(config['source'], config['target'])
packet_loss = self._measure_packet_loss(config['source'], config['target'])
performance_metrics[connection] = {
'latency_ms': latency,
'bandwidth_mbps': bandwidth,
'packet_loss_percent': packet_loss,
'status': 'healthy' if latency < config['expected_latency_ms'] * 1.5 else 'degraded'
}
except Exception as e:
performance_metrics[connection] = {
'status': 'error',
'error': str(e)
}
return performance_metrics
def _measure_latency(self, source: str, target: str) -> float:
"""Measure network latency between endpoints"""
# Simplified implementation - in reality would use proper network tools
try:
result = subprocess.run(
['ping', '-c', '10', target],
capture_output=True,
text=True,
timeout=30
)
# Parse ping output to get average latency
output = result.stdout
if 'avg' in output:
# Extract average from: rtt min/avg/max/mdev = 1.234/5.678/9.012/1.234 ms
avg_line = [line for line in output.split('\n') if 'avg' in line][0]
avg_latency = float(avg_line.split('/')[4])  # the fifth slash-separated field holds the avg value
return avg_latency
return 999.0 # High latency if parsing fails
except Exception:
return 999.0
def _measure_bandwidth(self, source: str, target: str) -> float:
"""Measure bandwidth between endpoints"""
# Simplified implementation - would use iperf3 or similar
# For demo purposes, return simulated values
return 850.0 # Mbps
def _measure_packet_loss(self, source: str, target: str) -> float:
"""Measure packet loss between endpoints"""
# Simplified implementation
return 0.1 # 0.1% packet loss
def optimize_routing(self) -> Dict:
"""Optimize routing across clouds"""
optimization_results = {}
# Analyze current routing performance
current_performance = self.monitor_network_performance()
# Identify optimization opportunities
optimizations = []
for connection, metrics in current_performance.items():
if metrics.get('status') == 'degraded':
if metrics.get('latency_ms', 0) > 100:
optimizations.append({
'connection': connection,
'issue': 'high_latency',
'recommendation': 'Consider dedicated connection or traffic engineering',
'priority': 'high'
})
if metrics.get('packet_loss_percent', 0) > 1.0:
optimizations.append({
'connection': connection,
'issue': 'packet_loss',
'recommendation': 'Review QoS settings and connection reliability',
'priority': 'critical'
})
# Apply automatic optimizations where possible
applied_optimizations = []
for opt in optimizations:
if opt['issue'] == 'high_latency':
# Enable traffic compression
self._enable_traffic_compression(opt['connection'])
applied_optimizations.append(f"Enabled compression for {opt['connection']}")
optimization_results = {
'identified_issues': len(optimizations),
'optimizations_applied': applied_optimizations,
'recommendations': optimizations,
'next_review': time.time() + 3600 # Review in 1 hour
}
return optimization_results
def _enable_traffic_compression(self, connection: str):
"""Enable traffic compression for a connection"""
# Implementation would configure actual network devices
print(f"Enabling traffic compression for {connection}")
# Example usage
if __name__ == "__main__":
networking = MultiCloudNetworking()
print("Setting up multi-cloud connectivity...")
connectivity = networking.setup_cross_cloud_connectivity()
for connection, config in connectivity.items():
print(f"\n{connection.upper()}:")
print(f" Status: {config['status']}")
print(f" Estimated Latency: {config['estimated_latency_ms']}ms")
print(f" Bandwidth: {config['bandwidth_gbps']}Gbps")
print("\nMonitoring network performance...")
performance = networking.monitor_network_performance()
for connection, metrics in performance.items():
print(f"\n{connection.upper()}:")
if metrics.get('status') == 'error':
print(f" Status: ❌ Error - {metrics['error']}")
else:
print(f" Status: {'✅' if metrics['status'] == 'healthy' else '⚠️'} {metrics['status']}")
print(f" Latency: {metrics.get('latency_ms', 'N/A')}ms")
print(f" Bandwidth: {metrics.get('bandwidth_mbps', 'N/A')}Mbps")
print(f" Packet Loss: {metrics.get('packet_loss_percent', 'N/A')}%")
print("\nOptimizing routing...")
optimization = networking.optimize_routing()
print(f"Issues identified: {optimization['identified_issues']}")
if optimization['optimizations_applied']:
print("Applied optimizations:")
for opt in optimization['optimizations_applied']:
print(f" - {opt}")
if optimization['recommendations']:
print("Additional recommendations:")
for rec in optimization['recommendations']:
print(f" - {rec['recommendation']} (Priority: {rec['priority']})")
Data Management Across Clouds
Implement consistent data management practices:
#!/usr/bin/env python3
# multi-cloud-data.py - Cross-cloud data management
import json
import boto3
import asyncio
from typing import Dict, List, Optional, Any
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
import hashlib
@dataclass
class DataLocation:
"""Data location tracking"""
cloud_provider: str
region: str
service: str # S3, Blob Storage, Cloud Storage
bucket_name: str
path: str
size_bytes: int
last_modified: datetime
encryption_status: str
@dataclass
class DataSyncJob:
"""Data synchronization job"""
job_id: str
source: DataLocation
destination: DataLocation
status: str
progress_percent: float
bytes_transferred: int
start_time: datetime
estimated_completion: Optional[datetime]
class MultiCloudDataManager:
"""Cross-cloud data management and synchronization"""
def __init__(self):
self.aws_s3 = boto3.client('s3')
self.sync_jobs = {}
self.data_catalog = {}
def create_data_catalog(self) -> Dict:
"""Create comprehensive data catalog across clouds"""
catalog = {
'aws': self._catalog_aws_data(),
'azure': self._catalog_azure_data(),
'gcp': self._catalog_gcp_data(),
'metadata': {
'total_objects': 0,
'total_size_gb': 0,
'last_updated': datetime.utcnow().isoformat()
}
}
# Calculate totals
total_objects = sum(len(cloud_data.get('objects', [])) for cloud_data in catalog.values() if isinstance(cloud_data, dict) and 'objects' in cloud_data)
total_size = sum(
sum(obj.get('size_bytes', 0) for obj in cloud_data.get('objects', []))
for cloud_data in catalog.values()
if isinstance(cloud_data, dict) and 'objects' in cloud_data
)
catalog['metadata']['total_objects'] = total_objects
catalog['metadata']['total_size_gb'] = round(total_size / (1024 ** 3), 2)
self.data_catalog = catalog
return catalog
def _catalog_aws_data(self) -> Dict:
"""Catalog AWS S3 data"""
aws_data = {
'buckets': [],
'objects': [],
'total_size_bytes': 0
}
try:
# List all S3 buckets
buckets_response = self.aws_s3.list_buckets()
for bucket in buckets_response['Buckets']:
bucket_name = bucket['Name']
bucket_info = {
'name': bucket_name,
'creation_date': bucket['CreationDate'].isoformat(),
'region': self._get_bucket_region(bucket_name),
'objects': [],
'size_bytes': 0
}
try:
# List objects in bucket (limited to first 1000 for performance)
objects_response = self.aws_s3.list_objects_v2(
Bucket=bucket_name,
MaxKeys=1000
)
if 'Contents' in objects_response:
for obj in objects_response['Contents']:
object_info = {
'key': obj['Key'],
'size_bytes': obj['Size'],
'last_modified': obj['LastModified'].isoformat(),
'etag': obj['ETag'].strip('"'),
'storage_class': obj.get('StorageClass', 'STANDARD')
}
bucket_info['objects'].append(object_info)
bucket_info['size_bytes'] += obj['Size']
# Add to global objects list
aws_data['objects'].append(DataLocation(
cloud_provider='aws',
region=bucket_info['region'],
service='s3',
bucket_name=bucket_name,
path=obj['Key'],
size_bytes=obj['Size'],
last_modified=obj['LastModified'],
encryption_status=self._check_s3_encryption(bucket_name, obj['Key'])
))
aws_data['total_size_bytes'] += bucket_info['size_bytes']
except Exception as e:
bucket_info['error'] = f"Unable to list objects: {str(e)}"
aws_data['buckets'].append(bucket_info)
except Exception as e:
aws_data['error'] = f"Unable to access AWS S3: {str(e)}"
return aws_data
def _catalog_azure_data(self) -> Dict:
"""Catalog Azure Blob Storage data"""
# Simplified implementation - would use Azure SDK
azure_data = {
'storage_accounts': [],
'objects': [],
'total_size_bytes': 0,
'note': 'Requires Azure SDK implementation'
}
return azure_data
def _catalog_gcp_data(self) -> Dict:
"""Catalog Google Cloud Storage data"""
# Simplified implementation - would use GCP SDK
gcp_data = {
'buckets': [],
'objects': [],
'total_size_bytes': 0,
'note': 'Requires GCP SDK implementation'
}
return gcp_data
def _get_bucket_region(self, bucket_name: str) -> str:
"""Get S3 bucket region"""
try:
response = self.aws_s3.get_bucket_location(Bucket=bucket_name)
region = response.get('LocationConstraint')
return region if region else 'us-east-1' # Default region
except Exception:
return 'unknown'
def _check_s3_encryption(self, bucket_name: str, object_key: str) -> str:
"""Check S3 object encryption status"""
try:
response = self.aws_s3.head_object(Bucket=bucket_name, Key=object_key)
server_side_encryption = response.get('ServerSideEncryption', 'none')
return server_side_encryption
except Exception:
return 'unknown'
def setup_cross_cloud_replication(self, replication_config: Dict) -> Dict:
"""Setup data replication across clouds"""
replication_jobs = []
for config in replication_config.get('replications', []):
job = DataSyncJob(
job_id=f"sync-{hash(str(config))}",
source=DataLocation(**config['source']),
destination=DataLocation(**config['destination']),
status='pending',
progress_percent=0.0,
bytes_transferred=0,
start_time=datetime.utcnow(),
estimated_completion=None
)
# Start replication job
self._start_replication_job(job)
replication_jobs.append(job)
return {
'jobs_created': len(replication_jobs),
'jobs': [
{
'job_id': job.job_id,
'source': f"{job.source.cloud_provider}:{job.source.bucket_name}/{job.source.path}",
'destination': f"{job.destination.cloud_provider}:{job.destination.bucket_name}/{job.destination.path}",
'status': job.status
}
for job in replication_jobs
]
}
def _start_replication_job(self, job: DataSyncJob):
"""Start a data replication job"""
try:
# Simplified implementation
if job.source.cloud_provider == 'aws' and job.destination.cloud_provider == 'azure':
self._replicate_aws_to_azure(job)
elif job.source.cloud_provider == 'aws' and job.destination.cloud_provider == 'gcp':
self._replicate_aws_to_gcp(job)
# Add other combinations...
job.status = 'running'
self.sync_jobs[job.job_id] = job
except Exception as e:
job.status = 'failed'
job.error = str(e)
def _replicate_aws_to_azure(self, job: DataSyncJob):
"""Replicate data from AWS S3 to Azure Blob Storage"""
# This would implement the actual replication logic
# For now, simulate the process
print(f"Starting replication: AWS S3 -> Azure Blob")
print(f"Source: {job.source.bucket_name}/{job.source.path}")
print(f"Destination: {job.destination.bucket_name}/{job.destination.path}")
# In real implementation:
# 1. Download from S3
# 2. Upload to Azure Blob Storage
# 3. Verify integrity
# 4. Update job progress
job.progress_percent = 100.0
job.status = 'completed'
def _replicate_aws_to_gcp(self, job: DataSyncJob):
"""Replicate data from AWS S3 to Google Cloud Storage"""
print(f"Starting replication: AWS S3 -> Google Cloud Storage")
print(f"Source: {job.source.bucket_name}/{job.source.path}")
print(f"Destination: {job.destination.bucket_name}/{job.destination.path}")
# Implementation would go here...
job.progress_percent = 100.0
job.status = 'completed'
def monitor_data_consistency(self) -> Dict:
"""Monitor data consistency across clouds"""
consistency_report = {
'timestamp': datetime.utcnow().isoformat(),
'checks_performed': 0,
'inconsistencies_found': 0,
'details': []
}
# Check for objects that should be replicated
for job_id, job in self.sync_jobs.items():
if job.status == 'completed':
consistency_check = self._verify_replication_integrity(job)
consistency_report['checks_performed'] += 1
if not consistency_check['consistent']:
consistency_report['inconsistencies_found'] += 1
consistency_report['details'].append({
'job_id': job_id,
'issue': consistency_check['issue'],
'source_hash': consistency_check.get('source_hash'),
'destination_hash': consistency_check.get('destination_hash')
})
return consistency_report
def _verify_replication_integrity(self, job: DataSyncJob) -> Dict:
"""Verify integrity of replicated data"""
try:
# Get checksums from both source and destination
source_hash = self._get_object_hash(job.source)
destination_hash = self._get_object_hash(job.destination)
if source_hash == destination_hash:
return {
'consistent': True,
'source_hash': source_hash,
'destination_hash': destination_hash
}
else:
return {
'consistent': False,
'issue': 'hash_mismatch',
'source_hash': source_hash,
'destination_hash': destination_hash
}
except Exception as e:
return {
'consistent': False,
'issue': f'verification_error: {str(e)}'
}
def _get_object_hash(self, location: DataLocation) -> str:
"""Get hash of object at location"""
if location.cloud_provider == 'aws':
try:
response = self.aws_s3.head_object(
Bucket=location.bucket_name,
Key=location.path
)
return response.get('ETag', '').strip('"')
except Exception:
return 'error'
elif location.cloud_provider == 'azure':
# Would implement Azure blob hash retrieval
return 'azure_hash_placeholder'
elif location.cloud_provider == 'gcp':
# Would implement GCP object hash retrieval
return 'gcp_hash_placeholder'
return 'unknown'
def optimize_data_placement(self) -> Dict:
"""Optimize data placement across clouds"""
optimization_report = {
'recommendations': [],
'potential_savings_usd': 0,
'performance_improvements': []
}
# Analyze current data catalog
if not self.data_catalog:
self.create_data_catalog()
# Cost optimization recommendations
for cloud_provider, data in self.data_catalog.items():
if isinstance(data, dict) and 'objects' in data:
for obj in data['objects']:
if isinstance(obj, DataLocation):
# Recommend moving cold data to cheaper storage
if self._is_cold_data(obj):
savings = self._calculate_storage_savings(obj)
optimization_report['recommendations'].append({
'type': 'storage_class_optimization',
'object': f"{obj.bucket_name}/{obj.path}",
'current_cloud': obj.cloud_provider,
'recommendation': 'Move to cold storage',
'potential_savings_usd_monthly': savings
})
optimization_report['potential_savings_usd'] += savings
# Recommend geographic optimization
perf_improvement = self._analyze_geographic_placement(obj)
if perf_improvement:
optimization_report['performance_improvements'].append(perf_improvement)
return optimization_report
def _is_cold_data(self, obj: DataLocation) -> bool:
"""Determine if data is considered cold (rarely accessed)"""
# Simple heuristic: data not modified in 90 days
threshold_date = datetime.now(timezone.utc) - timedelta(days=90)  # aware datetime, matching S3 timestamps
return obj.last_modified < threshold_date
def _calculate_storage_savings(self, obj: DataLocation) -> float:
"""Calculate potential storage cost savings"""
# Simplified cost calculation (would use actual cloud pricing)
storage_cost_per_gb = {
'aws_standard': 0.023, # S3 Standard
'aws_ia': 0.0125, # S3 IA
'aws_glacier': 0.004, # S3 Glacier
'azure_hot': 0.024, # Azure Hot
'azure_cool': 0.015, # Azure Cool
'azure_archive': 0.002, # Azure Archive
'gcp_standard': 0.020, # GCP Standard
'gcp_nearline': 0.010, # GCP Nearline
'gcp_coldline': 0.004 # GCP Coldline
}
current_cost_key = f"{obj.cloud_provider}_standard"
cold_cost_key = f"{obj.cloud_provider}_glacier" if obj.cloud_provider == 'aws' else f"{obj.cloud_provider}_archive"
current_cost = storage_cost_per_gb.get(current_cost_key, 0.025)
cold_cost = storage_cost_per_gb.get(cold_cost_key, 0.005)
size_gb = obj.size_bytes / (1024 ** 3)
monthly_savings = (current_cost - cold_cost) * size_gb
return round(monthly_savings, 2)
def _analyze_geographic_placement(self, obj: DataLocation) -> Optional[Dict]:
"""Analyze if data should be moved to different geographic location"""
# Simplified analysis - would use actual access patterns
access_regions = ['us-east-1', 'eu-west-1', 'ap-southeast-1'] # Most common access regions
if obj.region not in access_regions:
return {
'object': f"{obj.bucket_name}/{obj.path}",
'current_region': obj.region,
'recommended_region': access_regions[0], # Closest major region
'expected_latency_improvement_ms': 50,
'expected_cost_change_percent': -15
}
return None
# Example usage and CLI
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser(description='Multi-cloud data management')
parser.add_argument('command', choices=['catalog', 'replicate', 'monitor', 'optimize'],
help='Command to execute')
parser.add_argument('--config', help='Configuration file path')
args = parser.parse_args()
manager = MultiCloudDataManager()
if args.command == 'catalog':
print("Creating data catalog across clouds...")
catalog = manager.create_data_catalog()
print(f"\nData Catalog Summary:")
print(f"Total Objects: {catalog['metadata']['total_objects']}")
print(f"Total Size: {catalog['metadata']['total_size_gb']} GB")
for cloud, data in catalog.items():
if cloud != 'metadata' and isinstance(data, dict):
if 'buckets' in data:
print(f"\n{cloud.upper()}:")
print(f" Buckets: {len(data['buckets'])}")
print(f" Objects: {len(data.get('objects', []))}")
print(f" Size: {round(data.get('total_size_bytes', 0) / (10243), 2)} GB")
elif args.command == 'monitor':
print("Monitoring data consistency...")
consistency = manager.monitor_data_consistency()
print(f"\nConsistency Report:")
print(f"Checks Performed: {consistency['checks_performed']}")
print(f"Inconsistencies Found: {consistency['inconsistencies_found']}")
if consistency['details']:
print("\nInconsistencies:")
for detail in consistency['details']:
print(f" Job {detail['job_id']}: {detail['issue']}")
elif args.command == 'optimize':
print("Analyzing data placement optimization...")
opt_results = manager.optimize_data_placement()
print("\nOptimization Report:")
print(f"Recommendations: " + str(len(opt_results['recommendations'])))
print(f"Potential Monthly Savings: \$" + str(opt_results['potential_savings_usd']))
if opt_results['recommendations']:
print("\nTop Recommendations:")
for i, rec in enumerate(opt_results['recommendations'][:5], 1):
print(f" " + str(i) + ". " + rec['recommendation'])
print(f" Object: " + rec['object'])
print(f" Savings: \$" + str(rec['potential_savings_usd_monthly']) + "/month")
print()
if opt_results['performance_improvements']:
print("Performance Improvements:")
for improvement in opt_results['performance_improvements'][:3]:
print(f" - Move " + improvement['object'] + " to " + improvement['recommended_region'])
print(f" Expected latency improvement: " + str(improvement['expected_latency_improvement_ms']) + "ms")
print()
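The replicate path expects a configuration whose source and destination entries carry the same fields as the DataLocation dataclass, since they are unpacked directly into it. The exact shape isn't pinned down above, so treat the following as an assumed example of what such a config could look like; bucket names, container names, and object paths are hypothetical, and size_bytes/last_modified are placeholders.
#!/usr/bin/env python3
# replication-config-example.py - illustrative input for setup_cross_cloud_replication()
from datetime import datetime

replication_config = {
    "replications": [
        {
            # Fields mirror the DataLocation dataclass defined above
            "source": {
                "cloud_provider": "aws",
                "region": "us-east-1",
                "service": "s3",
                "bucket_name": "company-prod-data",      # hypothetical bucket
                "path": "exports/2024/orders.parquet",   # hypothetical object
                "size_bytes": 0,
                "last_modified": datetime.utcnow(),
                "encryption_status": "AES256",
            },
            "destination": {
                "cloud_provider": "azure",
                "region": "eastus",
                "service": "blob",
                "bucket_name": "companyproddata",        # hypothetical container
                "path": "exports/2024/orders.parquet",
                "size_bytes": 0,
                "last_modified": datetime.utcnow(),
                "encryption_status": "unknown",
            },
        }
    ]
}

if __name__ == "__main__":
    # Assuming the MultiCloudDataManager class above is importable, the config is passed like this:
    # manager = MultiCloudDataManager()
    # print(manager.setup_cross_cloud_replication(replication_config))
    print(f"{len(replication_config['replications'])} replication(s) defined")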
Disaster Recovery Across Clouds
Implement robust disaster recovery strategies:
#!/bin/bash
# multi-cloud-dr.sh - Disaster recovery orchestration
set -euo pipefail
# Configuration
PRIMARY_CLOUD="${PRIMARY_CLOUD:-aws}"
SECONDARY_CLOUD="${SECONDARY_CLOUD:-azure}"
DR_NAMESPACE="${DR_NAMESPACE:-disaster-recovery}"
RTO_MINUTES="${RTO_MINUTES:-15}" # Recovery Time Objective
RPO_MINUTES="${RPO_MINUTES:-5}"    # Recovery Point Objective

# Colors for output
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
BLUE='\033[0;34m'
NC='\033[0m' # No Color

log_info() {
echo -e "${GREEN}[INFO]${NC} $1"
}
log_warn() {
echo -e "${YELLOW}[WARN]${NC} $1"
}
log_error() {
echo -e "${RED}[ERROR]${NC} $1"
}
log_debug() {
echo -e "${BLUE}[DEBUG]${NC} $1"
}
# Health check for primary cloud
check_primary_health() {
local primary_cloud="$1"
log_info "Checking health of primary cloud ($primary_cloud)..."
case "$primary_cloud" in
"aws")
# Check AWS EKS cluster health
if ! aws eks describe-cluster --name prod-cluster --region us-east-1 >/dev/null 2>&1; then
log_error "AWS EKS cluster is not accessible"
return 1
fi
# Check critical services
kubectl config use-context aws-prod-cluster
if ! kubectl get nodes --no-headers | grep -q "Ready"; then
log_error "No ready nodes in AWS cluster"
return 1
fi
# Check application pods
local unhealthy_pods=$(kubectl get pods -n production --no-headers | grep -v "Running\|Completed" | wc -l)
if [ "$unhealthy_pods" -gt 0 ]; then
log_warn "$unhealthy_pods unhealthy pods detected in AWS"
return 1
fi
;;
"azure")
# Check Azure AKS cluster health
if ! az aks show --name prod-cluster --resource-group prod-rg >/dev/null 2>&1; then
log_error "Azure AKS cluster is not accessible"
return 1
fi
kubectl config use-context azure-prod-cluster
if ! kubectl get nodes --no-headers | grep -q "Ready"; then
log_error "No ready nodes in Azure cluster"
return 1
fi
;;
"gcp")
# Check GKE cluster health
if ! gcloud container clusters describe prod-cluster --zone us-central1-a >/dev/null 2>&1; then
log_error "GCP GKE cluster is not accessible"
return 1
fi
;;
esac
log_info "Primary cloud ($primary_cloud) is healthy"
return 0
}

# Initiate disaster recovery failover
initiate_failover() {
local primary="$1"
local secondary="$2"
local reason="${3:-manual_trigger}"
log_info "🚨 INITIATING DISASTER RECOVERY FAILOVER"
log_info "Primary: $primary -> Secondary: $secondary"
log_info "Reason: $reason"
log_info "RTO Target: $RTO_MINUTES minutes"
# Record failover start time
local failover_start=$(date +%s)
# Step 1: Stop new traffic to primary
log_info "Step 1: Stopping new traffic to primary cloud..."
stop_primary_traffic "$primary"
# Step 2: Ensure data consistency
log_info "Step 2: Ensuring data consistency..."
sync_data_to_secondary "$primary" "$secondary"
# Step 3: Activate secondary cloud
log_info "Step 3: Activating secondary cloud..."
activate_secondary_cloud "$secondary"
# Step 4: Update DNS and load balancers
log_info "Step 4: Updating DNS to point to secondary cloud..."
update_dns_to_secondary "$secondary"
# Step 5: Verify secondary is operational
log_info "Step 5: Verifying secondary cloud operation..."
if verify_secondary_operation "$secondary"; then
local failover_end=$(date +%s)
local failover_duration=$(( (failover_end - failover_start) / 60 ))
log_info "✅ Disaster recovery failover completed successfully"
log_info "Failover duration: $failover_duration minutes (Target: $RTO_MINUTES minutes)"
# Send notifications
send_failover_notification "success" "$primary" "$secondary" "$failover_duration" "$reason"
return 0
else
log_error "❌ Secondary cloud verification failed"
log_error "Manual intervention required"
send_failover_notification "failed" "$primary" "$secondary" "N/A" "$reason"
return 1
fi
}

# Stop traffic to primary cloud
stop_primary_traffic() {
local primary="$1"
case "$primary" in
"aws")
# Update ALB target groups to drain connections
log_debug "Draining AWS ALB target groups..."
# Get ALB target group ARNs
local target_groups=$(aws elbv2 describe-target-groups --query 'TargetGroups[?starts_with(TargetGroupName, `prod-`)].TargetGroupArn' --output text)
for tg_arn in $target_groups; do
log_debug "Draining target group: $tg_arn"
# In reality, you'd modify the target group to remove healthy targets
# aws elbv2 modify-target-group --target-group-arn $tg_arn --health-check-enabled false
done
;;
"azure")
# Update Azure Load Balancer
log_debug "Updating Azure Load Balancer rules..."
# az network lb rule update --resource-group prod-rg --lb-name prod-lb --name http-rule --backend-pool-name empty-pool
;;
"gcp")
# Update GCP Load Balancer
log_debug "Updating GCP Load Balancer backend services..."
# gcloud compute backend-services update prod-backend --global --no-backends
;;
esac
log_info "Traffic stopped to primary cloud ($primary)"
}

# Synchronize data to secondary cloud
sync_data_to_secondary() {
local primary="$1"
local secondary="$2"
log_info "Synchronizing critical data from $primary to $secondary..."
# Database synchronization
case "$primary-$secondary" in
"aws-azure")
log_debug "Syncing AWS RDS to Azure Database..."
# Create final backup and restore to Azure
python3 /scripts/sync-aws-azure-db.py --final-sync
;;
"aws-gcp")
log_debug "Syncing AWS RDS to Cloud SQL..."
python3 /scripts/sync-aws-gcp-db.py --final-sync
;;
"azure-aws")
log_debug "Syncing Azure Database to AWS RDS..."
python3 /scripts/sync-azure-aws-db.py --final-sync
;;
esac
# Application state synchronization
log_debug "Syncing application state and sessions..."
# Redis/cache synchronization
kubectl config use-context "${primary}-prod-cluster"
kubectl exec -n production deploy/redis -- redis-cli BGSAVE
# Wait for backup to complete
sleep 10
# Copy Redis dump to secondary cloud
kubectl cp production/redis-0:/data/dump.rdb /tmp/redis-backup.rdb
kubectl config use-context "${secondary}-prod-cluster"
kubectl cp /tmp/redis-backup.rdb production/redis-0:/data/dump.rdb
kubectl exec -n production deploy/redis -- redis-cli DEBUG RESTART
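# Assumptions above: Redis runs as a single instance (pod redis-0 behind the
# redis deployment) and the dump fits on the operator's machine. Polling
# LASTSAVE after BGSAVE is more reliable than a fixed sleep, and for larger
# datasets streaming a snapshot with `redis-cli --rdb` or pre-established
# cross-cloud replication is usually more robust.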
log_info "Data synchronization completed"
}

# Activate secondary cloud
activate_secondary_cloud() {
local secondary="$1"
log_info "Activating secondary cloud ($secondary)..."
kubectl config use-context "${secondary}-prod-cluster"
# Scale up applications in secondary cloud
log_debug "Scaling up applications in secondary cloud..."
# Scale critical services
kubectl scale deployment api-server --replicas=5 -n production
kubectl scale deployment worker --replicas=3 -n production
kubectl scale deployment frontend --replicas=3 -n production
# Wait for pods to be ready
log_debug "Waiting for pods to be ready..."
kubectl wait --for=condition=ready pod -l app=api-server -n production --timeout=600s
kubectl wait --for=condition=ready pod -l app=worker -n production --timeout=600s
kubectl wait --for=condition=ready pod -l app=frontend -n production --timeout=600s
# Update configuration for DR mode
log_debug "Updating configuration for disaster recovery mode..."
kubectl patch configmap app-config -n production --patch '{
"data": {
"mode": "disaster-recovery",
"primary-cloud": "false",
"dr-activated-timestamp": "'"$(date -u +%Y-%m-%dT%H:%M:%SZ)"'"
}
}'
# Restart pods to pick up new configuration
kubectl rollout restart deployment/api-server -n production
kubectl rollout restart deployment/worker -n production
log_info "Secondary cloud ($secondary) activated"
}

# Update DNS to point to secondary cloud
update_dns_to_secondary() {
local secondary="$1"
log_info "Updating DNS records to point to secondary cloud..."
# Get secondary cloud load balancer IP
kubectl config use-context "${secondary}-prod-cluster"
local secondary_lb_ip=$(kubectl get service api-service -n production -o jsonpath='{.status.loadBalancer.ingress[0].ip}')
if [ -z "$secondary_lb_ip" ]; then
# Try hostname for AWS ELB
secondary_lb_ip=$(kubectl get service api-service -n production -o jsonpath='{.status.loadBalancer.ingress[0].hostname}')
fi
if [ -z "$secondary_lb_ip" ]; then
log_error "Unable to get secondary cloud load balancer IP/hostname"
return 1
fi
log_debug "Secondary load balancer: $secondary_lb_ip"
# Update DNS records (example using Route53)
case "$secondary" in
"aws")
# Update Route53 record
aws route53 change-resource-record-sets --hosted-zone-id Z123456789 --change-batch '{
"Changes": [{
"Action": "UPSERT",
"ResourceRecordSet": {
"Name": "api.company.com",
"Type": "A",
"TTL": 60,
"ResourceRecords": [{"Value": "'"$secondary_lb_ip"'"}]
}
}]
}'
;;
"azure")
# Update Azure DNS
az network dns record-set a add-record --resource-group dns-rg --zone-name company.com --record-set-name api --ipv4-address "$secondary_lb_ip"
;;
"gcp")
# Update Cloud DNS
gcloud dns record-sets transaction start --zone=company-com
gcloud dns record-sets transaction add --zone=company-com --name=api.company.com. --ttl=60 --type=A "$secondary_lb_ip"
gcloud dns record-sets transaction execute --zone=company-com
;;
esac
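# Note: in most environments the zone is hosted by a single DNS provider
# regardless of which cloud serves traffic, so only one branch above applies.
# A health-checked failover routing policy (for example, Route53 failover
# records) can remove the need to push this change by hand during an incident.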
log_info "DNS updated to point to secondary cloud"
}

# Verify secondary cloud operation
verify_secondary_operation() {
local secondary="$1"
log_info "Verifying secondary cloud operation..."
kubectl config use-context "${secondary}-prod-cluster"
# Get service endpoint
local service_ip=$(kubectl get service api-service -n production -o jsonpath='{.status.loadBalancer.ingress[0].ip}')
if [ -z "$service_ip" ]; then
service_ip=$(kubectl get service api-service -n production -o jsonpath='{.status.loadBalancer.ingress[0].hostname}')
fi
if [ -z "$service_ip" ]; then
log_error "Unable to get service endpoint for verification"
return 1
fi
# Health check
log_debug "Testing health endpoint..."
if ! curl -f -s "http://$service_ip/health" >/dev/null; then
log_error "Health check failed"
return 1
fi
# API functionality test
log_debug "Testing API functionality..."
local api_response=$(curl -s "http://$service_ip/api/status")
if ! echo "$api_response" | jq -e '.status == "healthy"' >/dev/null 2>&1; then
log_error "API functionality test failed"
return 1
fi
# Database connectivity test
log_debug "Testing database connectivity..."
if ! kubectl exec -n production deploy/api-server -- /app/health-check --database >/dev/null 2>&1; then
log_error "Database connectivity test failed"
return 1
fi
log_info "✅ Secondary cloud operation verified"
return 0
}

# Send failover notifications
send_failover_notification() {
local status="$1"
local primary="$2"
local secondary="$3"
local duration="$4"
local reason="$5"
local webhook_url="${SLACK_WEBHOOK_URL:-}"
if [ -z "$webhook_url" ]; then
log_warn "No notification webhook configured"
return 0
fi
local color="good"
local emoji="✅"
if [ "$status" = "failed" ]; then
color="danger"
emoji="❌"
fi
local message="{
"attachments": [{
"color": "$color",
"title": "$emoji Disaster Recovery Failover - $status",
"fields": [
{"title": "Primary Cloud", "value": "$primary", "short": true},
{"title": "Secondary Cloud", "value": "$secondary", "short": true},
{"title": "Duration", "value": "$duration minutes", "short": true},
{"title": "Reason", "value": "$reason", "short": true},
{"title": "Timestamp", "value": "$(date -u)", "short": false}
]
}]
}"
curl -X POST -H 'Content-type: application/json' \
--data "$message" \
"$webhook_url"
}

# Automated monitoring and triggering
monitor_and_trigger() {
log_info "Starting automated DR monitoring..."
while true; do
if ! check_primary_health "$PRIMARY_CLOUD"; then
log_warn "Primary cloud health check failed"
# Wait and check again to avoid false positives
sleep 30
if ! check_primary_health "$PRIMARY_CLOUD"; then
log_error "Primary cloud confirmed unhealthy - triggering failover"
initiate_failover "$PRIMARY_CLOUD" "$SECONDARY_CLOUD" "automated_health_check_failure"
break
else
log_info "Primary cloud recovered - false alarm"
fi
else
log_debug "Primary cloud healthy"
fi
sleep 60 # Check every minute
done
}

# Failback to primary cloud
initiate_failback() {
local current_primary="$1"
local original_primary="$2"
log_info "🔄 INITIATING FAILBACK TO ORIGINAL PRIMARY"
log_info "Current Primary: $current_primary -> Original Primary: $original_primary"
# Verify original primary is healthy
if ! check_primary_health "$original_primary"; then
log_error "Original primary cloud is not healthy - cannot failback"
return 1
fi
# Sync data back to original primary
sync_data_to_secondary "$current_primary" "$original_primary"
# Activate original primary
activate_secondary_cloud "$original_primary"
# Update DNS back to original primary
update_dns_to_secondary "$original_primary"
# Verify operation
if verify_secondary_operation "$original_primary"; then
log_info "✅ Failback to original primary completed successfully"
# Scale down the secondary (former primary)
kubectl config use-context "${current_primary}-prod-cluster"
kubectl scale deployment api-server --replicas=1 -n production
kubectl scale deployment worker --replicas=1 -n production
kubectl scale deployment frontend --replicas=1 -n production
return 0
else
log_error "❌ Failback verification failed"
return 1
fi
}

# Test disaster recovery plan
test_dr_plan() {
log_info "🧪 TESTING DISASTER RECOVERY PLAN"
# Create test namespace
# Switch to the secondary cloud before creating any test resources
kubectl config use-context "${SECONDARY_CLOUD}-prod-cluster"
# Create test namespace
kubectl create namespace dr-test --dry-run=client -o yaml | kubectl apply -f -
# Apply a minimal test deployment (a stock nginx image stands in here for the real test workload)
kubectl create deployment dr-test-app --image=nginx -n dr-test --dry-run=client -o yaml | kubectl apply -f -
if kubectl wait --for=condition=available deployment/dr-test-app -n dr-test --timeout=300s >/dev/null 2>&1; then
log_info "✅ DR test successful - secondary cloud is operational"
else
log_error "❌ DR test failed - secondary cloud has issues"
fi
# Cleanup test resources
kubectl delete namespace dr-test
log_info "DR plan test completed"
}

# Main function
main() {
case "${1:-}" in
"monitor")
monitor_and_trigger
;;
"failover")
initiate_failover "${2:-$PRIMARY_CLOUD}" "${3:-$SECONDARY_CLOUD}" "${4:-manual_trigger}"
;;
"failback")
initiate_failback "${2:-$SECONDARY_CLOUD}" "${3:-$PRIMARY_CLOUD}"
;;
"test")
test_dr_plan
;;
"health-check")
check_primary_health "${2:-$PRIMARY_CLOUD}"
;;
*)
echo "Usage: $0 {monitor|failover|failback|test|health-check} [args...]"
echo ""
echo "Commands:"
echo " monitor - Start automated DR monitoring"
echo " failover [primary] [secondary] [reason] - Initiate disaster recovery failover"
echo " failback [current] [original] - Failback to original primary cloud"
echo " test - Test disaster recovery plan"
echo " health-check [cloud] - Check health of specified cloud"
echo ""
echo "Environment Variables:"
echo " PRIMARY_CLOUD - Primary cloud provider (default: aws)"
echo " SECONDARY_CLOUD - Secondary cloud provider (default: azure)"
echo " RTO_MINUTES - Recovery Time Objective (default: 15)"
echo " RPO_MINUTES - Recovery Point Objective (default: 5)"
echo " SLACK_WEBHOOK_URL - Slack webhook for notifications"
exit 1
;;
esac
}

main "$@"
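In practice we run this script in two modes: a long-lived monitor process in a management cluster, and ad-hoc invocations during game days. A minimal usage sketch, assuming the script above is saved as dr-failover.sh and kubeconfig contexts named aws-prod-cluster and azure-prod-cluster already exist:

# Continuous monitoring with automated failover
export PRIMARY_CLOUD=aws SECONDARY_CLOUD=azure RTO_MINUTES=15
./dr-failover.sh monitor

# Manual failover and later failback during a planned exercise
./dr-failover.sh failover aws azure planned_exercise
./dr-failover.sh failback azure aws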
Conclusion
Building a successful multi-cloud strategy isn't about avoiding all cloud-specific services – it's about making informed architectural decisions that preserve your strategic flexibility while leveraging each cloud's strengths.
Key principles for multi-cloud success:
1. Use Kubernetes as Your Abstraction Layer: It provides the best balance of portability and cloud integration
2. Design for Operational Consistency: Unified monitoring, logging, and deployment processes across clouds
3. Implement Strategic Data Management: Know where your data lives and how to move it when needed
4. Plan for Disaster Recovery: Test your cross-cloud failover capabilities regularly (see the scheduling sketch after this list)
5. Optimize Costs Continuously: Leverage each cloud's pricing advantages for different workloads
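To make principle 4 stick, schedule the DR test rather than relying on calendar reminders. A minimal sketch, assuming the failover script is baked into a tooling image; the image, namespace, and schedule below are placeholders, and the job's service account still needs credentials for both clusters:

# Run the DR plan test at 06:00 UTC on the first day of each month
kubectl create cronjob dr-plan-test \
  --image=registry.company.com/platform/dr-tools:latest \
  --schedule="0 6 1 * *" \
  -n platform-ops \
  -- /scripts/dr-failover.sh test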
The architecture patterns and tooling shown in this guide provide the foundation for a robust multi-cloud strategy that gives you:
- Vendor Independence: Never be locked into a single provider's ecosystem
- Risk Mitigation: Distribute your infrastructure risk across multiple providers
- Cost Optimization: Use the most cost-effective cloud for each workload
- Performance Optimization: Leverage regional advantages and specialized services
- Negotiating Power: Maintain leverage in vendor relationships
Remember: The goal isn't to build identical systems everywhere, but to architect for strategic optionality while delivering maximum business value.
---
Need help implementing a multi-cloud strategy for your organization? Contact our cloud architects for guidance on Kubernetes-based multi-cloud design, vendor selection, and migration planning.