DevSecOpsSecurityCI/CD

CI/CD Pipeline Security: Integrating DevSecOps from Day One

JM

Jules Musoko

Principal Consultant

32 min read

CI/CD Pipeline Security: Integrating DevSecOps from Day One

In today's threat landscape, security can't be an afterthought. Organizations need to embed security practices directly into their CI/CD pipelines from the very beginning. After securing over 200+ enterprise deployments and implementing DevSecOps practices for major financial institutions, I've learned that the most effective security strategies are those built into the development workflow itself.

This comprehensive guide will show you how to implement enterprise-grade DevSecOps practices that provide continuous security validation without slowing down your development velocity.

The Security Challenge in Modern Development

Traditional security approaches create bottlenecks:

- Security reviews at the end slow down releases and catch issues late - Manual security processes don't scale with rapid deployment cycles - Disconnected security teams create friction with development workflows - Inconsistent security practices across different projects and teams

The solution is shifting security "left" – integrating security practices throughout the entire development lifecycle.

DevSecOps Pipeline Architecture

Here's the comprehensive security architecture we implement for enterprise clients:

.gitlab-ci.yml - Complete DevSecOps pipeline

stages: - security-scan - build - security-test - deploy-staging - security-validate - deploy-production - monitor

variables: DOCKER_DRIVER: overlay2 DOCKER_TLS_CERTDIR: "/certs" # Security scanning tools SONARQUBE_URL: "https://sonarqube.company.com" SNYK_TOKEN: "$SNYK_TOKEN" CHECKMARX_URL: "https://checkmarx.company.com"

Pre-commit security checks

security-pre-commit: stage: security-scan image: alpine:latest before_script: - apk add --no-cache git python3 py3-pip - pip3 install pre-commit script: - pre-commit run --all-files - echo "Pre-commit security checks passed" artifacts: reports: junit: pre-commit-results.xml only: - merge_requests - master - develop

Static Application Security Testing (SAST)

sast-scan: stage: security-scan image: sonarsource/sonar-scanner-cli:latest variables: SONAR_USER_HOME: "${CI_PROJECT_DIR}/.sonar" GIT_DEPTH: "0" cache: key: "${CI_JOB_NAME}" paths: - .sonar/cache script: - sonar-scanner -Dsonar.projectKey=${CI_PROJECT_NAME} -Dsonar.sources=. -Dsonar.host.url=${SONARQUBE_URL} -Dsonar.login=${SONARQUBE_TOKEN} -Dsonar.qualitygate.wait=true artifacts: reports: junit: sonar-report.xml allow_failure: false

Software Composition Analysis (SCA)

dependency-scan: stage: security-scan image: snyk/snyk:node script: - snyk auth $SNYK_TOKEN - snyk test --json > snyk-results.json || true - snyk monitor --project-name=${CI_PROJECT_NAME} # Convert Snyk results to GitLab format - snyk-to-html -i snyk-results.json -o snyk-report.html artifacts: reports: dependency_scanning: snyk-results.json paths: - snyk-report.html expire_in: 1 week

Secret detection

secret-detection: stage: security-scan image: alpine:git before_script: - apk add --no-cache git - git clone https://github.com/trufflesecurity/trufflehog.git - cd trufflehog && go build script: - ./trufflehog/trufflehog --regex --entropy=False ${CI_PROJECT_DIR} - echo "No secrets detected in repository" allow_failure: false

Build with security hardening

build-secure: stage: build image: docker:20.10.16 services: - docker:20.10.16-dind before_script: - echo $CI_REGISTRY_PASSWORD | docker login -u $CI_REGISTRY_USER --password-stdin $CI_REGISTRY script: # Multi-stage build with security scanning - docker build --target production --build-arg BUILD_DATE=$(date -u +'%Y-%m-%dT%H:%M:%SZ') --build-arg VCS_REF=$CI_COMMIT_SHA --build-arg VERSION=$CI_COMMIT_TAG -t $CI_REGISTRY_IMAGE:$CI_COMMIT_SHA . # Container security scanning with Trivy - docker run --rm -v /var/run/docker.sock:/var/run/docker.sock -v $PWD:/tmp aquasec/trivy:latest image --format json --output /tmp/trivy-report.json $CI_REGISTRY_IMAGE:$CI_COMMIT_SHA # Fail build if HIGH or CRITICAL vulnerabilities found - docker run --rm -v $PWD:/tmp aquasec/trivy:latest image --exit-code 1 --severity HIGH,CRITICAL $CI_REGISTRY_IMAGE:$CI_COMMIT_SHA - docker push $CI_REGISTRY_IMAGE:$CI_COMMIT_SHA artifacts: reports: container_scanning: trivy-report.json expire_in: 1 week

Dynamic Application Security Testing (DAST)

