CI/CD Pipeline Security: Integrating DevSecOps from Day One
Jules Musoko
Principal Consultant
In today's threat landscape, security can't be an afterthought. Organizations need to embed security practices directly into their CI/CD pipelines from the very beginning. After securing more than 200 enterprise deployments and implementing DevSecOps practices for major financial institutions, I've learned that the most effective security strategies are those built into the development workflow itself.
This guide shows how to implement enterprise-grade DevSecOps practices that provide continuous security validation without slowing development velocity.
The Security Challenge in Modern Development
Traditional security approaches create bottlenecks:
- Security reviews at the end slow down releases and catch issues late
- Manual security processes don't scale with rapid deployment cycles
- Disconnected security teams create friction with development workflows
- Inconsistent security practices across different projects and teams
The solution is shifting security "left" – integrating security practices throughout the entire development lifecycle.
DevSecOps Pipeline Architecture
Here's the comprehensive security architecture we implement for enterprise clients:
# .gitlab-ci.yml - Complete DevSecOps pipeline
stages:
  - security-scan
  - build
  - security-test
  - deploy-staging
  - security-validate
  - deploy-production
  - monitor

variables:
  DOCKER_DRIVER: overlay2
  DOCKER_TLS_CERTDIR: "/certs"
  # Security scanning tools
  SONARQUBE_URL: "https://sonarqube.company.com"
  SNYK_TOKEN: "$SNYK_TOKEN"
  CHECKMARX_URL: "https://checkmarx.company.com"

# Pre-commit security checks
security-pre-commit:
  stage: security-scan
  image: alpine:latest
  before_script:
    - apk add --no-cache git python3 py3-pip
    - pip3 install pre-commit
  script:
    - pre-commit run --all-files
    - echo "Pre-commit security checks passed"
  artifacts:
    reports:
      junit: pre-commit-results.xml
  only:
    - merge_requests
    - master
    - develop

# Static Application Security Testing (SAST)
sast-scan:
  stage: security-scan
  image: sonarsource/sonar-scanner-cli:latest
  variables:
    SONAR_USER_HOME: "${CI_PROJECT_DIR}/.sonar"
    GIT_DEPTH: "0"
  cache:
    key: "${CI_JOB_NAME}"
    paths:
      - .sonar/cache
  script:
    - sonar-scanner
      -Dsonar.projectKey=${CI_PROJECT_NAME}
      -Dsonar.sources=.
      -Dsonar.host.url=${SONARQUBE_URL}
      -Dsonar.login=${SONARQUBE_TOKEN}
      -Dsonar.qualitygate.wait=true
  artifacts:
    reports:
      junit: sonar-report.xml
  allow_failure: false

# Software Composition Analysis (SCA)
dependency-scan:
  stage: security-scan
  image: snyk/snyk:node
  script:
    - snyk auth $SNYK_TOKEN
    # "|| true" keeps the job green so the report is always produced;
    # drop it if findings should block the pipeline
    - snyk test --json > snyk-results.json || true
    - snyk monitor --project-name=${CI_PROJECT_NAME}
    # Render the Snyk results as an HTML report
    # (assumes snyk-to-html is available in the image)
    - snyk-to-html -i snyk-results.json -o snyk-report.html
  artifacts:
    reports:
      dependency_scanning: snyk-results.json
    paths:
      - snyk-report.html
    expire_in: 1 week

# Secret detection
secret-detection:
  stage: security-scan
  # Assumes the published TruffleHog v3 image; the entrypoint is cleared
  # so that GitLab can run the script lines in a shell
  image:
    name: trufflesecurity/trufflehog:latest
    entrypoint: [""]
  script:
    # --fail makes TruffleHog exit non-zero when it finds secrets
    - trufflehog filesystem "${CI_PROJECT_DIR}" --fail
    - echo "No secrets detected in repository"
  allow_failure: false

# Build with security hardening
build-secure:
  stage: build
  image: docker:20.10.16
  services:
    - docker:20.10.16-dind
  before_script:
    - echo $CI_REGISTRY_PASSWORD | docker login -u $CI_REGISTRY_USER --password-stdin $CI_REGISTRY
  script:
    # Multi-stage build with security scanning
    - docker build
      --target production
      --build-arg BUILD_DATE=$(date -u +'%Y-%m-%dT%H:%M:%SZ')
      --build-arg VCS_REF=$CI_COMMIT_SHA
      --build-arg VERSION=$CI_COMMIT_TAG
      -t $CI_REGISTRY_IMAGE:$CI_COMMIT_SHA .
    # Container security scanning with Trivy
    - docker run --rm -v /var/run/docker.sock:/var/run/docker.sock
      -v $PWD:/tmp aquasec/trivy:latest image
      --format json --output /tmp/trivy-report.json
      $CI_REGISTRY_IMAGE:$CI_COMMIT_SHA
    # Fail build if HIGH or CRITICAL vulnerabilities found
    # (the Docker socket mount lets Trivy see the locally built image
    # before it is pushed)
    - docker run --rm -v /var/run/docker.sock:/var/run/docker.sock
      aquasec/trivy:latest image
      --exit-code 1 --severity HIGH,CRITICAL
      $CI_REGISTRY_IMAGE:$CI_COMMIT_SHA
    - docker push $CI_REGISTRY_IMAGE:$CI_COMMIT_SHA
  artifacts:
    reports:
      container_scanning: trivy-report.json
    expire_in: 1 week

# Dynamic Application Security Testing (DAST)
dast-scan:
  stage: security-test
  image: owasp/zap2docker-stable:latest
  services:
    - name: $CI_REGISTRY_IMAGE:$CI_COMMIT_SHA
      alias: testapp
  script:
    - mkdir -p /zap/wrk
    # Wait for application to be ready
    - timeout 300 sh -c 'until wget -q --spider http://testapp:8080/health; do sleep 5; done'
    # Run ZAP baseline scan
    - /zap/zap-baseline.py
      -t http://testapp:8080
      -g gen.conf
      -J zap-report.json
      -r zap-report.html
    # Run full scan for critical applications
    - if [ "$SECURITY_LEVEL" = "critical" ]; then
      /zap/zap-full-scan.py -t http://testapp:8080 -J zap-full-report.json;
      fi
  artifacts:
    reports:
      dast: zap-report.json
    paths:
      - zap-report.html
    expire_in: 1 week
  allow_failure: false

# Infrastructure as Code security scanning
iac-security-scan:
  stage: security-test
  image: bridgecrew/checkov:latest
  script:
    # Scan Terraform files
    - checkov -d . --framework terraform --output json --output-file checkov-terraform.json
    # Scan Kubernetes manifests
    - checkov -d ./k8s --framework kubernetes --output json --output-file checkov-k8s.json
    # Scan Dockerfiles
    - checkov -f Dockerfile --framework dockerfile --output json --output-file checkov-docker.json
    # Combine reports
    - cat checkov-*.json > combined-iac-report.json
  artifacts:
    reports:
      sast: combined-iac-report.json
    expire_in: 1 week
  allow_failure: false
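The build job fails the pipeline when Trivy reports HIGH or CRITICAL findings. When the threshold needs custom logic (an allowlist, per-team limits), the same gate can be expressed against the JSON report. The following is a minimal sketch, assuming the report follows Trivy's `Results[].Vulnerabilities[].Severity` layout; the function names `count_by_severity` and `gate_passes` and the sample data are illustrative and not part of any tool.

```python
import json
from collections import Counter
from typing import Iterable

BLOCKING_SEVERITIES = frozenset({"HIGH", "CRITICAL"})


def count_by_severity(report: dict) -> Counter:
    """Tally vulnerabilities per severity across all scanned targets."""
    counts: Counter = Counter()
    for result in report.get("Results") or []:
        # "Vulnerabilities" may be absent or null when a target is clean
        for vuln in result.get("Vulnerabilities") or []:
            counts[vuln.get("Severity", "UNKNOWN")] += 1
    return counts


def gate_passes(report: dict, blocking: Iterable[str] = BLOCKING_SEVERITIES) -> bool:
    """Return True when no finding has a blocking severity."""
    counts = count_by_severity(report)
    return all(counts[severity] == 0 for severity in blocking)


if __name__ == "__main__":
    # Hypothetical sample standing in for trivy-report.json
    sample = json.loads("""
    {"Results": [
      {"Target": "app", "Vulnerabilities": [
        {"VulnerabilityID": "CVE-0000-0001", "Severity": "LOW"},
        {"VulnerabilityID": "CVE-0000-0002", "Severity": "HIGH"}
      ]},
      {"Target": "base", "Vulnerabilities": null}
    ]}
    """)
    print(dict(count_by_severity(sample)))
    print("gate passes:", gate_passes(sample))
```

Keeping the counting and the decision in separate functions makes it straightforward to publish the counts as a metric while still using the boolean as the pipeline exit condition.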
Secure Container Build Process
Security starts with the container image. Here's our production-hardened Dockerfile:
# Multi-stage build with security hardening
ARG GOLANG_VERSION=1.21-alpine3.19
ARG ALPINE_VERSION=3.19

# Build stage with specific user
FROM golang:${GOLANG_VERSION} AS builder

# Security: Create non-root user for build
RUN adduser -D -s /bin/sh -u 1001 builduser

# Security: Install only necessary packages and clean up
RUN apk add --no-cache ca-certificates git tzdata && \
    apk upgrade --no-cache

# WORKDIR creates /build as root; hand it to the build user so that
# "go build -o app" can write its output there
WORKDIR /build
RUN chown builduser:builduser /build
COPY --chown=builduser:builduser go.mod go.sum ./

# Security: Run as non-root user
USER builduser
RUN go mod download && go mod verify

COPY --chown=builduser:builduser . .

# Security: Build with security flags
RUN CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build \
    -ldflags='-w -s -extldflags "-static"' \
    -a -installsuffix cgo \
    -o app ./cmd/server

# Production stage - minimal attack surface
# (named "production" so that "docker build --target production" resolves)
FROM alpine:${ALPINE_VERSION} AS production

# Build arguments must be redeclared in the stage that uses them
ARG BUILD_DATE

# Security: Add metadata for compliance
LABEL maintainer="devops@company.com" \
      version="1.0" \
      description="Production API server" \
      security.scan.date="${BUILD_DATE}" \
      org.opencontainers.image.source="https://github.com/company/api-server"

# Security: Install security updates and required runtime packages
RUN apk update && \
    apk upgrade && \
    apk add --no-cache ca-certificates tzdata && \
    rm -rf /var/cache/apk/*

# Security: Create non-privileged user
RUN addgroup -g 1001 appgroup && \
    adduser -D -s /sbin/nologin -u 1001 -G appgroup appuser

# Security: Set up secure directories
RUN mkdir -p /app/logs /app/config && \
    chown -R appuser:appgroup /app && \
    chmod 750 /app

WORKDIR /app

# Security: Copy files with proper ownership
COPY --from=builder --chown=appuser:appgroup /build/app ./
COPY --from=builder --chown=appuser:appgroup /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/

# Security: Drop privileges
USER appuser

# Security: Use specific port
EXPOSE 8080

# Security: Use exec form and specific user
ENTRYPOINT ["./app"]

# Health check for security monitoring
HEALTHCHECK --interval=30s --timeout=3s --start-period=5s --retries=3 \
    CMD wget --no-verbose --tries=1 --spider http://localhost:8080/health || exit 1
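A few of the hardening rules in this Dockerfile (pinned base images, a non-root final user, a health check) are simple enough to verify automatically before the image is ever built. The following is a rough sketch of such a check, not a replacement for a dedicated linter; `lint_dockerfile` and its finding messages are hypothetical, and the parser deliberately ignores line continuations and other Dockerfile subtleties.

```python
from typing import List, Optional, Set


def lint_dockerfile(text: str) -> List[str]:
    """Return findings for a few basic hardening rules (simplified parser)."""
    findings: List[str] = []
    stage_names: Set[str] = set()
    final_user: Optional[str] = None
    has_healthcheck = False

    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        instruction, _, rest = line.partition(" ")
        instruction = instruction.upper()

        if instruction == "FROM":
            # Each FROM starts a new stage, so per-stage state is reset
            final_user = None
            has_healthcheck = False
            tokens = [t for t in rest.split() if not t.startswith("--")]
            image = tokens[0]
            if len(tokens) >= 3 and tokens[1].upper() == "AS":
                stage_names.add(tokens[2])
            is_stage_ref = image in stage_names or image == "scratch"
            untagged = ":" not in image and "@" not in image
            if not is_stage_ref and (image.endswith(":latest") or untagged):
                findings.append(f"unpinned base image: {image}")
        elif instruction == "USER":
            final_user = rest.strip()
        elif instruction == "HEALTHCHECK":
            has_healthcheck = True

    # Only the last stage ends up in the shipped image
    if final_user is None or final_user.split(":")[0] in ("root", "0"):
        findings.append("final stage runs as root")
    if not has_healthcheck:
        findings.append("final stage has no HEALTHCHECK")
    return findings


if __name__ == "__main__":
    example = "FROM alpine:latest\nRUN echo hello\n"
    for finding in lint_dockerfile(example):
        print("FINDING:", finding)
```

A check like this could run in the `security-scan` stage and fail the job when the returned list is non-empty.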
Kubernetes Security Configuration
Secure deployment manifests with comprehensive security policies:
# deployment-secure.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: api-server
  namespace: production
  labels:
    app: api-server
    version: v1.2.3
    security.policy: strict
spec:
  replicas: 3
  selector:
    matchLabels:
      app: api-server
  template:
    metadata:
      labels:
        app: api-server
        version: v1.2.3
      annotations:
        # Security annotations for monitoring
        security.alpha.kubernetes.io/sysctls: net.core.somaxconn=1024
        container.apparmor.security.beta.kubernetes.io/api-server: runtime/default
    spec:
      # Security: Service account with minimal permissions
      serviceAccountName: api-server-sa
      automountServiceAccountToken: false
      # Security: Pod security context
      securityContext:
        runAsNonRoot: true
        runAsUser: 1001
        runAsGroup: 1001
        fsGroup: 1001
        seccompProfile:
          type: RuntimeDefault
        supplementalGroups: [1001]
      containers:
        - name: api-server
          image: registry.company.com/api-server:v1.2.3
          imagePullPolicy: Always
          ports:
            - name: http
              containerPort: 8080
              protocol: TCP
          # Security: Container security context
          securityContext:
            allowPrivilegeEscalation: false
            readOnlyRootFilesystem: true
            runAsNonRoot: true
            runAsUser: 1001
            runAsGroup: 1001
            capabilities:
              drop:
                - ALL
              add: []
            seccompProfile:
              type: RuntimeDefault
          # Resource limits for security
          resources:
            limits:
              cpu: 500m
              memory: 512Mi
              ephemeral-storage: 1Gi
            requests:
              cpu: 200m
              memory: 256Mi
              ephemeral-storage: 100Mi
          # Health checks
          livenessProbe:
            httpGet:
              path: /health
              port: http
              scheme: HTTP
            initialDelaySeconds: 30
            periodSeconds: 10
            timeoutSeconds: 5
            failureThreshold: 3
          readinessProbe:
            httpGet:
              path: /health/ready
              port: http
              scheme: HTTP
            initialDelaySeconds: 5
            periodSeconds: 5
            timeoutSeconds: 5
            failureThreshold: 3
          # Environment variables from secrets
          env:
            - name: DATABASE_URL
              valueFrom:
                secretKeyRef:
                  name: api-server-secrets
                  key: database-url
            - name: API_KEY
              valueFrom:
                secretKeyRef:
                  name: api-server-secrets
                  key: api-key
          # Secure volume mounts
          volumeMounts:
            - name: tmp-volume
              mountPath: /tmp
            - name: logs-volume
              mountPath: /app/logs
            - name: config-volume
              mountPath: /app/config
              readOnly: true
      # Security: Volumes configuration
      volumes:
        - name: tmp-volume
          emptyDir:
            sizeLimit: 100Mi
        - name: logs-volume
          emptyDir:
            sizeLimit: 500Mi
        - name: config-volume
          configMap:
            name: api-server-config
            defaultMode: 0644
      # Security: Node selection and affinity
      nodeSelector:
        security-zone: restricted
      # Security: Pod anti-affinity for high availability
      affinity:
        podAntiAffinity:
          preferredDuringSchedulingIgnoredDuringExecution:
            - weight: 100
              podAffinityTerm:
                labelSelector:
                  matchExpressions:
                    - key: app
                      operator: In
                      values:
                        - api-server
                topologyKey: kubernetes.io/hostname
---
# Network policy for security isolation
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: api-server-netpol
  namespace: production
spec:
  podSelector:
    matchLabels:
      app: api-server
  policyTypes:
    - Ingress
    - Egress
  ingress:
    - from:
        - namespaceSelector:
            matchLabels:
              name: ingress-nginx
        - podSelector:
            matchLabels:
              app: nginx-ingress
      ports:
        - protocol: TCP
          port: 8080
  egress:
    - to:
        - namespaceSelector:
            matchLabels:
              name: database
      ports:
        - protocol: TCP
          port: 5432
    - to: []
      ports:
        - protocol: TCP
          port: 443 # HTTPS outbound
        - protocol: TCP
          port: 53 # DNS
        - protocol: UDP
          port: 53 # DNS
---
# Pod Security Policy (or Pod Security Standards)
# Note: PodSecurityPolicy was removed in Kubernetes v1.25. On current
# clusters, enforce equivalent constraints with Pod Security Admission
# (Pod Security Standards) or a policy engine.
apiVersion: policy/v1beta1
kind: PodSecurityPolicy
metadata:
  name: api-server-psp
spec:
  privileged: false
  allowPrivilegeEscalation: false
  requiredDropCapabilities:
    - ALL
  volumes:
    - 'configMap'
    - 'emptyDir'
    - 'projected'
    - 'secret'
    - 'downwardAPI'
    - 'persistentVolumeClaim'
  runAsUser:
    rule: 'MustRunAsNonRoot'
  runAsGroup:
    rule: 'MustRunAs'
    ranges:
      - min: 1001
        max: 1001
  seLinux:
    rule: 'RunAsAny'
  fsGroup:
    rule: 'MustRunAs'
    ranges:
      - min: 1001
        max: 1001
  readOnlyRootFilesystem: true
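The container-level settings in the Deployment (no privilege escalation, read-only root filesystem, non-root user, all capabilities dropped, resource limits) can be asserted in CI before a manifest reaches the cluster. The following is a minimal sketch that operates on a container spec already parsed into a dictionary; `audit_container_security` is a hypothetical helper, and it only inspects container-level fields, so settings inherited from the pod-level `securityContext` would be reported as missing.

```python
from typing import Any, Dict, List


def audit_container_security(container: Dict[str, Any]) -> List[str]:
    """Return the hardening settings a container spec is missing."""
    ctx = container.get("securityContext") or {}
    problems: List[str] = []

    # Each flag must be set explicitly; an absent key counts as a problem
    if ctx.get("allowPrivilegeEscalation") is not False:
        problems.append("allowPrivilegeEscalation must be false")
    if ctx.get("readOnlyRootFilesystem") is not True:
        problems.append("readOnlyRootFilesystem must be true")
    if ctx.get("runAsNonRoot") is not True:
        problems.append("runAsNonRoot must be true")

    dropped = (ctx.get("capabilities") or {}).get("drop") or []
    if "ALL" not in dropped:
        problems.append("capabilities.drop must include ALL")

    limits = (container.get("resources") or {}).get("limits") or {}
    for resource in ("cpu", "memory"):
        if resource not in limits:
            problems.append(f"resources.limits.{resource} is not set")
    return problems


if __name__ == "__main__":
    # Container spec mirroring the Deployment in this section
    hardened = {
        "name": "api-server",
        "securityContext": {
            "allowPrivilegeEscalation": False,
            "readOnlyRootFilesystem": True,
            "runAsNonRoot": True,
            "capabilities": {"drop": ["ALL"], "add": []},
        },
        "resources": {"limits": {"cpu": "500m", "memory": "512Mi"}},
    }
    print(audit_container_security(hardened))
    print(audit_container_security({"name": "unhardened"}))
```

In practice the dictionary would come from loading the manifest with a YAML parser and iterating over `spec.template.spec.containers`.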
Automated Security Testing
Comprehensive automated security testing integrated into the pipeline:
#!/usr/bin/env python3
# security-test-suite.py
import requests
import json
import subprocess
import time
import sys
from typing import Dict, List, Tuple
class SecurityTestSuite:
def __init__(self, base_url: str, api_key: str):
self.base_url = base_url.rstrip('/')
self.api_key = api_key
self.session = requests.Session()
self.session.headers.update({
'User-Agent': 'SecurityTestSuite/1.0',
'Accept': 'application/json'
})
    def run_all_tests(self) -> Dict:
        """Run comprehensive security test suite"""
        results = {
            'timestamp': time.time(),
            'target': self.base_url,
            'tests': {}
        }
        # Suites for 'injection' and 'session_management' are not implemented
        # in this listing; register them here once test_injection_vulnerabilities
        # and test_session_security exist, otherwise building this list raises
        # AttributeError.
        test_suites = [
            ('authentication', self.test_authentication_security),
            ('authorization', self.test_authorization_controls),
            ('input_validation', self.test_input_validation),
            ('ssl_tls', self.test_ssl_configuration),
            ('headers', self.test_security_headers),
            ('rate_limiting', self.test_rate_limiting),
        ]
        for test_name, test_func in test_suites:
            print(f"Running {test_name} tests...")
            try:
                results['tests'][test_name] = test_func()
                print(f"✓ {test_name} tests completed")
            except Exception as e:
                print(f"✗ {test_name} tests failed: {e}")
                results['tests'][test_name] = {'error': str(e), 'passed': False}
        return results
def test_authentication_security(self) -> Dict:
"""Test authentication mechanisms"""
tests = {}
# Test 1: No credentials should return 401
response = self.session.get(f"{self.base_url}/api/protected")
tests['no_auth_401'] = {
'passed': response.status_code == 401,
'expected': 401,
'actual': response.status_code
}
# Test 2: Invalid credentials should return 401
invalid_headers = {'Authorization': 'Bearer invalid_token'}
response = self.session.get(f"{self.base_url}/api/protected", headers=invalid_headers)
tests['invalid_auth_401'] = {
'passed': response.status_code == 401,
'expected': 401,
'actual': response.status_code
}
# Test 3: Valid credentials should work
valid_headers = {'Authorization': f'Bearer {self.api_key}'}
response = self.session.get(f"{self.base_url}/api/protected", headers=valid_headers)
tests['valid_auth_200'] = {
'passed': response.status_code == 200,
'expected': 200,
'actual': response.status_code
}
# Test 4: JWT token structure (if using JWT)
if self.api_key.count('.') == 2: # JWT format
tests['jwt_structure'] = self.validate_jwt_structure(self.api_key)
return {
'passed': all(test.get('passed', False) for test in tests.values()),
'tests': tests
}
def test_authorization_controls(self) -> Dict:
"""Test authorization and access controls"""
tests = {}
# Test role-based access control
endpoints_to_test = [
('/api/admin/users', 403, 'admin_endpoint_forbidden'),
('/api/user/profile', 200, 'user_endpoint_allowed'),
('/api/public/health', 200, 'public_endpoint_allowed')
]
headers = {'Authorization': f'Bearer {self.api_key}'}
for endpoint, expected_status, test_name in endpoints_to_test:
response = self.session.get(f"{self.base_url}{endpoint}", headers=headers)
tests[test_name] = {
'passed': response.status_code == expected_status,
'expected': expected_status,
'actual': response.status_code,
'endpoint': endpoint
}
return {
'passed': all(test.get('passed', False) for test in tests.values()),
'tests': tests
}
    def test_input_validation(self) -> Dict:
        """Test input validation and sanitization"""
        tests = {}
        headers = {'Authorization': f'Bearer {self.api_key}', 'Content-Type': 'application/json'}
        # Test SQL injection attempts
        sql_payloads = [
            "'; DROP TABLE users; --",
            "1' OR '1'='1",
            "admin'/*",
            "' UNION SELECT * FROM users --"
        ]
        for i, payload in enumerate(sql_payloads):
            test_data = {'username': payload, 'email': 'test@example.com'}
            response = self.session.post(f"{self.base_url}/api/users",
                                         json=test_data, headers=headers)
            # Should reject malicious input (400 or 422)
            tests[f'sql_injection_{i+1}'] = {
                'passed': response.status_code in [400, 422],
                'payload': payload,
                'status_code': response.status_code
            }
        # Test XSS payloads (the two HTML-based payloads are representative
        # examples; substitute the ones relevant to your application)
        xss_payloads = [
            "<script>alert('xss')</script>",
            "javascript:alert('xss')",
            "<img src=x onerror=alert('xss')>",
            "';alert('xss');//"
        ]
        for i, payload in enumerate(xss_payloads):
            test_data = {'comment': payload}
            response = self.session.post(f"{self.base_url}/api/comments",
                                         json=test_data, headers=headers)
            tests[f'xss_injection_{i+1}'] = {
                'passed': response.status_code in [400, 422],
                'payload': payload,
                'status_code': response.status_code
            }
        return {
            'passed': all(test.get('passed', False) for test in tests.values()),
            'tests': tests
        }
def test_ssl_configuration(self) -> Dict:
"""Test SSL/TLS configuration"""
tests = {}
try:
# Test SSL certificate
response = self.session.get(self.base_url, verify=True)
tests['valid_ssl_cert'] = {
'passed': True,
'message': 'SSL certificate is valid'
}
except requests.exceptions.SSLError as e:
tests['valid_ssl_cert'] = {
'passed': False,
'error': str(e)
}
# Test HTTP to HTTPS redirect
if self.base_url.startswith('https://'):
http_url = self.base_url.replace('https://', 'http://')
try:
response = self.session.get(http_url, allow_redirects=False)
tests['https_redirect'] = {
'passed': response.status_code in [301, 302, 307, 308],
'status_code': response.status_code
}
except Exception as e:
tests['https_redirect'] = {
'passed': False,
'error': str(e)
}
return {
'passed': all(test.get('passed', False) for test in tests.values()),
'tests': tests
}
def test_security_headers(self) -> Dict:
"""Test security headers"""
response = self.session.get(self.base_url)
headers = response.headers
required_headers = {
'X-Content-Type-Options': 'nosniff',
'X-Frame-Options': ['DENY', 'SAMEORIGIN'],
'X-XSS-Protection': '1; mode=block',
'Strict-Transport-Security': None, # Should exist
'Content-Security-Policy': None, # Should exist
'Referrer-Policy': None # Should exist
}
tests = {}
for header, expected_value in required_headers.items():
header_present = header in headers
tests[f'{header.lower().replace("-", "_")}_present'] = {
'passed': header_present,
'expected': f'{header} header present',
'actual': f'{header} header {"present" if header_present else "missing"}'
}
if header_present and expected_value:
if isinstance(expected_value, list):
tests[f'{header.lower().replace("-", "_")}_value'] = {
'passed': headers[header] in expected_value,
'expected': f'One of: {expected_value}',
'actual': headers[header]
}
else:
tests[f'{header.lower().replace("-", "_")}_value'] = {
'passed': headers[header] == expected_value,
'expected': expected_value,
'actual': headers[header]
}
return {
'passed': all(test.get('passed', False) for test in tests.values()),
'tests': tests
}
def test_rate_limiting(self) -> Dict:
"""Test rate limiting implementation"""
tests = {}
# Make rapid requests to test rate limiting
rapid_requests = []
for i in range(20): # Make 20 rapid requests
try:
response = self.session.get(f"{self.base_url}/api/public/health")
rapid_requests.append(response.status_code)
time.sleep(0.1) # Small delay between requests
except Exception as e:
rapid_requests.append(f"Error: {e}")
# Check if rate limiting kicked in (expect some 429 responses)
rate_limited_responses = [code for code in rapid_requests if code == 429]
tests['rate_limiting_active'] = {
'passed': len(rate_limited_responses) > 0,
'total_requests': len(rapid_requests),
'rate_limited_count': len(rate_limited_responses),
'message': 'Rate limiting should trigger after rapid requests'
}
return {
'passed': all(test.get('passed', False) for test in tests.values()),
'tests': tests
}
    def validate_jwt_structure(self, token: str) -> Dict:
        """Validate JWT token structure and claims"""
        try:
            import base64
            # Decode JWT header and payload (without verification for testing)
            header, payload, signature = token.split('.')
            # JWT segments are base64url-encoded with padding stripped;
            # restore only as much padding as is actually missing
            header += '=' * (-len(header) % 4)
            payload += '=' * (-len(payload) % 4)
            header_data = json.loads(base64.urlsafe_b64decode(header))
            payload_data = json.loads(base64.urlsafe_b64decode(payload))
            # Check required JWT claims
            required_claims = ['iat', 'exp', 'sub']
            missing_claims = [claim for claim in required_claims if claim not in payload_data]
            # Check token expiration
            current_time = time.time()
            token_expired = payload_data.get('exp', 0) < current_time
            return {
                'passed': len(missing_claims) == 0 and not token_expired,
                'header': header_data,
                'payload_claims': list(payload_data.keys()),
                'missing_claims': missing_claims,
                'expired': token_expired
            }
        except Exception as e:
            return {
                'passed': False,
                'error': f"JWT validation failed: {e}"
            }
def generate_report(self, results: Dict) -> str:
"""Generate security test report"""
report = []
report.append("# Security Test Report")
report.append(f"Target: {results['target']}")
report.append(f"Timestamp: {time.ctime(results['timestamp'])}")
report.append("")
total_tests = 0
passed_tests = 0
for test_suite, suite_results in results['tests'].items():
report.append(f"## {test_suite.replace('_', ' ').title()}")
suite_passed = suite_results.get('passed', False)
status_icon = "✅" if suite_passed else "❌"
report.append(f"{status_icon} Overall: {'PASSED' if suite_passed else 'FAILED'}")
report.append("")
if 'tests' in suite_results:
for test_name, test_result in suite_results['tests'].items():
test_passed = test_result.get('passed', False)
test_icon = "✅" if test_passed else "❌"
report.append(f" {test_icon} {test_name}: {'PASS' if test_passed else 'FAIL'}")
total_tests += 1
if test_passed:
passed_tests += 1
report.append("")
# Summary
pass_rate = (passed_tests / total_tests * 100) if total_tests > 0 else 0
report.insert(2, f"Pass Rate: {pass_rate}% ({passed_tests}/{total_tests})")
report.insert(3, "")
return "\n".join(report)
if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: python security-test-suite.py <base_url> <api_key>")
        sys.exit(1)
    base_url = sys.argv[1]
    api_key = sys.argv[2]
    suite = SecurityTestSuite(base_url, api_key)
    results = suite.run_all_tests()
    # Generate and save report
    report = suite.generate_report(results)
    with open('security-test-report.md', 'w') as f:
        f.write(report)
    # Save raw results
    with open('security-test-results.json', 'w') as f:
        json.dump(results, f, indent=2)
    # Exit with appropriate code
    all_passed = all(test.get('passed', False) for test in results['tests'].values())
    print(f"\nSecurity tests {'PASSED' if all_passed else 'FAILED'}")
    print("Report saved to: security-test-report.md")
    sys.exit(0 if all_passed else 1)
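The JWT structure check depends on decoding the token's header and payload, which are base64url-encoded (using `-` and `_` instead of `+` and `/`) with the trailing `=` padding removed. The following is a small sketch of that decoding step in isolation; `decode_jwt_segment` and `encode_jwt_segment` are illustrative helper names, and nothing here verifies the signature, so the result is suitable for inspection only and never for trusting a token.

```python
import base64
import json
from typing import Any, Dict


def decode_jwt_segment(segment: str) -> Dict[str, Any]:
    """Decode one base64url-encoded JWT segment (header or payload)."""
    # Restore the stripped padding so the length is a multiple of 4;
    # -len % 4 yields 0 when no padding is missing
    padded = segment + "=" * (-len(segment) % 4)
    return json.loads(base64.urlsafe_b64decode(padded))


def encode_jwt_segment(data: Dict[str, Any]) -> str:
    """Inverse of decode_jwt_segment, used here to build test input."""
    raw = json.dumps(data, separators=(",", ":")).encode("utf-8")
    return base64.urlsafe_b64encode(raw).rstrip(b"=").decode("ascii")


if __name__ == "__main__":
    claims = {"sub": "user-123", "iat": 1700000000, "exp": 1700003600}
    segment = encode_jwt_segment(claims)
    print(segment)
    print(decode_jwt_segment(segment))
```

Using `(4 - len % 4)` for the padding count appends four `=` characters when the segment is already aligned, and the standard (non-URL-safe) decoder silently skips `-` and `_`, which is why the sketch uses `-len % 4` with `urlsafe_b64decode`.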
Compliance and Monitoring
Continuous compliance monitoring with automated reporting:
#!/usr/bin/env python3
# compliance-monitor.py
import json
import boto3
import requests
from datetime import datetime, timedelta
from typing import Dict, List
import logging
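class ComplianceMonitor:
    def __init__(self, config: Dict):
        self.config = config
        # Use the configured region rather than the environment default
        self.aws_client = boto3.client('securityhub', region_name=config.get('aws_region'))
        self.logger = logging.getLogger(__name__)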
def check_security_compliance(self) -> Dict:
"""Check overall security compliance"""
compliance_checks = {
'cis_benchmarks': self.check_cis_compliance(),
'owasp_asvs': self.check_owasp_compliance(),
'pci_dss': self.check_pci_compliance(),
'gdpr_privacy': self.check_gdpr_compliance(),
'vulnerability_management': self.check_vulnerability_status()
}
# Calculate overall compliance score
total_checks = sum(len(checks['results']) for checks in compliance_checks.values())
passed_checks = sum(
sum(1 for result in checks['results'] if result.get('compliant', False))
for checks in compliance_checks.values()
)
overall_score = (passed_checks / total_checks * 100) if total_checks > 0 else 0
return {
'timestamp': datetime.utcnow().isoformat(),
'overall_compliance_score': overall_score,
'compliance_checks': compliance_checks,
'recommendations': self.generate_recommendations(compliance_checks)
}
def check_cis_compliance(self) -> Dict:
"""Check CIS (Center for Internet Security) benchmark compliance"""
results = []
# CIS Control 1: Inventory and Control of Hardware Assets
results.append({
'control': 'CIS-1',
'description': 'Inventory and Control of Hardware Assets',
'compliant': self.verify_asset_inventory(),
'evidence': 'Asset inventory updated in last 24 hours'
})
# CIS Control 2: Inventory and Control of Software Assets
results.append({
'control': 'CIS-2',
'description': 'Inventory and Control of Software Assets',
'compliant': self.verify_software_inventory(),
'evidence': 'Container images scanned and catalogued'
})
# CIS Control 3: Continuous Vulnerability Management
results.append({
'control': 'CIS-3',
'description': 'Continuous Vulnerability Management',
'compliant': self.verify_vulnerability_scanning(),
'evidence': 'Daily vulnerability scans with <24h remediation SLA'
})
# CIS Control 6: Maintenance, Monitoring, and Analysis of Audit Logs
results.append({
'control': 'CIS-6',
'description': 'Audit Log Management',
'compliant': self.verify_audit_logging(),
'evidence': 'Centralized logging with 90-day retention'
})
return {
'framework': 'CIS Controls v8',
'results': results,
'compliance_rate': sum(1 for r in results if r['compliant']) / len(results) * 100
}
def check_owasp_compliance(self) -> Dict:
"""Check OWASP ASVS (Application Security Verification Standard) compliance"""
results = []
# V1: Architecture, Design and Threat Modeling
results.append({
'control': 'ASVS-V1',
'description': 'Architecture and Design',
'compliant': self.verify_secure_architecture(),
'evidence': 'Threat model documented and reviewed'
})
# V2: Authentication
results.append({
'control': 'ASVS-V2',
'description': 'Authentication Verification',
'compliant': self.verify_authentication_controls(),
'evidence': 'Multi-factor authentication enforced'
})
# V3: Session Management
results.append({
'control': 'ASVS-V3',
'description': 'Session Management',
'compliant': self.verify_session_management(),
'evidence': 'Secure session handling implemented'
})
# V4: Access Control
results.append({
'control': 'ASVS-V4',
'description': 'Access Control Verification',
'compliant': self.verify_access_controls(),
'evidence': 'Role-based access control implemented'
})
# V7: Error Handling and Logging
results.append({
'control': 'ASVS-V7',
'description': 'Error Handling and Logging',
'compliant': self.verify_error_handling(),
'evidence': 'Secure error handling without information disclosure'
})
return {
'framework': 'OWASP ASVS v4.0',
'results': results,
'compliance_rate': sum(1 for r in results if r['compliant']) / len(results) * 100
}
def verify_asset_inventory(self) -> bool:
"""Verify asset inventory is current"""
try:
# Check if asset inventory was updated recently
last_update = self.get_last_inventory_update()
return (datetime.utcnow() - last_update).days < 1
except Exception:
return False
def verify_vulnerability_scanning(self) -> bool:
"""Verify continuous vulnerability scanning"""
try:
# Check latest scan results from Security Hub
response = self.aws_client.get_findings(
Filters={
'ProductName': [{'Value': 'Inspector', 'Comparison': 'EQUALS'}],
'RecordState': [{'Value': 'ACTIVE', 'Comparison': 'EQUALS'}],
'CreatedAt': [{
'Start': (datetime.utcnow() - timedelta(days=1)).isoformat(),
'End': datetime.utcnow().isoformat()
}]
}
)
return len(response.get('Findings', [])) > 0
except Exception:
return False
def generate_security_report(self, compliance_data: Dict) -> str:
"""Generate comprehensive security report"""
report = []
report.append("# Security Compliance Report")
report.append(f"Generated: {datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S UTC')}")
report.append(f"Overall Compliance Score: {compliance_data['overall_compliance_score']}%")
report.append("")
# Executive Summary
if compliance_data['overall_compliance_score'] >= 90:
status = "🟢 EXCELLENT"
summary = "Security posture is excellent with minimal risks identified."
elif compliance_data['overall_compliance_score'] >= 75:
status = "🟡 GOOD"
summary = "Security posture is good with some areas for improvement."
else:
status = "🔴 NEEDS ATTENTION"
summary = "Security posture needs immediate attention with multiple compliance gaps."
report.append(f"## Executive Summary")
report.append(f"Status: {status}")
report.append(f"{summary}")
report.append("")
# Detailed Results
for framework, results in compliance_data['compliance_checks'].items():
report.append(f"## {framework.replace('_', ' ').title()}")
report.append(f"Compliance Rate: {results['compliance_rate']}%")
report.append("")
for result in results['results']:
status_icon = "✅" if result['compliant'] else "❌"
report.append(f"{status_icon} {result['control']}: {result['description']}")
if result.get('evidence'):
report.append(f" Evidence: {result['evidence']}")
report.append("")
# Recommendations
if compliance_data.get('recommendations'):
report.append("## Recommendations")
for i, rec in enumerate(compliance_data['recommendations'], 1):
report.append(f"{i}. {rec['priority']}: {rec['description']}")
if rec.get('implementation'):
report.append(f" Implementation: {rec['implementation']}")
report.append("")
return "\n".join(report)
# Example usage in CI/CD pipeline
if __name__ == "__main__":
    config = {
        'aws_region': 'us-east-1',
        'notification_webhook': 'https://hooks.slack.com/services/...',
        'compliance_threshold': 85.0
    }
    monitor = ComplianceMonitor(config)
    compliance_data = monitor.check_security_compliance()
    # Generate report
    report = monitor.generate_security_report(compliance_data)
    # Save report
    with open('security-compliance-report.md', 'w') as f:
        f.write(report)
    # Send notification if compliance below threshold
    score = compliance_data['overall_compliance_score']
    if score < config['compliance_threshold']:
        # Send alert to security team (assumes a Slack-style incoming
        # webhook that accepts a JSON body with a "text" field)
        requests.post(
            config['notification_webhook'],
            json={'text': f"Security compliance below threshold: {score:.1f}%"},
            timeout=10,
        )
        print("⚠️ Security compliance below threshold - alert sent")
        print(f"Compliance Score: {score:.1f}%")
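The overall compliance score pools every control from every framework and reports the share marked compliant. The arithmetic can be shown in isolation with a short sketch; `compliance_rate` and `overall_score` are illustrative names, and the sample figures are made up for the example. Note that pooling is not the same as averaging the per-framework rates, and that an empty result list needs a guard to avoid dividing by zero.

```python
from typing import Dict, List


def compliance_rate(results: List[Dict]) -> float:
    """Percentage of controls marked compliant; 0.0 for an empty list."""
    if not results:
        return 0.0
    passed = sum(1 for r in results if r.get("compliant", False))
    return passed / len(results) * 100


def overall_score(frameworks: Dict[str, List[Dict]]) -> float:
    """Pool every control from every framework into one percentage."""
    pooled = [r for results in frameworks.values() for r in results]
    return compliance_rate(pooled)


if __name__ == "__main__":
    # Hypothetical results: 3 of 4 CIS controls and 4 of 5 ASVS controls pass
    frameworks = {
        "cis_benchmarks": [{"compliant": c} for c in (True, True, True, False)],
        "owasp_asvs": [{"compliant": c} for c in (True, True, True, True, False)],
    }
    for name, results in frameworks.items():
        print(f"{name}: {compliance_rate(results):.1f}%")
    # Pooled: 7 of 9 controls, which differs from the mean of 75% and 80%
    print(f"overall: {overall_score(frameworks):.1f}%")
```

If frameworks should count equally regardless of how many controls each contains, averaging the per-framework rates would be the alternative design.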
Security Monitoring and Alerting
Real-time security monitoring with automated response:
security-monitoring.yaml - Prometheus + Grafana monitoring
apiVersion: monitoring.coreos.com/v1
kind: PrometheusRule
metadata:
name: security-alerts
namespace: monitoring
spec:
groups:
- name: security-high-priority
interval: 30s
rules:
# Authentication failures
- alert: HighAuthenticationFailures
expr: sum(rate(http_requests_total{status=~"401|403"}[5m])) > 10
for: 2m
labels:
severity: critical
category: security
annotations:
summary: "High number of authentication failures detected"
description: "More than 10 authentication failures per second in the last 5 minutes"
runbook_url: "https://wiki.company.com/security/auth-failures"
# Potential DDoS attack
- alert: PotentialDDoSAttack
expr: sum(rate(http_requests_total[1m])) > 1000
        for: 1m
        labels:
          severity: critical
          category: security
        annotations:
          summary: "Potential DDoS attack detected"
          description: "Request rate exceeds 1000 requests per second"
          runbook_url: "https://wiki.company.com/security/ddos-response"

      # High error rate (potential attack or system compromise)
      - alert: HighErrorRate
        expr: sum(rate(http_requests_total{status=~"5.."}[5m])) / sum(rate(http_requests_total[5m])) > 0.1
        for: 5m
        labels:
          severity: warning
          category: security
        annotations:
          summary: "High error rate detected"
          description: "Error rate is {{ $value | humanizePercentage }} over the last 5 minutes"

      # SQL injection attempts
      - alert: SQLInjectionAttempts
        expr: sum(rate(security_events_total{type="sql_injection"}[5m])) > 0
        for: 0s
        labels:
          severity: critical
          category: security
        annotations:
          summary: "SQL injection attempts detected"
          description: "SQL injection patterns detected in application logs"
          runbook_url: "https://wiki.company.com/security/sql-injection-response"

      # Container security violations
      - alert: ContainerSecurityViolation
        expr: sum(rate(falco_events_total{rule_type="security"}[5m])) > 0
        for: 0s
        labels:
          severity: high
          category: security
        annotations:
          summary: "Container security violation detected"
          description: "Falco detected security policy violation in containers"

      # Certificate expiration
      - alert: CertificateExpiringSoon
        expr: (ssl_certificate_expiry_seconds - time()) / 86400 < 30
        for: 1h
        labels:
          severity: warning
          category: security
        annotations:
          summary: "SSL certificate expiring soon"
          description: "Certificate {{ $labels.instance }} expires in {{ $value }} days"

  - name: security-compliance
    interval: 1h
    rules:
      # Vulnerability scan results
      - alert: HighSeverityVulnerabilities
        expr: sum(vulnerability_scanner{severity="HIGH"}) > 0
        for: 0s
        labels:
          severity: high
          category: compliance
        annotations:
          summary: "High severity vulnerabilities found"
          description: "{{ $value }} high severity vulnerabilities detected"

      # Compliance check failures
      - alert: ComplianceCheckFailure
        expr: compliance_check_success < 0.9
        for: 15m
        labels:
          severity: warning
          category: compliance
        annotations:
          summary: "Compliance checks failing"
          description: "Compliance success rate is {{ $value | humanizePercentage }}"
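The CertificateExpiringSoon expression turns the seconds remaining until expiry into days by dividing by 86400, then fires when fewer than 30 days remain. The same arithmetic can be sketched in shell for an ad hoc check outside Prometheus. The helper names `days_until_expiry` and `expires_soon` are hypothetical, and both take epoch timestamps as arguments so the calculation is reproducible:

```shell
# Hypothetical helpers mirroring the CertificateExpiringSoon arithmetic.

# Print the whole days between now_epoch and expiry_epoch.
# Shell integer division truncates, so partial days are dropped.
days_until_expiry() {
    local expiry_epoch="$1"
    local now_epoch="${2:-$(date +%s)}"
    echo $(( (expiry_epoch - now_epoch) / 86400 ))
}

# Succeed (exit 0) when the certificate is inside the warning window.
# The comparison is done in seconds so truncation cannot hide a partial day.
expires_soon() {
    local expiry_epoch="$1"
    local now_epoch="${2:-$(date +%s)}"
    local threshold_days="${3:-30}"
    [ $(( expiry_epoch - now_epoch )) -lt $(( threshold_days * 86400 )) ]
}

# Example with fixed timestamps: expiry is 29 days and 12 hours after "now".
now=1700000000
expiry=$(( now + 29 * 86400 + 43200 ))
if expires_soon "$expiry" "$now" 30; then
    echo "certificate expires in about $(days_until_expiry "$expiry" "$now") days"
fi
```

Comparing in seconds rather than in truncated days keeps the shell check aligned with the floating-point comparison Prometheus performs.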
Secure Secrets Management
Comprehensive secrets management strategy:
#!/bin/bash
# secrets-management.sh - Secure secrets handling
set -euo pipefail

# Configuration
VAULT_ADDR="${VAULT_ADDR:-https://vault.company.com}"
KUBERNETES_NAMESPACE="${KUBERNETES_NAMESPACE:-production}"
APP_NAME="${APP_NAME:-api-server}"

# Colors for output
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
NC='\033[0m' # No Color

log_info() {
    echo -e "${GREEN}[INFO]${NC} $1"
}

log_warn() {
    echo -e "${YELLOW}[WARN]${NC} $1"
}

log_error() {
    echo -e "${RED}[ERROR]${NC} $1"
}

# Rotate application secrets
rotate_secrets() {
    local secret_name="$1"
    local vault_path="$2"

    log_info "Rotating secrets for $secret_name"

    # Generate new secrets. Declaring the variables first means a failing
    # openssl call is not masked by `local` and still trips `set -e`.
    local new_database_password new_api_key new_jwt_secret
    new_database_password=$(openssl rand -base64 32)
    new_api_key=$(openssl rand -hex 32)
    # base64 output wraps at 64 characters, so strip the embedded newline
    new_jwt_secret=$(openssl rand -base64 64 | tr -d '\n')

    # Store in Vault
    vault kv put "$vault_path" \
        database_password="$new_database_password" \
        api_key="$new_api_key" \
        jwt_secret="$new_jwt_secret"

    # Update Kubernetes secret
    kubectl create secret generic "$secret_name" \
        --namespace="$KUBERNETES_NAMESPACE" \
        --from-literal=database-password="$new_database_password" \
        --from-literal=api-key="$new_api_key" \
        --from-literal=jwt-secret="$new_jwt_secret" \
        --dry-run=client -o yaml | kubectl apply -f -

    # Rolling restart of deployments using this secret
    kubectl rollout restart "deployment/$APP_NAME" -n "$KUBERNETES_NAMESPACE"

    log_info "Secret rotation completed for $secret_name"
}

# Scan for secrets in code
scan_for_secrets() {
    log_info "Scanning for hardcoded secrets..."

    # Use multiple tools for comprehensive scanning
    local findings=0

    # Trufflehog scan (the flags used here are TruffleHog v2 syntax)
    if command -v trufflehog &> /dev/null; then
        log_info "Running Trufflehog scan..."
        # --json emits one JSON object per finding. A non-zero exit status
        # signals findings, so it must not abort the script or skip the count.
        trufflehog --regex --entropy=False --json . > trufflehog-results.json || true
        local trufflehog_findings
        trufflehog_findings=$(jq -s length trufflehog-results.json)
        if [ "$trufflehog_findings" -gt 0 ]; then
            log_error "Trufflehog found $trufflehog_findings potential secrets"
            findings=$((findings + trufflehog_findings))
        fi
    fi

    # GitLeaks scan (exits non-zero when leaks are found)
    if command -v gitleaks &> /dev/null; then
        log_info "Running GitLeaks scan..."
        if ! gitleaks detect --source . --report-path gitleaks-report.json; then
            local gitleaks_findings
            gitleaks_findings=$(jq length gitleaks-report.json 2>/dev/null || echo "0")
            if [ "$gitleaks_findings" -gt 0 ]; then
                log_error "GitLeaks found $gitleaks_findings potential secrets"
                findings=$((findings + gitleaks_findings))
            fi
        fi
    fi

    # Custom regex patterns
    log_info "Running custom secret patterns scan..."
    local secret_patterns=(
        "password\s*=\s*[\"'][^\"']{8,}[\"']"
        "api[_-]?key\s*=\s*[\"'][^\"']{16,}[\"']"
        "secret\s*=\s*[\"'][^\"']{16,}[\"']"
        "token\s*=\s*[\"'][^\"']{16,}[\"']"
        "-----BEGIN\s+(RSA\s+)?PRIVATE\s+KEY-----"
    )
    local pattern
    for pattern in "${secret_patterns[@]}"; do
        # -e stops grep from parsing the leading dashes of the key pattern as options
        if grep -r -E -i --exclude-dir=.git --exclude="*.md" --exclude="secrets-management.sh" -e "$pattern" .; then
            log_error "Found potential secret matching pattern: $pattern"
            findings=$((findings + 1))
        fi
    done

    if [ "$findings" -eq 0 ]; then
        log_info "No secrets found in codebase ✓"
        return 0
    else
        log_error "Found $findings potential secrets in codebase"
        return 1
    fi
}

# Validate secret strength
validate_secret_strength() {
    local secret="$1"
    local min_length="${2:-16}"

    # Check length
    if [ "${#secret}" -lt "$min_length" ]; then
        log_error "Secret too short (minimum $min_length characters)"
        return 1
    fi

    # Check complexity
    if ! echo "$secret" | grep -q '[A-Z]'; then
        log_warn "Secret should contain uppercase letters"
    fi
    if ! echo "$secret" | grep -q '[a-z]'; then
        log_warn "Secret should contain lowercase letters"
    fi
    if ! echo "$secret" | grep -q '[0-9]'; then
        log_warn "Secret should contain numbers"
    fi
    if ! echo "$secret" | grep -q '[^A-Za-z0-9]'; then
        log_warn "Secret should contain special characters"
    fi

    log_info "Secret validation completed"
    return 0
}

# Setup External Secrets Operator
setup_external_secrets() {
    log_info "Setting up External Secrets Operator..."

    # Install External Secrets Operator
    helm repo add external-secrets https://charts.external-secrets.io
    helm repo update
    helm upgrade --install external-secrets external-secrets/external-secrets \
        --namespace external-secrets-system \
        --create-namespace \
        --set installCRDs=true

    # Create SecretStore for Vault integration
    # cat <<EOF ... EOF  (SecretStore manifest omitted here)
}

# Main function
main() {
    case "${1:-}" in
        "rotate")
            rotate_secrets "${2:-$APP_NAME-secrets}" "${3:-secret/$APP_NAME}"
            ;;
        "scan")
            scan_for_secrets
            ;;
        "validate")
            validate_secret_strength "${2:-}" "${3:-16}"
            ;;
        "setup-external-secrets")
            setup_external_secrets
            ;;
        *)
            echo "Usage: $0 {rotate|scan|validate|setup-external-secrets} [args...]"
            echo ""
            echo "Commands:"
            echo "  rotate                        - Rotate application secrets"
            echo "  scan                          - Scan for hardcoded secrets"
            echo "  validate <secret> [min-length] - Validate secret strength"
            echo "  setup-external-secrets        - Setup External Secrets Operator"
            exit 1
            ;;
    esac
}

main "$@"
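One detail of the custom pattern scan is worth isolating. The private-key pattern begins with dashes, and grep reads an argument that starts with `-` as an option unless the pattern is passed explicitly with `-e`. The sketch below illustrates this under stated assumptions: `count_pattern_files` is a hypothetical helper, the demo directory is a throwaway created with `mktemp`, and `[[:space:]]` stands in for `\s` so the pattern does not depend on GNU extensions.

```shell
# count_pattern_files PATTERN DIR
# Hypothetical helper: prints how many files under DIR contain a match
# for the extended regular expression PATTERN.
count_pattern_files() {
    local pattern="$1"
    local dir="$2"
    # -e marks the next argument as the pattern even when it starts with "-".
    # -l lists each matching file once. grep exits 1 when nothing matches,
    # so that status is ignored to keep the count at 0 instead of failing.
    { grep -r -l -E -i --exclude-dir=.git -e "$pattern" "$dir" || true; } | wc -l | tr -d ' '
}

# Demo on a throwaway directory containing a dummy key header (no key material).
demo_dir="$(mktemp -d)"
printf '%s\n' '-----BEGIN RSA PRIVATE KEY-----' > "$demo_dir/fake.pem"
printf '%s\n' 'nothing sensitive here' > "$demo_dir/notes.txt"

key_pattern='-----BEGIN[[:space:]]+(RSA[[:space:]]+)?PRIVATE[[:space:]]+KEY-----'
echo "files with a private key header: $(count_pattern_files "$key_pattern" "$demo_dir")"

rm -rf "$demo_dir"
```

Counting matching files rather than matching lines keeps the number stable when a single key file contains several suspicious lines.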
Conclusion
DevSecOps is not just about adding security tools to your pipeline – it's about creating a security-first culture where security is everyone's responsibility. The implementations shown in this guide provide:
- Automated Security Testing that catches issues before production
- Secure-by-Default Infrastructure that reduces attack surface
- Continuous Compliance Monitoring that ensures ongoing security posture
- Incident Response Capabilities that enable rapid threat response
Key principles for successful DevSecOps implementation:
1. Start Early: Integrate security from the first line of code
2. Automate Everything: Manual security processes don't scale
3. Fail Fast: Catch and fix security issues as early as possible
4. Monitor Continuously: Security is an ongoing process, not a one-time check
5. Educate Teams: Every developer should understand security implications
The security landscape is constantly evolving, but with these foundational practices in place, your organization will be well-positioned to detect, respond to, and prevent security threats while maintaining development velocity.