Sentiment Analysis Model on KServe

25 May 2025 · netologist · 18 min, 3751 words ·

Table of Contents

  1. Introduction and Core Concepts
  2. Architecture Deep Dive
  3. Environment Setup
  4. Your First Model Deployment
  5. Advanced Features
  6. Production Considerations
  7. Troubleshooting Guide
  8. Best Practices

1. Introduction and Core Concepts

What is KServe?

KServe (formerly KNative Serving) is a serverless machine learning inference platform built on Kubernetes. Think of it as the “Netflix for ML models” - it automatically handles scaling, routing, monitoring, and serving your models with zero manual intervention.

Why KServe Matters: The Problem It Solves

Before KServe:

Developer deploys model → Manual scaling → Manual monitoring → 
Manual A/B testing → Manual canary rollouts → Manual rollbacks

With KServe:

Developer deploys model → Everything else happens automatically

Core Concepts You Must Understand

1. InferenceService

The primary resource in KServe. It’s like a “smart container” that wraps your model with superpowers:

# Think of this as: "I want to serve my model with autoscaling"
apiVersion: "serving.kserve.io/v1beta1"
kind: "InferenceService"
metadata:
  name: "my-awesome-model"
spec:
  predictor:  # The actual model serving logic
    containers:
    - image: my-model:latest

2. Predictor vs Transformer vs Explainer

Input → [Transformer] → Predictor → [Explainer] → Output

3. Serverless Magic: Scale-to-Zero

KServe automatically:

2. Architecture Deep Dive

The KServe Stack

┌─────────────────────────────────────────┐
│           Traffic (HTTP/gRPC)           │
└─────────────────────────────────────────┘
                    ↓
┌─────────────────────────────────────────┐
│        Istio Gateway (Ingress)          │ ← Traffic routing
└─────────────────────────────────────────┘
                    ↓
┌─────────────────────────────────────────┐
│         KNative Serving                 │ ← Serverless runtime
└─────────────────────────────────────────┘
                    ↓
┌─────────────────────────────────────────┐
│         Your Model Container            │ ← Your code here
└─────────────────────────────────────────┘

Request Flow Explained

  1. Client sends HTTP request to model endpoint
  2. Istio Gateway receives and routes traffic
  3. KNative checks if pods are running:
    • If cold: Spins up new pods (1-3 seconds)
    • If warm: Routes to existing pods
  4. Your container processes the request
  5. Response flows back through the stack

Key Components Deep Dive

KNative Serving

Istio Service Mesh

3. Environment Setup

Prerequisites Checklist

Quick Setup with Kind (Local Development)

# 1. Create Kind cluster with specific configuration
cat <<EOF > kind-config.yaml
kind: Cluster
apiVersion: kind.x-k8s.io/v1alpha4
nodes:
- role: control-plane
  kubeadmConfigPatches:
  - |
    kind: InitConfiguration
    nodeRegistration:
      kubeletExtraArgs:
        node-labels: "ingress-ready=true"
  extraPortMappings:
  - containerPort: 80
    hostPort: 8080
  - containerPort: 443
    hostPort: 8443
EOF

kind create cluster --config=kind-config.yaml --name=kserve-demo

Install KServe (The Right Way)

# 1. Install KNative Serving
kubectl apply -f https://github.com/knative/serving/releases/download/knative-v1.12.0/serving-crds.yaml
kubectl apply -f https://github.com/knative/serving/releases/download/knative-v1.12.0/serving-core.yaml

# 2. Install Istio for KNative
kubectl apply -f https://github.com/knative/net-istio/releases/download/knative-v1.12.0/istio.yaml
kubectl apply -f https://github.com/knative/net-istio/releases/download/knative-v1.12.0/net-istio.yaml

# 3. Install KServe
kubectl apply -f https://github.com/kserve/kserve/releases/download/v0.11.0/kserve.yaml
kubectl apply -f https://github.com/kserve/kserve/releases/download/v0.11.0/kserve-runtimes.yaml

# 4. Verify installation
kubectl get pods -n knative-serving
kubectl get pods -n kserve

Verification Script

#!/bin/bash
# verify-installation.sh

echo "🔍 Checking KServe installation..."

# Check KNative Serving
echo "📦 KNative Serving pods:"
kubectl get pods -n knative-serving --no-headers | while read line; do
  status=$(echo $line | awk '{print $3}')
  if [[ "$status" != "Running" ]]; then
    echo "❌ $line"
    exit 1
  else
    echo "✅ $line"
  fi
