DEV Community

Ashok Nagaraj

Simple validation webhook with Python

We will implement a simple validation webhook to achieve the following:

  • Input conditions
      • Applicable to all namespaces labelled ngaddons/validation-webhooks=enabled
      • Applicable to object types: Deployments and Pods
      • Not applicable if the label ngaddons/bypass is set on the object

  • Validation logic
      • Fail the object creation unless all of these labels are set: 'ngaddons/ownerId', 'ngaddons/webexRoomId', 'ngaddons/appName'
      • Bypass the checks if ngaddons/bypass is set
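The rules above boil down to a small decision function. The following is a minimal sketch of that decision on its own, without any web framework; the helper name `check_labels` and the constant `BYPASS_LABEL` are illustrative and not part of the original code.

```python
# Labels that every object must carry (taken from the requirements above)
REQUIRED_LABELS = ["ngaddons/ownerId", "ngaddons/webexRoomId", "ngaddons/appName"]
# Presence of this label skips validation, whatever its value
BYPASS_LABEL = "ngaddons/bypass"


def check_labels(labels):
    """Return (allowed, message) for an object's labels (hypothetical helper)."""
    labels = labels or {}  # objects without labels may carry no dict at all
    if BYPASS_LABEL in labels:
        return True, "bypassed"
    missing = [name for name in REQUIRED_LABELS if name not in labels]
    if missing:
        return False, f"Missing labels: {missing}"
    return True, "Request has required labels"
```

Keeping the decision separate from the HTTP handler makes it easy to unit-test without a cluster.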


Webhook code
import logging
from flask import Flask, jsonify, request

app = Flask('webhook')
app.logger.addHandler(logging.StreamHandler())
app.logger.setLevel(logging.DEBUG)

#Health check
@app.route("/healthz", methods=['GET'])
def ping():
  return jsonify({'message': 'ok'})

REQUIRED_LABELS = ['ngaddons/ownerId', 'ngaddons/webexRoomId', 'ngaddons/appName']

@app.route('/validate', methods=['POST'])
def deployment_webhook():
  # silent=True yields None instead of raising on a missing or malformed JSON body
  r = request.get_json(silent=True) or {}

  req = r.get('request', {})
  uid = '<no uid>'
  try:
    if not req:
      return send_response(False, uid, "Invalid request, no payload.request found")

    uid = req.get("uid", '')
    app.logger.debug(f"+ uid: {uid}")
    if not uid:
      return send_response(False, '<no uid>', "Invalid request, no payload.request.uid found")

    # Objects created without labels have no "labels" key, so fall back to an empty dict
    labels = req.get("object", {}).get("metadata", {}).get("labels") or {}
    if 'ngaddons/bypass' in labels:
      return send_response(True, uid, "Request bypassed as 'ngaddons/bypass' is set")

    missing = [ l for l in REQUIRED_LABELS if l not in labels ]
    app.logger.debug(f"+ missing: {missing}")
    if missing:
      return send_response(False, uid, f"Missing labels: {missing}")

  except Exception as e:
    return send_response(False, uid, f"Webhook exception: {e}")

  #Send OK
  return send_response(True, uid, "Request has required labels")


#Function to respond back to the Admission Controller
def send_response(allowed, uid, message):
  return jsonify({
      "apiVersion": "admission.k8s.io/v1",
      "kind": "AdmissionReview",
      "response": {
        "allowed": allowed,
        "uid": uid,
        "status": {"message": message}
    }
  })


if __name__ == "__main__":
  # A secret of type kubernetes.io/tls exposes its files as tls.crt and tls.key
  tls_crt = '/etc/ssl/tls.crt'
  tls_key = '/etc/ssl/tls.key'
  app.run(ssl_context=(tls_crt, tls_key), port=5000, host='0.0.0.0', debug=True)
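The handler reads only two things from the incoming AdmissionReview: `request.uid` and `request.object.metadata.labels`. As a rough sketch of that payload shape, the helpers below build a stripped-down request and pull the labels back out. The function names, the object name `testpod`, and the uid `1234` are made up for illustration, and a real request from the API server carries many more fields (kind, resource, userInfo, and so on).

```python
import json


def build_admission_review(uid, labels=None):
    """Build a minimal AdmissionReview request body (illustrative subset of fields)."""
    metadata = {"name": "testpod"}  # hypothetical object name
    if labels is not None:
        metadata["labels"] = labels
    return {
        "apiVersion": "admission.k8s.io/v1",
        "kind": "AdmissionReview",
        "request": {
            "uid": uid,
            "operation": "CREATE",
            "object": {"metadata": metadata},
        },
    }


def extract_labels(review):
    """Read the labels the same way the webhook does, tolerating a missing key."""
    obj = review.get("request", {}).get("object", {})
    return obj.get("metadata", {}).get("labels") or {}


if __name__ == "__main__":
    # Print a payload that could be POSTed to /validate while testing locally
    print(json.dumps(build_admission_review("1234", {"ngaddons/bypass": "1"}), indent=2))
```

A payload like this is handy for exercising the handler locally before wiring it into a cluster.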
Dockerfile
# Image: ashoka007/check-labels:0.1
FROM python:3.8-slim
WORKDIR /app
COPY requirements.txt /app
RUN pip install -r requirements.txt
COPY app.py /app
CMD python app.py
Webhook manifest
apiVersion: admissionregistration.k8s.io/v1
kind: ValidatingWebhookConfiguration
metadata:
  name: ngaddons-check-labels # cluster-scoped resource, so no namespace is set
webhooks:
  - name: ngaddons.check-labels.webhook
    failurePolicy: Fail
    sideEffects: None
    admissionReviewVersions: ["v1","v1beta1"]
    namespaceSelector:
      matchLabels:
        ngaddons/validation-webhooks: enabled
    rules:
      - apiGroups: ["apps", ""]
        resources:
          - "deployments"
          - "pods"
        apiVersions:
          - "*"
        operations:
          - CREATE
    clientConfig:
      service:
        name: ${WEBHOOK_SERVICE_NAME} # to be substituted
        namespace: ${WEBHOOK_NAMESPACE} # to be substituted
        path: /validate # must match the Flask route exactly (no trailing slash)
      caBundle: ${CA_BUNDLE} # to be substituted
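The manifest leaves three `${...}` placeholders to be filled in before it is applied. One possible way to do that, sketched here with Python's standard `string.Template`, is shown below. The snippet text is a shortened excerpt of the manifest, and the values in `EXAMPLE_VALUES` are placeholders chosen for illustration, not names from the original setup.

```python
from string import Template

# Shortened excerpt of the manifest above, with the same placeholders
MANIFEST_SNIPPET = """\
clientConfig:
  service:
    name: ${WEBHOOK_SERVICE_NAME}
    namespace: ${WEBHOOK_NAMESPACE}
  caBundle: ${CA_BUNDLE}
"""

# Hypothetical values; substitute your own service name, namespace and CA bundle
EXAMPLE_VALUES = {
    "WEBHOOK_SERVICE_NAME": "check-labels-svc",
    "WEBHOOK_NAMESPACE": "check-labels",
    "CA_BUNDLE": "BASE64_ENCODED_CERT",
}


def render(template_text, values):
    """Fill in ${NAME} placeholders; raises KeyError if a value is missing."""
    return Template(template_text).substitute(values)


if __name__ == "__main__":
    print(render(MANIFEST_SNIPPET, EXAMPLE_VALUES))
```

Using `substitute` rather than `safe_substitute` means a forgotten variable fails loudly instead of leaving a literal `${...}` in the applied manifest.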
Create self-signed key-pair and ${CA_BUNDLE}
# conf/ext.cnf: the subjectAltName DNS entry is the key part, it must match the webhook service DNS name
[ req ]
default_bits       = 2048
distinguished_name = req_distinguished_name
req_extensions     = req_ext
prompt             = no
[ req_distinguished_name ]
countryName                 = IN
stateOrProvinceName         = KAR
localityName                = BGL
organizationName            = ACME INC
commonName                  = check-labels 0.1
[ req_ext ]
subjectAltName = @alt_names
[alt_names]
DNS.1   = ${WEBHOOK_SERVICE_NAME}.${WEBHOOK_NAMESPACE}.svc

❯ openssl req -x509 -newkey rsa:4096 -nodes -out certs/ca.crt -keyout certs/ca.key -days 365 -config conf/ext.cnf -extensions req_ext
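The `caBundle` field in the webhook manifest expects the PEM certificate encoded as base64. Because the certificate generated above is self-signed, it can serve as its own CA bundle. A minimal sketch of producing that value follows; the helper names are illustrative, and the default path simply assumes the `certs/ca.crt` output location used in the `openssl` command.

```python
import base64


def ca_bundle_from_pem(pem_bytes):
    """Base64-encode PEM bytes into a single-line string suitable for caBundle."""
    return base64.b64encode(pem_bytes).decode("ascii")


def read_ca_bundle(path="certs/ca.crt"):
    """Read a PEM certificate from disk and return its base64 form."""
    with open(path, "rb") as f:
        return ca_bundle_from_pem(f.read())


if __name__ == "__main__":
    # Prints the value to use for ${CA_BUNDLE}
    print(read_ca_bundle())
```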
Create namespace
❯ kubectl create namespace ${WEBHOOK_NAMESPACE}
Create a secret from the key-pair (the namespace must exist first)
❯ kubectl create secret tls webhook-secret --cert=certs/ca.crt --key=certs/ca.key --namespace=${WEBHOOK_NAMESPACE}
Kubernetes manifest for webhook deployment
apiVersion: apps/v1
kind: Deployment
metadata:
  name: webhook-dep
  namespace: ${WEBHOOK_NAMESPACE}
  labels:
    app: webhook
spec:
  replicas: 2
  selector:
    matchLabels:
      app: webhook
  template:
    metadata:
      labels:
        app: webhook
    spec:
      containers:
        - name: webhook-container
          image: ashoka007/check-labels:0.1
          volumeMounts:
            - mountPath: /etc/ssl
              name: webhook-certs
              readOnly: true
      volumes:
      - name: webhook-certs
        secret:
          secretName: webhook-secret
Expose the deployment with service
❯ kubectl expose deployment/webhook-dep --name=${WEBHOOK_SERVICE_NAME} --namespace=${WEBHOOK_NAMESPACE} --port=443 --target-port=5000

Test
❯ kubectl create namespace demo-ns

# normal scenario
❯ kubectl run testpod --image=nginx -n demo-ns
pod/testpod created

# start enforcing validation
❯ kubectl label namespace demo-ns ngaddons/validation-webhooks=enabled

# validation fail
❯ kubectl run testpod2 --image=nginx -n demo-ns
Error from server: admission webhook "ngaddons.check-labels.webhook" denied the request: Missing labels: ['ngaddons/ownerId', 'ngaddons/webexRoomId', 'ngaddons/appName']

# validation pass
❯ kubectl run testpod3 --image=nginx -n demo-ns -l ngaddons/ownerId=ram,ngaddons/webexRoomId=rams-room-id,ngaddons/appName=rams-test-app
pod/testpod3 created

# validation bypass
❯ kubectl run testpod4 --image=nginx -n demo-ns -l ngaddons/bypass=1
pod/testpod4 created

Delete validation webhook
❯ kubectl delete validatingwebhookconfigurations.admissionregistration.k8s.io ngaddons-check-labels

Source code