dast-scan: stage: security-test image: owasp/zap2docker-stable:latest services: - name: $CI_REGISTRY_IMAGE:$CI_COMMIT_SHA alias: testapp script: - mkdir -p /zap/wrk # Wait for application to be ready - timeout 300 sh -c 'until wget -q --spider http://testapp:8080/health; do sleep 5; done' # Run ZAP baseline scan - /zap/zap-baseline.py -t http://testapp:8080 -g gen.conf -J zap-report.json -r zap-report.html # Run full scan for critical applications - if [ "$SECURITY_LEVEL" = "critical" ]; then /zap/zap-full-scan.py -t http://testapp:8080 -J zap-full-report.json; fi artifacts: reports: dast: zap-report.json paths: - zap-report.html expire_in: 1 week allow_failure: false

Infrastructure as Code security scanning

iac-security-scan: stage: security-test image: bridgecrew/checkov:latest script: # Scan Terraform files - checkov -d . --framework terraform --output json --output-file checkov-terraform.json # Scan Kubernetes manifests - checkov -d ./k8s --framework kubernetes --output json --output-file checkov-k8s.json # Scan Dockerfiles - checkov -f Dockerfile --framework dockerfile --output json --output-file checkov-docker.json # Combine reports - cat checkov-*.json > combined-iac-report.json artifacts: reports: sast: combined-iac-report.json expire_in: 1 week allow_failure: false

Secure Container Build Process

Security starts with the container image. Here's our production-hardened Dockerfile:

Multi-stage build with security hardening

ARG GOLANG_VERSION=1.21-alpine3.19 ARG ALPINE_VERSION=3.19

Build stage with specific user

FROM golang:${GOLANG_VERSION} AS builder

Security: Create non-root user for build

RUN adduser -D -s /bin/sh -u 1001 builduser

Security: Install only necessary packages and clean up

RUN apk add --no-cache ca-certificates git tzdata && \ apk upgrade --no-cache

WORKDIR /build COPY --chown=builduser:builduser go.mod go.sum ./

Security: Run as non-root user

USER builduser RUN go mod download && go mod verify

COPY --chown=builduser:builduser . .

Security: Build with security flags

RUN CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build \ -ldflags='-w -s -extldflags "-static"' \ -a -installsuffix cgo \ -o app ./cmd/server

Production stage - minimal attack surface

FROM alpine:${ALPINE_VERSION}

Security: Add metadata for compliance

LABEL maintainer="devops@company.com" \ version="1.0" \ description="Production API server" \ security.scan.date="${BUILD_DATE}" \ org.opencontainers.image.source="https://github.com/company/api-server"

Security: Install security updates only