done

# Check KServe
echo "📦 KServe pods:"
kubectl get pods -n kserve --no-headers | while read line; do
  status=$(echo $line | awk '{print $3}')
  if [[ "$status" != "Running" ]]; then
    echo "❌ $line"
    exit 1
  else
    echo "✅ $line"
  fi
done

echo "🎉 Installation verified successfully!"

4. Your First Model Deployment

Step 1: Create a Simple Model

Let’s build a sentiment analysis model that everyone can understand:

# sentiment_model.py
import joblib
import numpy as np
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import Pipeline
import json

# Create and train a simple sentiment model
def create_model():
    # Sample training data
    texts = [
        "I love this product", "This is amazing", "Great quality",
        "I hate this", "Terrible experience", "Very disappointed",
        "It's okay", "Not bad", "Average product"
    ]
    
    labels = [1, 1, 1, 0, 0, 0, 0.5, 0.5, 0.5]  # 1=positive, 0=negative, 0.5=neutral
    
    # Create pipeline
    pipeline = Pipeline([
        ('tfidf', TfidfVectorizer(max_features=1000, stop_words='english')),
        ('classifier', LogisticRegression())
    ])
    
    # Train model
    pipeline.fit(texts, labels)
    
    return pipeline

# Create and save model
model = create_model()
joblib.dump(model, 'sentiment_model.joblib')
print("✅ Model created and saved!")

# Test the model
test_texts = ["I love KServe!", "This tutorial is confusing"]
predictions = model.predict(test_texts)
probabilities = model.predict_proba(test_texts)

for text, pred, prob in zip(test_texts, predictions, probabilities):
    sentiment = "positive" if pred > 0.6 else "negative" if pred < 0.4 else "neutral"
    confidence = max(prob)
    print(f"Text: '{text}' → {sentiment} (confidence: {confidence:.2f})")

Step 2: Create KServe-Compatible Predictor

# predictor.py
import joblib
import json
import logging
from typing import Dict, List, Any
from kserve import Model, ModelServer
import numpy as np

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class SentimentPredictor(Model):
    """
    A KServe-compatible sentiment analysis predictor.
    
    This class handles:
    1. Model loading from storage
    2. Input validation and preprocessing  
    3. Making predictions
    4. Output formatting
    """
    
    def __init__(self, name: str):
        super().__init__(name)
        self.name = name
        self.model = None
        self.ready = False
        
        # Define sentiment labels
        self.sentiment_labels = {
            0: "negative",
            0.5: "neutral", 
            1: "positive"
        }

    def load(self) -> bool:
        """
        Load the model from storage.
        
        KServe calls this method once when the container starts.
        This is where you load your model weights, artifacts, etc.
        """
        try:
            logger.info(f"Loading model for {self.name}")
            
            # Load model from the mounted storage path
            # In production, this could be S3, GCS, etc.
            model_path = "/mnt/models/sentiment_model.joblib"
            self.model = joblib.load(model_path)
            
            # Test the model with a simple prediction
            test_prediction = self.model.predict(["test"])
            logger.info(f"Model loaded successfully. Test prediction: {test_prediction}")
            
            self.ready = True
            return True
            
        except Exception as e:
            logger.error(f"Failed to load model: {str(e)}")
            raise e

    def predict(self, payload: Dict[str, Any], headers: Dict[str, str] = None) -> Dict[str, Any]:
        """
        Make predictions on input data.
        
        Args:
            payload: Input data in the format:
                    {
                        "instances": [
                            {"text": "I love this product"},
                            {"text": "This is terrible"}
                        ]
                    }
            headers: HTTP headers (optional)
            
        Returns:
            Dictionary containing predictions:
            {
                "predictions": [
                    {
                        "text": "I love this product",
                        "sentiment": "positive", 
                        "confidence": 0.89,
                        "scores": {"positive": 0.89, "negative": 0.05, "neutral": 0.06}
                    }
                ]
            }
        """
        
        # Validate model is ready
        if not self.ready:
            raise RuntimeError("Model is not ready. Please check logs.")
        
        # Validate input format
        if "instances" not in payload:
            raise ValueError("Input must contain 'instances' field")
        
        instances = payload["instances"]
        if not isinstance(instances, list):
            raise ValueError("'instances' must be a list")
        
        logger.info(f"Processing {len(instances)} instances")
        
        try:
            # Extract text from instances
            texts = []
            for i, instance in enumerate(instances):
                if isinstance(instance, dict):
                    if "text" not in instance:
                        raise ValueError(f"Instance {i} missing 'text' field")
                    texts.append(instance["text"])
                elif isinstance(instance, str):
                    texts.append(instance)
                else:
                    raise ValueError(f"Instance {i} must be string or dict with 'text' field")
            
            # Make predictions
            predictions = self.model.predict(texts)
            probabilities = self.model.predict_proba(texts)
            
            # Format results
            results = []
            for text, pred, probs in zip(texts, predictions, probabilities):
                
                # Determine sentiment based on prediction
                if pred >= 0.7:
                    sentiment = "positive"
                elif pred <= 0.3:
                    sentiment = "negative"
                else:
                    sentiment = "neutral"
                
                # Get confidence (highest probability)
                confidence = float(np.max(probs))
                
                # Create probability scores for all classes
                # Note: This assumes binary classification, adjust for your model
                scores = {
                    "negative": float(probs[0]) if len(probs) > 0 else 0.0,
                    "positive": float(probs[1]) if len(probs) > 1 else 0.0,
                    "neutral": 1.0 - (float(probs[0]) + float(probs[1])) if len(probs) > 1 else 0.0
                }
                
                result = {
                    "text": text,
                    "sentiment": sentiment,
                    "confidence": confidence,
                    "prediction_score": float(pred),
                    "scores": scores
                }
                results.append(result)
            
            logger.info(f"Successfully processed {len(results)} predictions")
            return {"predictions": results}
            
        except Exception as e:
            logger.error(f"Prediction failed: {str(e)}")
            raise e

    def preprocess(self, payload: Dict[str, Any], headers: Dict[str, str] = None) -> Dict[str, Any]:
        """
        Optional: Preprocess input before prediction.
        Override this method if you need custom preprocessing.
        """
        return payload

    def postprocess(self, result: Dict[str, Any], headers: Dict[str, str] = None) -> Dict[str, Any]:
        """
        Optional: Postprocess prediction results.
        Override this method if you need custom postprocessing.
        """
        return result

if __name__ == "__main__":
    # Start the model server
    logger.info("Starting KServe ModelServer...")
    
    model = SentimentPredictor("sentiment-model")
    ModelServer().start([model])

Step 3: Create Dockerfile

# Dockerfile
FROM python:3.9-slim

# Set working directory
WORKDIR /app

# Install system dependencies
RUN apt-get update && apt-get install -y \
    gcc \
    && rm -rf /var/lib/apt/lists/*

# Copy and install Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application code
COPY predictor.py .
COPY sentiment_model.py .

# Create model directory
RUN mkdir -p /mnt/models

# Train and save model during build (for demo purposes)
# In production, you'd load from external storage
RUN python sentiment_model.py && \
    cp sentiment_model.joblib /mnt/models/

# Expose port
EXPOSE 8080

# Health check
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
    CMD curl -f http://localhost:8080/v1/models/sentiment-model || exit 1

# Start the server
CMD ["python", "predictor.py"]

Step 4: Requirements File

# requirements.txt
kserve>=0.11.0
scikit-learn>=1.3.0
joblib>=1.3.0
numpy>=1.24.0
pandas>=2.0.0

Step 5: Build and Load Image

# Build the Docker image
docker build -t sentiment-predictor:v1 .

# Load image into Kind cluster (for local development)
kind load docker-image sentiment-predictor:v1 --name kserve-demo

# Verify image is loaded
docker exec -it kserve-demo-control-plane crictl images | grep sentiment

Step 6: Deploy to KServe

# sentiment-inferenceservice.yaml
apiVersion: "serving.kserve.io/v1beta1"
kind: "InferenceService"
metadata:
  name: "sentiment-predictor"
  namespace: "default"
  annotations:
    # Disable Istio sidecar injection for simplicity
    sidecar.istio.io/inject: "false"
spec:
  predictor:
    # Minimum and maximum number of replicas
    minReplicas: 0  # Enable scale-to-zero
    maxReplicas: 5  # Maximum auto-scale limit
    
    containers:
    - name: kserve-container
      image: sentiment-predictor:v1
      imagePullPolicy: IfNotPresent
      
      ports:
      - containerPort: 8080
        protocol: TCP
      
      # Environment variables
      env:
      - name: MODEL_NAME
        value: "sentiment-model"
      - name: STORAGE_URI
        value: "file:///mnt/models"
      
      # Resource requests and limits
      resources:
        requests:
          cpu: "100m"      # 0.1 CPU cores
          memory: "256Mi"   # 256 MB RAM
        limits:
          cpu: "500m"      # 0.5 CPU cores  
          memory: "512Mi"   # 512 MB RAM
      
      # Liveness and readiness probes
      livenessProbe:
        httpGet:
          path: /v1/models/sentiment-model
          port: 8080
        initialDelaySeconds: 30
        periodSeconds: 10
        timeoutSeconds: 5
        failureThreshold: 3
      
      readinessProbe:
        httpGet:
          path: /v1/models/sentiment-model
          port: 8080
        initialDelaySeconds: 10
        periodSeconds: 5
        timeoutSeconds: 5
        failureThreshold: 3

Step 7: Deploy and Test

# Deploy the InferenceService
kubectl apply -f sentiment-inferenceservice.yaml

# Wait for deployment to be ready
kubectl wait --for=condition=Ready inferenceservice/sentiment-predictor --timeout=300s

# Check the status
kubectl get inferenceservice sentiment-predictor
kubectl describe inferenceservice sentiment-predictor

# Get the service URL
export SERVICE_URL=$(kubectl get inferenceservice sentiment-predictor -o jsonpath='{.status.url}')
echo "Service URL: $SERVICE_URL"

Step 8: Comprehensive Testing

# Test 1: Simple prediction
curl -X POST $SERVICE_URL/v1/models/sentiment-model:predict \
  -H "Content-Type: application/json" \
  -d '{
    "instances": [
      {"text": "I love KServe! It makes ML deployment so easy."},
      {"text": "This tutorial is confusing and poorly written."},
      {"text": "The weather is okay today."}
    ]
  }'

# Test 2: Edge cases
curl -X POST $SERVICE_URL/v1/models/sentiment-model:predict \
  -H "Content-Type: application/json" \
  -d '{
    "instances": [
      {"text": ""},
      {"text": "😊😊😊"},
      {"text": "This is a very long text that contains multiple sentences. Some are positive, some are negative, and some are neutral. How will the model handle this complex input?"}
    ]
  }'

# Test 3: Load testing (optional)
for i in {1..10}; do
  curl -X POST $SERVICE_URL/v1/models/sentiment-model:predict \
    -H "Content-Type: application/json" \
    -d '{"instances": [{"text": "Load test message '$i'"}]}' &
done
wait

Python Test Client

# test_client.py
import requests
import json
import time
from typing import List, Dict

class SentimentClient:
    def __init__(self, service_url: str):
        self.service_url = service_url.rstrip('/')
        self.predict_url = f"{self.service_url}/v1/models/sentiment-model:predict"
    
    def predict(self, texts: List[str]) -> Dict:
        """Send prediction request to KServe endpoint"""
        payload = {
            "instances": [{"text": text} for text in texts]
        }
        
        try:
            response = requests.post(
                self.predict_url,
                json=payload,
                headers={"Content-Type": "application/json"},
                timeout=30
            )
            response.raise_for_status()
            return response.json()
        
        except requests.exceptions.RequestException as e:
            print(f"Request failed: {e}")
            return None
    
    def test_sentiment_analysis(self):
        """Run comprehensive tests"""
        test_cases = [
            # Positive examples
            "I absolutely love this product!",
            "Amazing quality and fast delivery.",
            "Best purchase I've made this year.",
            
            # Negative examples  
            "Terrible experience, waste of money.",
            "Poor quality, broke after one day.",
            "Customer service was horrible.",
            
            # Neutral examples
            "The product is okay.",
            "It works as expected.",
            "Average quality for the price.",
            
            # Edge cases
            "",
            "😊😊😊 👍",
            "This is a very long review with mixed sentiments. The product quality is excellent and I'm very satisfied with the purchase. However, the shipping was delayed and the packaging was damaged. Overall, it's a decent product but there's room for improvement in logistics."
        ]
        
        print("🧪 Testing Sentiment Analysis Model")
        print("=" * 50)
        
        start_time = time.time()
        result = self.predict(test_cases)
        end_time = time.time()
        
        if result:
            print(f"✅ Request completed in {end_time - start_time:.2f} seconds")
            print(f"📊 Processed {len(test_cases)} texts")
            print("\n📋 Results:")
            
            for i, prediction in enumerate(result["predictions"]):
                text = prediction["text"]
                sentiment = prediction["sentiment"]
                confidence = prediction["confidence"]
                
                # Truncate long texts for display
                display_text = text[:50] + "..." if len(text) > 50 else text
                
                print(f"{i+1:2d}. '{display_text}'")
                print(f"    → {sentiment} (confidence: {confidence:.2f})")
                print()
        else:
            print("❌ Test failed")

if __name__ == "__main__":
    import os
    
    # Get service URL from environment or use default
    service_url = os.getenv('SERVICE_URL', 'http://localhost:8080')
    
    client = SentimentClient(service_url)
    client.test_sentiment_analysis()

5. Advanced Features

Auto-scaling Configuration

KServe automatically scales your model based on traffic, but you can fine-tune the behavior:

# advanced-inferenceservice.yaml
apiVersion: "serving.kserve.io/v1beta1"
kind: "InferenceService"
metadata:
  name: "sentiment-predictor-advanced"
  annotations:
    # Auto-scaling configuration
    autoscaling.knative.dev/target: "10"           # Target concurrent requests per pod
    autoscaling.knative.dev/maxScale: "10"         # Maximum number of pods
    autoscaling.knative.dev/minScale: "1"          # Minimum number of pods (disable scale-to-zero)
    autoscaling.knative.dev/scaleToZeroGracePeriod: "30s"  # Grace period before scaling to zero
    autoscaling.knative.dev/scaleDownDelay: "0s"   # Delay before scaling down
    autoscaling.knative.dev/window: "60s"          # Time window for auto-scaling decisions
spec:
  predictor:
    containers:
    - name: kserve-container
      image: sentiment-predictor:v1
      resources:
        requests:
          cpu: "200m"
          memory: "512Mi"
        limits:
          cpu: "1000m"
          memory: "1Gi"

Canary Deployments (Traffic Splitting)

Deploy a new version and gradually shift traffic:

# canary-deployment.yaml
apiVersion: "serving.kserve.io/v1beta1"
kind: "InferenceService"
metadata:
  name: "sentiment-predictor"
spec:
  predictor:
    canaryTrafficPercent: 10  # Send 10% traffic to new version
    containers:
    - name: kserve-container
      image: sentiment-predictor:v2  # New version
  
  # Keep old version running
  components:
    predictor:
      containers:
      - name: kserve-container
        image: sentiment-predictor:v1  # Old version gets 90% traffic

Model Transformation Pipeline

Add pre/post-processing to your model:

# transformer.py
from kserve import Model
import json
import re
from typing import Dict, Any

class SentimentTransformer(Model):
    def __init__(self, name: str, predictor_host: str):
        super().__init__(name)
        self.predictor_host = predictor_host
        
    def preprocess(self, payload: Dict[str, Any], headers: Dict[str, str] = None) -> Dict[str, Any]:
        """Clean and normalize text before prediction"""
        instances = payload.get("instances", [])
        
        cleaned_instances = []
        for instance in instances:
            text = instance.get("text", "")
            
            # Text cleaning pipeline
            cleaned_text = self._clean_text(text)
            
            cleaned_instances.append({"text": cleaned_text})
        
        return {"instances": cleaned_instances}
    
    def _clean_text(self, text: str) -> str:
        """Apply text cleaning transformations"""
        # Convert to lowercase
        text = text.lower()
        
        # Remove extra whitespace
        text = re.sub(r'\s+', ' ', text).strip()
        
        # Remove special characters but keep emoticons
        text = re.sub(r'[^\w\s😊😢😭👍👎❤️💔]', '', text)
        
        # Remove URLs
        text = re.sub(r'http\S+|www\S+', '', text)
        
        return text

    def postprocess(self, payload: Dict[str, Any], headers: Dict[str, str] = None) -> Dict[str, Any]:
        """Add metadata and format response"""
        predictions = payload.get("predictions", [])
        
        enhanced_predictions = []
        for pred in predictions:
            # Add confidence categories
            confidence = pred.get("confidence", 0)
            
            if confidence > 0.8:
                confidence_level = "high"
            elif confidence > 0.6:
                confidence_level = "medium"
            else:
                confidence_level = "low"
            
            enhanced_pred = {
                **pred,
                "confidence_level": confidence_level,
                "model_version": "v1.0",
                "processed_at": "2024-01-01T00:00:00Z"
            }
            enhanced_predictions.append(enhanced_pred)
        
        return {"predictions": enhanced_predictions}

Multi-Model Serving

Serve multiple models in one InferenceService:

# multi-model-service.yaml
apiVersion: "serving.kserve.io/v1beta1"
kind: "InferenceService"
metadata:
  name: "multi-model-predictor"
spec:
  predictor:
    containers:
    - name: kserve-container
      image: multi-model-predictor:v1
      env:
      - name: MODELS
        value: "sentiment,spam-detection,topic-classification"

6. Production Considerations

Security Best Practices

# secure-inferenceservice.yaml
apiVersion: "serving.kserve.io/v1beta1"
kind: "InferenceService"
metadata:
  name: "secure-sentiment-predictor"
spec:
  predictor:
    containers:
    - name: kserve-container
      image: sentiment-predictor:v1
      
      # Security context
      securityContext:
        runAsNonRoot: true
        runAsUser: 1000
        readOnlyRootFilesystem: true
        allowPrivilegeEscalation: false
        capabilities:
          drop:
          - ALL
      
      # Resource limits to prevent DoS
      resources:
        requests:
          cpu: "100m"
          memory: "256Mi"
        limits:
          cpu: "500m"
          memory: "512Mi"
      
      # Health checks
      livenessProbe:
        httpGet:
          path: /v1/models/sentiment-model
          port: 8080
        initialDelaySeconds: 30
        periodSeconds: 10
      
      readinessProbe:
        httpGet:
          path: /v1/models/sentiment-model
          port: 8080
        initialDelaySeconds: 10
        periodSeconds: 5

Monitoring and Observability

# monitoring-config.yaml
apiVersion: v1
kind: ServiceMonitor
metadata:
  name: kserve-metrics
spec:
  selector:
    matchLabels:
      serving.kserve.io/inferenceservice: sentiment-predictor
  endpoints:
  - port: metrics
    interval: 30s
    path: /metrics

Model Storage with External Sources

# s3-model-service.yaml
apiVersion: "serving.kserve.io/v1beta1"
kind: "InferenceService"
metadata:
  name: "s3-model-predictor"
spec:
  predictor:
    model:
      modelFormat:
        name: sklearn
      storageUri: "s3://my-bucket/models/sentiment-model/"
      
    # S3 credentials (use secrets in production)
    serviceAccountName: s3-access-sa

Load Testing Script

# load_test.py
import asyncio
import aiohttp
import time
import json
from typing import List
import statistics

class LoadTester:
    def __init__(self, service_url: str):
        self.service_url = f"{service_url}/v1/models/sentiment-model:predict"
        self.results = []
    
    async def send_request(self, session: aiohttp.ClientSession, text: str) -> dict:
        """Send a single prediction request"""
        payload = {"instances": [{"text": text}]}
        
        start_time = time.time()
        try:
            async with session.post(self.service_url, json=payload) as response:
                result = await response.json()
                end_time = time.time()
                
                return {
                    "success": True,
                    "latency": end_time - start_time,
                    "status_code": response.status,
                    "response": result
                }
        except Exception as e:
            end_time = time.time()
            return {
                "success": False,
                "latency": end_time - start_time,
                "error": str(e)
            }
    
    async def run_load_test(self, num_requests: int = 100, concurrency: int = 10):
        """Run load test with specified parameters"""
        print(f"🚀 Starting load test: {num_requests} requests, {concurrency} concurrent")
        
        # Create test data
        test_texts = [
            f"This is test message number {i} for load testing KServe"
            for i in range(num_requests)
        ]
        
        # Create semaphore to limit concurrency
        semaphore = asyncio.Semaphore(concurrency)
        
        async def bounded_request(session, text):
            async with semaphore:
                return await self.send_request(session, text)
        
        # Run requests
        start_time = time.time()
        
        async with aiohttp.ClientSession() as session:
            tasks = [bounded_request(session, text) for text in test_texts]
            results = await asyncio.gather(*tasks)
        
        end_time = time.time()
        
        # Analyze results
        self.analyze_results(results, end_time - start_time)
    
    def analyze_results(self, results: List[dict], total_time: float):
        """Analyze and report load test results"""
        successful_results = [r for r in results if r["success"]]
        failed_results = [r for r in results if not r["success"]]
        
        if successful_results:
            latencies = [r["latency"] for r in successful_results]
            
            print("\n📊 Load Test Results")
            print("=" * 40)
            print(f"Total requests: {len(results)}")
            print(f"Successful: {len(successful_results)}")
            print(f"Failed: {len(failed_results)}")
            print(f"Success rate: {len(successful_results)/len(results)*100:.1f}%")
            print(f"Total time: {total_time:.2f}s")
            print(f"Requests/second: {len(results)/total_time:.1f}")
            print()
            print("📈 Latency Statistics:")
            print(f"  Average: {statistics.mean(latencies):.3f}s")
            print(f"  Median: {statistics.median(latencies):.3f}s")
            print(f"  Min: {min(latencies):.3f}s")
            print(f"  Max: {max(latencies):.3f}s")
            print(f"  95th percentile: {sorted(latencies)[int(0.95 * len(latencies))]:.3f}s")
        
        if failed_results:
            print("\n❌ Failed Requests:")
            for i, result in enumerate(failed_results[:5]):  # Show first 5 failures
                print(f"  {i+1}. {result['error']}")

if __name__ == "__main__":
    import os
    
    service_url = os.getenv('SERVICE_URL', 'http://localhost:8080')
    tester = LoadTester(service_url)
    
    # Run load test
    asyncio.run(tester.run_load_test(num_requests=50, concurrency=5))

7. Troubleshooting Guide

Common Issues and Solutions

Issue 1: Pod Stuck in Pending State

# Diagnosis
kubectl describe pod -l serving.kserve.io/inferenceservice=sentiment-predictor

# Common causes and solutions:
# 1. Insufficient resources
kubectl top nodes
kubectl describe nodes

# 2. Image pull issues
kubectl get events --sort-by=.metadata.creationTimestamp

# 3. Volume mount problems
kubectl describe pvc

Issue 2: Model Loading Failures

# Check model container logs
kubectl logs -l serving.kserve.io/inferenceservice=sentiment-predictor -c kserve-container

# Check init container logs (if using storage URI)
kubectl logs -l serving.kserve.io/inferenceservice=sentiment-predictor -c storage-initializer

# Debug storage access
kubectl exec -it <pod-name> -c kserve-container -- ls -la /mnt/models/

Issue 3: Scale-to-Zero Not Working

# Check KNative configuration
kubectl get configmap config-autoscaler -n knative-serving -o yaml

# Check InferenceService annotations
kubectl get inferenceservice sentiment-predictor -o yaml | grep -A 10 annotations

# Monitor scaling events
kubectl get events --field-selector involvedObject.name=sentiment-predictor

Issue 4: Network Connectivity Problems

# Test internal service connectivity
kubectl run debug --image=curlimages/curl --rm -it -- \
  curl -X POST http://sentiment-predictor-predictor-default.default.svc.cluster.local/v1/models/sentiment-model:predict \
  -H "Content-Type: application/json" \
  -d '{"instances": [{"text": "test"}]}'

# Check Istio configuration
kubectl get gateway
kubectl get virtualservice

Debugging Script

#!/bin/bash
# debug-kserve.sh

INFERENCE_SERVICE_NAME=${1:-sentiment-predictor}
NAMESPACE=${2:-default}

echo "🔍 Debugging KServe InferenceService: $INFERENCE_SERVICE_NAME"
echo "📍 Namespace: $NAMESPACE"
echo "=" * 50

# 1. Check InferenceService status
echo "📊 InferenceService Status:"
kubectl get inferenceservice $INFERENCE_SERVICE_NAME -n $NAMESPACE -o wide

echo -e "\n📝 InferenceService Details:"
kubectl describe inferenceservice $INFERENCE_SERVICE_NAME -n $NAMESPACE

# 2. Check related pods
echo -e "\n🔍 Related Pods:"
kubectl get pods -l serving.kserve.io/inferenceservice=$INFERENCE_SERVICE_NAME -n $NAMESPACE -o wide

# 3. Check recent events
echo -e "\n📅 Recent Events:"
kubectl get events -n $NAMESPACE --sort-by=.metadata.creationTimestamp | grep $INFERENCE_SERVICE_NAME | tail -10

# 4. Check logs
echo -e "\n📋 Pod Logs:"
PODS=$(kubectl get pods -l serving.kserve.io/inferenceservice=$INFERENCE_SERVICE_NAME -n $NAMESPACE -o jsonpath='{.items[*].metadata.name}')

for pod in $PODS; do
    echo -e "\n--- Logs for $pod ---"
    kubectl logs $pod -n $NAMESPACE -c kserve-container --tail=20
done

# 5. Check services
echo -e "\n🌐 Related Services:"
kubectl get services -l serving.kserve.io/inferenceservice=$INFERENCE_SERVICE_NAME -n $NAMESPACE

# 6. Check KNative resources
echo -e "\n⚡ KNative Resources:"
kubectl get ksvc -l serving.kserve.io/inferenceservice=$INFERENCE_SERVICE_NAME -n $NAMESPACE

echo -e "\n✅ Debug information collected!"

8. Best Practices

Model Development

  1. Container Best Practices

    # Use specific Python version
    FROM python:3.9-slim
    
    # Create non-root user
    RUN useradd --create-home --shell /bin/bash kserve
    USER kserve
    
    # Pin dependency versions
    COPY requirements.txt .
    RUN pip install --no-cache-dir -r requirements.txt
    
    # Use multi-stage builds for smaller images
    FROM python:3.9-slim as runtime
    COPY --from=builder /app /app
    
  2. Model Versioning

    # Include model metadata
    class ModelMetadata:
        version = "1.2.3"
        training_date = "2024-01-01"
        performance_metrics = {
            "accuracy": 0.95,
            "f1_score": 0.92
        }
    
  3. Input Validation

    def validate_input(self, payload):
        if not isinstance(payload.get("instances"), list):
            raise ValueError("instances must be a list")
    
        for instance in payload["instances"]:
            if "text" not in instance:
                raise ValueError("Each instance must have 'text' field")
    
            if len(instance["text"]) > 1000:
                raise ValueError("Text length cannot exceed 1000 characters")
    

Production Deployment

  1. Resource Planning

    resources:
      requests:
        cpu: "100m"     # Start small
        memory: "256Mi"
      limits:
        cpu: "1000m"    # Allow bursting
        memory: "1Gi"
    
  2. Health Checks

    # Implement comprehensive health checks
    def health_check(self):
        try:
            # Test model prediction
            test_result = self.model.predict(["health check"])
            return {"status": "healthy", "model_loaded": True}
        except Exception as e:
            return {"status": "unhealthy", "error": str(e)}
    
  3. Monitoring

    import time
    import logging
    
    class MetricsMiddleware:
        def __init__(self):
            self.request_count = 0
            self.total_latency = 0
    
        def track_request(self, start_time, end_time):
            self.request_count += 1
            self.total_latency += (end_time - start_time)
    
            avg_latency = self.total_latency / self.request_count
            logging.info(f"Avg latency: {avg_latency:.3f}s")
    

Security Checklist

Performance Optimization

  1. Model Optimization

    # Use model quantization
    from sklearn.externals import joblib
    import numpy as np
    
    # Quantize model weights to reduce memory
    def quantize_model(model):
        for param in model.get_params():
            if isinstance(param, np.ndarray):
                param = param.astype(np.float16)
    
  2. Batch Processing

    def predict_batch(self, texts, batch_size=32):
        results = []
        for i in range(0, len(texts), batch_size):
            batch = texts[i:i+batch_size]
            batch_results = self.model.predict(batch)
            results.extend(batch_results)
        return results
    
  3. Caching

    from functools import lru_cache
    
    @lru_cache(maxsize=1000)
    def cached_predict(self, text_hash):
        return self.model.predict([text_hash])
    

Conclusion

You’ve now learned the essential 20% of KServe that covers 80% of real-world use cases:

  1. Core Concepts: InferenceService, Predictor, Auto-scaling
  2. Practical Skills: Building, deploying, and testing ML models
  3. Production Features: Security, monitoring, troubleshooting
  4. Advanced Patterns: Canary deployments, transformers, multi-model serving

Next Steps

  1. Practice: Deploy your own model using this tutorial
  2. Explore: Try different model frameworks (TensorFlow, PyTorch, XGBoost)
  3. Scale: Implement A/B testing and canary deployments
  4. Monitor: Set up comprehensive observability
  5. Optimize: Profile and tune your model performance

Resources for Deeper Learning

Remember: Start simple, iterate quickly, and scale gradually. KServe handles the complexity so you can focus on building great ML models! 🚀