RUN apk update && \ apk upgrade && \ apk add --no-cache ca-certificates tzdata && \ rm -rf /var/cache/apk/*

Security: Create non-privileged user

RUN addgroup -g 1001 appgroup && \ adduser -D -s /sbin/nologin -u 1001 -G appgroup appuser

Security: Set up secure directories

RUN mkdir -p /app/logs /app/config && \ chown -R appuser:appgroup /app && \ chmod 750 /app

WORKDIR /app

Security: Copy files with proper ownership

COPY --from=builder --chown=appuser:appgroup /build/app ./ COPY --from=builder --chown=appuser:appgroup /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/

Security: Drop privileges

USER appuser

Security: Use specific port

EXPOSE 8080

Security: Use exec form and specific user

ENTRYPOINT ["./app"]

Health check for security monitoring

HEALTHCHECK --interval=30s --timeout=3s --start-period=5s --retries=3 \ CMD wget --no-verbose --tries=1 --spider http://localhost:8080/health || exit 1

Kubernetes Security Configuration

Secure deployment manifests with comprehensive security policies:

deployment-secure.yaml

apiVersion: apps/v1 kind: Deployment metadata: name: api-server namespace: production labels: app: api-server version: v1.2.3 security.policy: strict spec: replicas: 3 selector: matchLabels: app: api-server template: metadata: labels: app: api-server version: v1.2.3 annotations: # Security annotations for monitoring security.alpha.kubernetes.io/sysctls: net.core.somaxconn=1024 container.apparmor.security.beta.kubernetes.io/api-server: runtime/default spec: # Security: Service account with minimal permissions serviceAccountName: api-server-sa automountServiceAccountToken: false # Security: Pod security context securityContext: runAsNonRoot: true runAsUser: 1001 runAsGroup: 1001 fsGroup: 1001 seccompProfile: type: RuntimeDefault supplementalGroups: [1001] containers: - name: api-server image: registry.company.com/api-server:v1.2.3 imagePullPolicy: Always ports: - name: http containerPort: 8080 protocol: TCP # Security: Container security context securityContext: allowPrivilegeEscalation: false readOnlyRootFilesystem: true runAsNonRoot: true runAsUser: 1001 runAsGroup: 1001 capabilities: drop: - ALL add: [] seccompProfile: type: RuntimeDefault # Resource limits for security resources: limits: cpu: 500m memory: 512Mi ephemeral-storage: 1Gi requests: cpu: 200m memory: 256Mi ephemeral-storage: 100Mi # Health checks livenessProbe: httpGet: path: /health port: http scheme: HTTP initialDelaySeconds: 30 periodSeconds: 10 timeoutSeconds: 5 failureThreshold: 3 readinessProbe: httpGet: path: /health/ready port: http scheme: HTTP initialDelaySeconds: 5 periodSeconds: 5 timeoutSeconds: 5 failureThreshold: 3 # Environment variables from secrets env: - name: DATABASE_URL valueFrom: secretKeyRef: name: api-server-secrets key: database-url - name: API_KEY valueFrom: secretKeyRef: name: api-server-secrets key: api-key # Secure volume mounts volumeMounts: - name: tmp-volume mountPath: /tmp - name: logs-volume mountPath: /app/logs - name: config-volume mountPath: /app/config readOnly: true # Security: Volumes configuration volumes: - name: tmp-volume emptyDir: sizeLimit: 100Mi - name: logs-volume emptyDir: sizeLimit: 500Mi - name: config-volume configMap: name: api-server-config defaultMode: 0644 # Security: Node selection and affinity nodeSelector: security-zone: restricted # Security: Pod anti-affinity for high availability affinity: podAntiAffinity: preferredDuringSchedulingIgnoredDuringExecution: - weight: 100 podAffinityTerm: labelSelector: matchExpressions: - key: app operator: In values: - api-server topologyKey: kubernetes.io/hostname

---

Network policy for security isolation

apiVersion: networking.k8s.io/v1 kind: NetworkPolicy metadata: name: api-server-netpol namespace: production spec: podSelector: matchLabels: app: api-server policyTypes: - Ingress - Egress ingress: - from: - namespaceSelector: matchLabels: name: ingress-nginx - podSelector: matchLabels: app: nginx-ingress ports: - protocol: TCP port: 8080 egress: - to: - namespaceSelector: matchLabels: name: database ports: - protocol: TCP port: 5432 - to: [] ports: - protocol: TCP port: 443 # HTTPS outbound - protocol: TCP port: 53 # DNS - protocol: UDP port: 53 # DNS

---

Pod Security Policy (or Pod Security Standards)

apiVersion: policy/v1beta1 kind: PodSecurityPolicy metadata: name: api-server-psp spec: privileged: false allowPrivilegeEscalation: false requiredDropCapabilities: - ALL volumes: - 'configMap' - 'emptyDir' - 'projected' - 'secret' - 'downwardAPI' - 'persistentVolumeClaim' runAsUser: rule: 'MustRunAsNonRoot' runAsGroup: rule: 'MustRunAs' ranges: - min: 1001 max: 1001 seLinux: rule: 'RunAsAny' fsGroup: rule: 'MustRunAs' ranges: - min: 1001 max: 1001 readOnlyRootFilesystem: true

Automated Security Testing

Comprehensive automated security testing integrated into the pipeline:

#!/usr/bin/env python3

security-test-suite.py

import requests import json import subprocess import time import sys from typing import Dict, List, Tuple

class SecurityTestSuite: def __init__(self, base_url: str, api_key: str): self.base_url = base_url.rstrip('/') self.api_key = api_key self.session = requests.Session() self.session.headers.update({ 'User-Agent': 'SecurityTestSuite/1.0', 'Accept': 'application/json' }) def run_all_tests(self) -> Dict: """Run comprehensive security test suite""" results = { 'timestamp': time.time(), 'target': self.base_url, 'tests': {} } test_suites = [ ('authentication', self.test_authentication_security), ('authorization', self.test_authorization_controls), ('input_validation', self.test_input_validation), ('ssl_tls', self.test_ssl_configuration), ('headers', self.test_security_headers), ('rate_limiting', self.test_rate_limiting), ('injection', self.test_injection_vulnerabilities), ('session_management', self.test_session_security) ] for test_name, test_func in test_suites: print(f"Running {test_name} tests...") try: results['tests'][test_name] = test_func() print(f"✓ {test_name} tests completed") except Exception as e: print(f"✗ {test_name} tests failed: {e}") results['tests'][test_name] = {'error': str(e), 'passed': False} return results def test_authentication_security(self) -> Dict: """Test authentication mechanisms""" tests = {} # Test 1: No credentials should return 401 response = self.session.get(f"{self.base_url}/api/protected") tests['no_auth_401'] = { 'passed': response.status_code == 401, 'expected': 401, 'actual': response.status_code } # Test 2: Invalid credentials should return 401 invalid_headers = {'Authorization': 'Bearer invalid_token'} response = self.session.get(f"{self.base_url}/api/protected", headers=invalid_headers) tests['invalid_auth_401'] = { 'passed': response.status_code == 401, 'expected': 401, 'actual': response.status_code } # Test 3: Valid credentials should work valid_headers = {'Authorization': f'Bearer {self.api_key}'} response = self.session.get(f"{self.base_url}/api/protected", headers=valid_headers) tests['valid_auth_200'] = { 'passed': response.status_code == 200, 'expected': 200, 'actual': response.status_code } # Test 4: JWT token structure (if using JWT) if self.api_key.count('.') == 2: # JWT format tests['jwt_structure'] = self.validate_jwt_structure(self.api_key) return { 'passed': all(test.get('passed', False) for test in tests.values()), 'tests': tests } def test_authorization_controls(self) -> Dict: """Test authorization and access controls""" tests = {} # Test role-based access control endpoints_to_test = [ ('/api/admin/users', 403, 'admin_endpoint_forbidden'), ('/api/user/profile', 200, 'user_endpoint_allowed'), ('/api/public/health', 200, 'public_endpoint_allowed') ] headers = {'Authorization': f'Bearer {self.api_key}'} for endpoint, expected_status, test_name in endpoints_to_test: response = self.session.get(f"{self.base_url}{endpoint}", headers=headers) tests[test_name] = { 'passed': response.status_code == expected_status, 'expected': expected_status, 'actual': response.status_code, 'endpoint': endpoint } return { 'passed': all(test.get('passed', False) for test in tests.values()), 'tests': tests } def test_input_validation(self) -> Dict: """Test input validation and sanitization""" tests = {} headers = {'Authorization': f'Bearer {self.api_key}', 'Content-Type': 'application/json'} # Test SQL injection attempts sql_payloads = [ "'; DROP TABLE users; --", "1' OR '1'='1", "admin'/*", "' UNION SELECT * FROM users --" ] for i, payload in enumerate(sql_payloads): test_data = {'username': payload, 'email': 'test@example.com'} response = self.session.post(f"{self.base_url}/api/users", json=test_data, headers=headers) # Should reject malicious input (400 or 422) tests[f'sql_injection_{i+1}'] = { 'passed': response.status_code in [400, 422], 'payload': payload, 'status_code': response.status_code } # Test XSS payloads xss_payloads = [ "", "javascript:alert('xss')", "", "';alert('xss');//" ] for i, payload in enumerate(xss_payloads): test_data = {'comment': payload} response = self.session.post(f"{self.base_url}/api/comments", json=test_data, headers=headers) tests[f'xss_injection_{i+1}'] = { 'passed': response.status_code in [400, 422], 'payload': payload, 'status_code': response.status_code } return { 'passed': all(test.get('passed', False) for test in tests.values()), 'tests': tests } def test_ssl_configuration(self) -> Dict: """Test SSL/TLS configuration""" tests = {} try: # Test SSL certificate response = self.session.get(self.base_url, verify=True) tests['valid_ssl_cert'] = { 'passed': True, 'message': 'SSL certificate is valid' } except requests.exceptions.SSLError as e: tests['valid_ssl_cert'] = { 'passed': False, 'error': str(e) } # Test HTTP to HTTPS redirect if self.base_url.startswith('https://'): http_url = self.base_url.replace('https://', 'http://') try: response = self.session.get(http_url, allow_redirects=False) tests['https_redirect'] = { 'passed': response.status_code in [301, 302, 307, 308], 'status_code': response.status_code } except Exception as e: tests['https_redirect'] = { 'passed': False, 'error': str(e) } return { 'passed': all(test.get('passed', False) for test in tests.values()), 'tests': tests } def test_security_headers(self) -> Dict: """Test security headers""" response = self.session.get(self.base_url) headers = response.headers required_headers = { 'X-Content-Type-Options': 'nosniff', 'X-Frame-Options': ['DENY', 'SAMEORIGIN'], 'X-XSS-Protection': '1; mode=block', 'Strict-Transport-Security': None, # Should exist 'Content-Security-Policy': None, # Should exist 'Referrer-Policy': None # Should exist } tests = {} for header, expected_value in required_headers.items(): header_present = header in headers tests[f'{header.lower().replace("-", "_")}_present'] = { 'passed': header_present, 'expected': f'{header} header present', 'actual': f'{header} header {"present" if header_present else "missing"}' } if header_present and expected_value: if isinstance(expected_value, list): tests[f'{header.lower().replace("-", "_")}_value'] = { 'passed': headers[header] in expected_value, 'expected': f'One of: {expected_value}', 'actual': headers[header] } else: tests[f'{header.lower().replace("-", "_")}_value'] = { 'passed': headers[header] == expected_value, 'expected': expected_value, 'actual': headers[header] } return { 'passed': all(test.get('passed', False) for test in tests.values()), 'tests': tests } def test_rate_limiting(self) -> Dict: """Test rate limiting implementation""" tests = {} # Make rapid requests to test rate limiting rapid_requests = [] for i in range(20): # Make 20 rapid requests try: response = self.session.get(f"{self.base_url}/api/public/health") rapid_requests.append(response.status_code) time.sleep(0.1) # Small delay between requests except Exception as e: rapid_requests.append(f"Error: {e}") # Check if rate limiting kicked in (expect some 429 responses) rate_limited_responses = [code for code in rapid_requests if code == 429] tests['rate_limiting_active'] = { 'passed': len(rate_limited_responses) > 0, 'total_requests': len(rapid_requests), 'rate_limited_count': len(rate_limited_responses), 'message': 'Rate limiting should trigger after rapid requests' } return { 'passed': all(test.get('passed', False) for test in tests.values()), 'tests': tests } def validate_jwt_structure(self, token: str) -> Dict: """Validate JWT token structure and claims""" try: import base64 # Decode JWT header and payload (without verification for testing) header, payload, signature = token.split('.') # Add padding if needed header += '=' * (4 - len(header) % 4) payload += '=' * (4 - len(payload) % 4) header_data = json.loads(base64.b64decode(header)) payload_data = json.loads(base64.b64decode(payload)) # Check required JWT claims required_claims = ['iat', 'exp', 'sub'] missing_claims = [claim for claim in required_claims if claim not in payload_data] # Check token expiration current_time = time.time() token_expired = payload_data.get('exp', 0) < current_time return { 'passed': len(missing_claims) == 0 and not token_expired, 'header': header_data, 'payload_claims': list(payload_data.keys()), 'missing_claims': missing_claims, 'expired': token_expired } except Exception as e: return { 'passed': False, 'error': f"JWT validation failed: {e}" } def generate_report(self, results: Dict) -> str: """Generate security test report""" report = [] report.append("# Security Test Report") report.append(f"Target: {results['target']}") report.append(f"Timestamp: {time.ctime(results['timestamp'])}") report.append("") total_tests = 0 passed_tests = 0 for test_suite, suite_results in results['tests'].items(): report.append(f"## {test_suite.replace('_', ' ').title()}") suite_passed = suite_results.get('passed', False) status_icon = "✅" if suite_passed else "❌" report.append(f"{status_icon} Overall: {'PASSED' if suite_passed else 'FAILED'}") report.append("") if 'tests' in suite_results: for test_name, test_result in suite_results['tests'].items(): test_passed = test_result.get('passed', False) test_icon = "✅" if test_passed else "❌" report.append(f" {test_icon} {test_name}: {'PASS' if test_passed else 'FAIL'}") total_tests += 1 if test_passed: passed_tests += 1 report.append("") # Summary pass_rate = (passed_tests / total_tests * 100) if total_tests > 0 else 0 report.insert(2, f"Pass Rate: {pass_rate}% ({passed_tests}/{total_tests})") report.insert(3, "") return "\n".join(report)

if __name__ == "__main__": if len(sys.argv) != 3: print("Usage: python security-test-suite.py ") sys.exit(1) base_url = sys.argv[1] api_key = sys.argv[2] suite = SecurityTestSuite(base_url, api_key) results = suite.run_all_tests() # Generate and save report report = suite.generate_report(results) with open('security-test-report.md', 'w') as f: f.write(report) # Save raw results with open('security-test-results.json', 'w') as f: json.dump(results, f, indent=2) # Exit with appropriate code all_passed = all(test.get('passed', False) for test in results['tests'].values()) print(f"\nSecurity tests {'PASSED' if all_passed else 'FAILED'}") print(f"Report saved to: security-test-report.md") sys.exit(0 if all_passed else 1)

Compliance and Monitoring

Continuous compliance monitoring with automated reporting:

#!/usr/bin/env python3

compliance-monitor.py

import json import boto3 import requests from datetime import datetime, timedelta from typing import Dict, List import logging

class ComplianceMonitor: def __init__(self, config: Dict): self.config = config self.aws_client = boto3.client('securityhub') self.logger = logging.getLogger(__name__) def check_security_compliance(self) -> Dict: """Check overall security compliance""" compliance_checks = { 'cis_benchmarks': self.check_cis_compliance(), 'owasp_asvs': self.check_owasp_compliance(), 'pci_dss': self.check_pci_compliance(), 'gdpr_privacy': self.check_gdpr_compliance(), 'vulnerability_management': self.check_vulnerability_status() } # Calculate overall compliance score total_checks = sum(len(checks['results']) for checks in compliance_checks.values()) passed_checks = sum( sum(1 for result in checks['results'] if result.get('compliant', False)) for checks in compliance_checks.values() ) overall_score = (passed_checks / total_checks * 100) if total_checks > 0 else 0 return { 'timestamp': datetime.utcnow().isoformat(), 'overall_compliance_score': overall_score, 'compliance_checks': compliance_checks, 'recommendations': self.generate_recommendations(compliance_checks) } def check_cis_compliance(self) -> Dict: """Check CIS (Center for Internet Security) benchmark compliance""" results = [] # CIS Control 1: Inventory and Control of Hardware Assets results.append({ 'control': 'CIS-1', 'description': 'Inventory and Control of Hardware Assets', 'compliant': self.verify_asset_inventory(), 'evidence': 'Asset inventory updated in last 24 hours' }) # CIS Control 2: Inventory and Control of Software Assets results.append({ 'control': 'CIS-2', 'description': 'Inventory and Control of Software Assets', 'compliant': self.verify_software_inventory(), 'evidence': 'Container images scanned and catalogued' }) # CIS Control 3: Continuous Vulnerability Management results.append({ 'control': 'CIS-3', 'description': 'Continuous Vulnerability Management', 'compliant': self.verify_vulnerability_scanning(), 'evidence': 'Daily vulnerability scans with <24h remediation SLA' }) # CIS Control 6: Maintenance, Monitoring, and Analysis of Audit Logs results.append({ 'control': 'CIS-6', 'description': 'Audit Log Management', 'compliant': self.verify_audit_logging(), 'evidence': 'Centralized logging with 90-day retention' }) return { 'framework': 'CIS Controls v8', 'results': results, 'compliance_rate': sum(1 for r in results if r['compliant']) / len(results) * 100 } def check_owasp_compliance(self) -> Dict: """Check OWASP ASVS (Application Security Verification Standard) compliance""" results = [] # V1: Architecture, Design and Threat Modeling results.append({ 'control': 'ASVS-V1', 'description': 'Architecture and Design', 'compliant': self.verify_secure_architecture(), 'evidence': 'Threat model documented and reviewed' }) # V2: Authentication results.append({ 'control': 'ASVS-V2', 'description': 'Authentication Verification', 'compliant': self.verify_authentication_controls(), 'evidence': 'Multi-factor authentication enforced' }) # V3: Session Management results.append({ 'control': 'ASVS-V3', 'description': 'Session Management', 'compliant': self.verify_session_management(), 'evidence': 'Secure session handling implemented' }) # V4: Access Control results.append({ 'control': 'ASVS-V4', 'description': 'Access Control Verification', 'compliant': self.verify_access_controls(), 'evidence': 'Role-based access control implemented' }) # V7: Error Handling and Logging results.append({ 'control': 'ASVS-V7', 'description': 'Error Handling and Logging', 'compliant': self.verify_error_handling(), 'evidence': 'Secure error handling without information disclosure' }) return { 'framework': 'OWASP ASVS v4.0', 'results': results, 'compliance_rate': sum(1 for r in results if r['compliant']) / len(results) * 100 } def verify_asset_inventory(self) -> bool: """Verify asset inventory is current""" try: # Check if asset inventory was updated recently last_update = self.get_last_inventory_update() return (datetime.utcnow() - last_update).days < 1 except Exception: return False def verify_vulnerability_scanning(self) -> bool: """Verify continuous vulnerability scanning""" try: # Check latest scan results from Security Hub response = self.aws_client.get_findings( Filters={ 'ProductName': [{'Value': 'Inspector', 'Comparison': 'EQUALS'}], 'RecordState': [{'Value': 'ACTIVE', 'Comparison': 'EQUALS'}], 'CreatedAt': [{ 'Start': (datetime.utcnow() - timedelta(days=1)).isoformat(), 'End': datetime.utcnow().isoformat() }] } ) return len(response.get('Findings', [])) > 0 except Exception: return False def generate_security_report(self, compliance_data: Dict) -> str: """Generate comprehensive security report""" report = [] report.append("# Security Compliance Report") report.append(f"Generated: {datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S UTC')}") report.append(f"Overall Compliance Score: {compliance_data['overall_compliance_score']}%") report.append("") # Executive Summary if compliance_data['overall_compliance_score'] >= 90: status = "🟢 EXCELLENT" summary = "Security posture is excellent with minimal risks identified." elif compliance_data['overall_compliance_score'] >= 75: status = "🟡 GOOD" summary = "Security posture is good with some areas for improvement." else: status = "🔴 NEEDS ATTENTION" summary = "Security posture needs immediate attention with multiple compliance gaps." report.append(f"## Executive Summary") report.append(f"Status: {status}") report.append(f"{summary}") report.append("") # Detailed Results for framework, results in compliance_data['compliance_checks'].items(): report.append(f"## {framework.replace('_', ' ').title()}") report.append(f"Compliance Rate: {results['compliance_rate']}%") report.append("") for result in results['results']: status_icon = "✅" if result['compliant'] else "❌" report.append(f"{status_icon} {result['control']}: {result['description']}") if result.get('evidence'): report.append(f" Evidence: {result['evidence']}") report.append("") # Recommendations if compliance_data.get('recommendations'): report.append("## Recommendations") for i, rec in enumerate(compliance_data['recommendations'], 1): report.append(f"{i}. {rec['priority']}: {rec['description']}") if rec.get('implementation'): report.append(f" Implementation: {rec['implementation']}") report.append("") return "\n".join(report)

Example usage in CI/CD pipeline

if __name__ == "__main__": config = { 'aws_region': 'us-east-1', 'notification_webhook': 'https://hooks.slack.com/services/...', 'compliance_threshold': 85.0 } monitor = ComplianceMonitor(config) compliance_data = monitor.check_security_compliance() # Generate report report = monitor.generate_security_report(compliance_data) # Save report with open('security-compliance-report.md', 'w') as f: f.write(report) # Send notification if compliance below threshold if compliance_data['overall_compliance_score'] < config['compliance_threshold']: # Send alert to security team print("⚠️ Security compliance below threshold - alert sent") print(f"Compliance Score: {compliance_data['overall_compliance_score']}%")

Security Monitoring and Alerting

Real-time security monitoring with automated response:

security-monitoring.yaml - Prometheus + Grafana monitoring

apiVersion: monitoring.coreos.com/v1 kind: PrometheusRule metadata: name: security-alerts namespace: monitoring spec: groups: - name: security-high-priority interval: 30s rules: # Authentication failures - alert: HighAuthenticationFailures expr: sum(rate(http_requests_total{status=~"401|403"}[5m])) > 10 for: 2m labels: severity: critical category: security annotations: summary: "High number of authentication failures detected" description: "More than 10 authentication failures per second in the last 5 minutes" runbook_url: "https://wiki.company.com/security/auth-failures" # Potential DDoS attack - alert: PotentialDDoSAttack expr: sum(rate(http_requests_total[1m])) > 1000 for: 1m labels: severity: critical category: security annotations: summary: "Potential DDoS attack detected" description: "Request rate exceeds 1000 requests per second" runbook_url: "https://wiki.company.com/security/ddos-response" # High error rate (potential attack or system compromise) - alert: HighErrorRate expr: sum(rate(http_requests_total{status=~"5.."}[5m])) / sum(rate(http_requests_total[5m])) > 0.1 for: 5m labels: severity: warning category: security annotations: summary: "High error rate detected" description: "Error rate is {{ $value | humanizePercentage }} over the last 5 minutes" # SQL injection attempts - alert: SQLInjectionAttempts expr: sum(rate(security_events_total{type="sql_injection"}[5m])) > 0 for: 0s labels: severity: critical category: security annotations: summary: "SQL injection attempts detected" description: "SQL injection patterns detected in application logs" runbook_url: "https://wiki.company.com/security/sql-injection-response" # Container security violations - alert: ContainerSecurityViolation expr: sum(rate(falco_events_total{rule_type="security"}[5m])) > 0 for: 0s labels: severity: high category: security annotations: summary: "Container security violation detected" description: "Falco detected security policy violation in containers" # Certificate expiration - alert: CertificateExpiringSoon expr: (ssl_certificate_expiry_seconds - time()) / 86400 < 30 for: 1h labels: severity: warning category: security annotations: summary: "SSL certificate expiring soon" description: "Certificate {{ $labels.instance }} expires in {{ $value }} days"

- name: security-compliance interval: 1h rules: # Vulnerability scan results - alert: HighSeverityVulnerabilities expr: sum(vulnerability_scanner{severity="HIGH"}) > 0 for: 0s labels: severity: high category: compliance annotations: summary: "High severity vulnerabilities found" description: "{{ $value }} high severity vulnerabilities detected" # Compliance check failures - alert: ComplianceCheckFailure expr: compliance_check_success < 0.9 for: 15m labels: severity: warning category: compliance annotations: summary: "Compliance checks failing" description: "Compliance success rate is {{ $value | humanizePercentage }}"

Secure Secrets Management

Comprehensive secrets management strategy:

#!/bin/bash

secrets-management.sh - Secure secrets handling

set -euo pipefail

Configuration

VAULT_ADDR="${VAULT_ADDR:-https://vault.company.com}" KUBERNETES_NAMESPACE="${KUBERNETES_NAMESPACE:-production}" APP_NAME="${APP_NAME:-api-server}"

Colors for output

RED='\033[0;31m' GREEN='\033[0;32m' YELLOW='\033[1;33m' NC='\033[0m' # No Color

log_info() { echo -e "${GREEN}[INFO]${NC} $1" }

log_warn() { echo -e "${YELLOW}[WARN]${NC} $1" }

log_error() { echo -e "${RED}[ERROR]${NC} $1" }

Rotate application secrets

rotate_secrets() { local secret_name="$1" local vault_path="$2" log_info "Rotating secrets for $secret_name" # Generate new secrets local new_database_password=$(openssl rand -base64 32) local new_api_key=$(openssl rand -hex 32) local new_jwt_secret=$(openssl rand -base64 64) # Store in Vault vault kv put "$vault_path" \ database_password="$new_database_password" \ api_key="$new_api_key" \ jwt_secret="$new_jwt_secret" # Update Kubernetes secret kubectl create secret generic "$secret_name" \ --namespace="$KUBERNETES_NAMESPACE" \ --from-literal=database-password="$new_database_password" \ --from-literal=api-key="$new_api_key" \ --from-literal=jwt-secret="$new_jwt_secret" \ --dry-run=client -o yaml | kubectl apply -f - # Rolling restart of deployments using this secret kubectl rollout restart deployment/$APP_NAME -n "$KUBERNETES_NAMESPACE" log_info "Secret rotation completed for $secret_name" }

Scan for secrets in code

scan_for_secrets() { log_info "Scanning for hardcoded secrets..." # Use multiple tools for comprehensive scanning local findings=0 # Trufflehog scan if command -v trufflehog &> /dev/null; then log_info "Running Trufflehog scan..." if trufflehog --regex --entropy=False . > trufflehog-results.json; then local trufflehog_findings=$(jq length trufflehog-results.json) if [ "$trufflehog_findings" -gt 0 ]; then log_error "Trufflehog found $trufflehog_findings potential secrets" findings=$((findings + trufflehog_findings)) fi fi fi # GitLeaks scan if command -v gitleaks &> /dev/null; then log_info "Running GitLeaks scan..." if ! gitleaks detect --source . --report-path gitleaks-report.json; then local gitleaks_findings=$(jq length gitleaks-report.json 2>/dev/null || echo "0") if [ "$gitleaks_findings" -gt 0 ]; then log_error "GitLeaks found $gitleaks_findings potential secrets" findings=$((findings + gitleaks_findings)) fi fi fi # Custom regex patterns log_info "Running custom secret patterns scan..." local secret_patterns=( "password\s=\s["'][^"']{8,}["']" "api[_-]?key\s=\s["'][^"']{16,}["']" "secret\s=\s["'][^"']{16,}["']" "token\s=\s["'][^"']{16,}["']" "-----BEGIN\s+(RSA\s+)?PRIVATE\s+KEY-----" ) for pattern in "${secret_patterns[@]}"; do if grep -r -E -i "$pattern" . --exclude-dir=.git --exclude="*.md" --exclude="secrets-management.sh"; then log_error "Found potential secret matching pattern: $pattern" findings=$((findings + 1)) fi done if [ $findings -eq 0 ]; then log_info "No secrets found in codebase ✓" return 0 else log_error "Found $findings potential secrets in codebase" return 1 fi }

Validate secret strength

validate_secret_strength() { local secret="$1" local min_length=${2:-16} # Check length if [ ${#secret} -lt $min_length ]; then log_error "Secret too short (minimum $min_length characters)" return 1 fi # Check complexity if ! echo "$secret" | grep -q '[A-Z]'; then log_warn "Secret should contain uppercase letters" fi if ! echo "$secret" | grep -q '[a-z]'; then log_warn "Secret should contain lowercase letters" fi if ! echo "$secret" | grep -q '[0-9]'; then log_warn "Secret should contain numbers" fi if ! echo "$secret" | grep -q '[^A-Za-z0-9]'; then log_warn "Secret should contain special characters" fi log_info "Secret validation completed" return 0 }

Setup External Secrets Operator

setup_external_secrets() { log_info "Setting up External Secrets Operator..." # Install External Secrets Operator helm repo add external-secrets https://charts.external-secrets.io helm repo update helm upgrade --install external-secrets external-secrets/external-secrets \ --namespace external-secrets-system \ --create-namespace \ --set installCRDs=true # Create SecretStore for Vault integration cat <

Main function

main() { case "${1:-}" in "rotate") rotate_secrets "${2:-$APP_NAME-secrets}" "${3:-secret/$APP_NAME}" ;; "scan") scan_for_secrets ;; "validate") validate_secret_strength "${2:-}" "${3:-16}" ;; "setup-external-secrets") setup_external_secrets ;; *) echo "Usage: $0 {rotate|scan|validate|setup-external-secrets} [args...]" echo "" echo "Commands:" echo " rotate - Rotate application secrets" echo " scan - Scan for hardcoded secrets" echo " validate [min-length] - Validate secret strength" echo " setup-external-secrets - Setup External Secrets Operator" exit 1 ;; esac }

main "$@"

Conclusion

DevSecOps is not just about adding security tools to your pipeline – it's about creating a security-first culture where security is everyone's responsibility. The implementations shown in this guide provide:

- Automated Security Testing that catches issues before production - Secure-by-Default Infrastructure that reduces attack surface - Continuous Compliance Monitoring that ensures ongoing security posture - Incident Response Capabilities that enable rapid threat response

Key principles for successful DevSecOps implementation:

1. Start Early: Integrate security from the first line of code 2. Automate Everything: Manual security processes don't scale 3. Fail Fast: Catch and fix security issues as early as possible 4. Monitor Continuously: Security is an ongoing process, not a one-time check 5. Educate Teams: Every developer should understand security implications

The security landscape is constantly evolving, but with these foundational practices in place, your organization will be well-positioned to detect, respond to, and prevent security threats while maintaining development velocity.

---

Need help implementing DevSecOps in your organization? Contact our security experts for guidance on secure CI/CD pipeline design, compliance automation, and security culture transformation.

Tags:

#devsecops#security#cicd#kubernetes#compliance#automation#sast#dast

Need Expert Help with Your Implementation?

Our senior consultants have years of experience solving complex technical challenges. Let us help you implement these solutions in your environment.