Sentiment Analysis Model on KServe
Table of Contents
- Introduction and Core Concepts
- Architecture Deep Dive
- Environment Setup
- Your First Model Deployment
- Advanced Features
- Production Considerations
- Troubleshooting Guide
- Best Practices
1. Introduction and Core Concepts
What is KServe?
KServe (formerly KFServing) is a serverless machine learning inference platform built on Kubernetes. Think of it as the "Netflix for ML models" - it handles scaling, routing, monitoring, and serving your models with minimal manual intervention.
Why KServe Matters: The Problem It Solves
Before KServe:
Developer deploys model → Manual scaling → Manual monitoring →
Manual A/B testing → Manual canary rollouts → Manual rollbacks
With KServe:
Developer deploys model → Everything else happens automatically
Core Concepts You Must Understand
1. InferenceService
The primary resource in KServe. It’s like a “smart container” that wraps your model with superpowers:
# Think of this as: "I want to serve my model with autoscaling"
apiVersion: "serving.kserve.io/v1beta1"
kind: "InferenceService"
metadata:
name: "my-awesome-model"
spec:
predictor: # The actual model serving logic
containers:
- image: my-model:latest
2. Predictor vs Transformer vs Explainer
- Predictor: The actual model (required)
- Transformer: Pre/post-processing (optional)
- Explainer: Model explanations (optional)
Input → [Transformer] → Predictor → [Explainer] → Output
3. Serverless Magic: Scale-to-Zero
KServe automatically:
- Scales UP when requests come in (cold start ~1-3 seconds)
- Scales DOWN to zero when idle (saves 💰)
- Auto-scales based on traffic patterns
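If you want these scaling bounds pinned down explicitly rather than left to defaults, they can be set per service with Knative autoscaling annotations. A minimal sketch (the same annotations are covered in depth in the Advanced Features section):
apiVersion: "serving.kserve.io/v1beta1"
kind: "InferenceService"
metadata:
  name: "my-awesome-model"
  annotations:
    autoscaling.knative.dev/minScale: "0" # allow scale-to-zero
    autoscaling.knative.dev/maxScale: "5" # cap how far it can burst
spec:
  predictor:
    containers:
    - image: my-model:latest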
2. Architecture Deep Dive
The KServe Stack
┌─────────────────────────────────────────┐
│ Traffic (HTTP/gRPC) │
└─────────────────────────────────────────┘
↓
┌─────────────────────────────────────────┐
│ Istio Gateway (Ingress) │ ← Traffic routing
└─────────────────────────────────────────┘
↓
┌─────────────────────────────────────────┐
│ KNative Serving │ ← Serverless runtime
└─────────────────────────────────────────┘
↓
┌─────────────────────────────────────────┐
│ Your Model Container │ ← Your code here
└─────────────────────────────────────────┘
Request Flow Explained
- Client sends HTTP request to model endpoint
- Istio Gateway receives and routes traffic
- KNative checks if pods are running:
- If cold: Spins up new pods (1-3 seconds)
- If warm: Routes to existing pods
- Your container processes the request
- Response flows back through the stack
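Concretely, step 1 is nothing more than a plain HTTP call that follows the KServe v1 prediction protocol; everything after it is handled by the stack. A hedged sketch (the hostname below is a placeholder; the real one comes from the InferenceService status, as shown in the deployment section):
# Hypothetical endpoint; substitute the URL reported by your InferenceService
curl -X POST http://my-awesome-model.default.example.com/v1/models/my-awesome-model:predict \
  -H "Content-Type: application/json" \
  -d '{"instances": [{"text": "hello"}]}'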
Key Components Deep Dive
KNative Serving
- Purpose: Serverless container orchestration
- Key Features: Auto-scaling, traffic splitting, gradual rollouts
- Scale-to-zero: Pods terminate after ~30 seconds of inactivity
Istio Service Mesh
- Purpose: Traffic management and security
- Features: Load balancing, circuit breaking, retries, mTLS
- Traffic Splitting: Route 90% to v1, 10% to v2 for canary deployments
3. Environment Setup
Prerequisites Checklist
- ✅ Kubernetes cluster (1.21+)
- ✅ kubectl configured
- ✅ 4GB+ RAM available
- ✅ Docker for building images
- ✅ curl for testing
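A quick way to confirm the checklist before installing anything; these commands only read state, and the version expectations in the comments are the ones assumed by this tutorial:
kubectl version          # client and server should report v1.21 or newer
kubectl get nodes        # cluster reachable, nodes Ready
docker version --format '{{.Server.Version}}'
curl --version | head -1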
Quick Setup with Kind (Local Development)
# 1. Create Kind cluster with specific configuration
cat <<EOF > kind-config.yaml
kind: Cluster
apiVersion: kind.x-k8s.io/v1alpha4
nodes:
- role: control-plane
kubeadmConfigPatches:
- |
kind: InitConfiguration
nodeRegistration:
kubeletExtraArgs:
node-labels: "ingress-ready=true"
extraPortMappings:
- containerPort: 80
hostPort: 8080
- containerPort: 443
hostPort: 8443
EOF
kind create cluster --config=kind-config.yaml --name=kserve-demo
Install KServe (The Right Way)
# 1. Install KNative Serving
kubectl apply -f https://github.com/knative/serving/releases/download/knative-v1.12.0/serving-crds.yaml
kubectl apply -f https://github.com/knative/serving/releases/download/knative-v1.12.0/serving-core.yaml
# 2. Install Istio for KNative
kubectl apply -f https://github.com/knative/net-istio/releases/download/knative-v1.12.0/istio.yaml
kubectl apply -f https://github.com/knative/net-istio/releases/download/knative-v1.12.0/net-istio.yaml
# 3. Install KServe
kubectl apply -f https://github.com/kserve/kserve/releases/download/v0.11.0/kserve.yaml
kubectl apply -f https://github.com/kserve/kserve/releases/download/v0.11.0/kserve-cluster-resources.yaml  # built-in ClusterServingRuntimes
# 4. Verify installation
kubectl get pods -n knative-serving
kubectl get pods -n kserve
Verification Script
#!/bin/bash
# verify-installation.sh
echo "🔍 Checking KServe installation..."
# Check KNative Serving
echo "📦 KNative Serving pods:"
check_pods() {
  local ns=$1
  # Collect failures first: 'exit' inside a piped while-loop only exits the
  # subshell, so it would not actually stop the script.
  local bad
  bad=$(kubectl get pods -n "$ns" --no-headers | awk '$3 != "Running" && $3 != "Completed"')
  if [[ -n "$bad" ]]; then
    echo "❌ Pods not ready in namespace $ns:"
    echo "$bad"
    exit 1
  fi
  kubectl get pods -n "$ns" --no-headers | sed 's/^/✅ /'
}
check_pods knative-serving
# Check KServe
echo "📦 KServe pods:"
check_pods kserve
echo "🎉 Installation verified successfully!"
4. Your First Model Deployment
Step 1: Create a Simple Model
Let’s build a sentiment analysis model that everyone can understand:
# sentiment_model.py
import joblib
import numpy as np
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import Pipeline
import json
# Create and train a simple sentiment model
def create_model():
# Sample training data
texts = [
"I love this product", "This is amazing", "Great quality",
"I hate this", "Terrible experience", "Very disappointed",
"It's okay", "Not bad", "Average product"
]
labels = [1, 1, 1, 0, 0, 0, 0.5, 0.5, 0.5] # 1=positive, 0=negative, 0.5=neutral
# Create pipeline
pipeline = Pipeline([
('tfidf', TfidfVectorizer(max_features=1000, stop_words='english')),
('classifier', LogisticRegression())
])
# Train model
pipeline.fit(texts, labels)
return pipeline
# Create and save model
model = create_model()
joblib.dump(model, 'sentiment_model.joblib')
print("✅ Model created and saved!")
# Test the model
test_texts = ["I love KServe!", "This tutorial is confusing"]
predictions = model.predict(test_texts)
probabilities = model.predict_proba(test_texts)
for text, pred, prob in zip(test_texts, predictions, probabilities):
sentiment = "positive" if pred > 0.6 else "negative" if pred < 0.4 else "neutral"
confidence = max(prob)
print(f"Text: '{text}' → {sentiment} (confidence: {confidence:.2f})")
Step 2: Create KServe-Compatible Predictor
# predictor.py
import joblib
import json
import logging
from typing import Dict, List, Any
from kserve import Model, ModelServer
import numpy as np
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class SentimentPredictor(Model):
"""
A KServe-compatible sentiment analysis predictor.
This class handles:
1. Model loading from storage
2. Input validation and preprocessing
3. Making predictions
4. Output formatting
"""
def __init__(self, name: str):
super().__init__(name)
self.name = name
self.model = None
self.ready = False
# Define sentiment labels
self.sentiment_labels = {
0: "negative",
0.5: "neutral",
1: "positive"
}
def load(self) -> bool:
"""
Load the model from storage.
KServe calls this method once when the container starts.
This is where you load your model weights, artifacts, etc.
"""
try:
logger.info(f"Loading model for {self.name}")
# Load model from the mounted storage path
# In production, this could be S3, GCS, etc.
model_path = "/mnt/models/sentiment_model.joblib"
self.model = joblib.load(model_path)
# Test the model with a simple prediction
test_prediction = self.model.predict(["test"])
logger.info(f"Model loaded successfully. Test prediction: {test_prediction}")
self.ready = True
return True
except Exception as e:
logger.error(f"Failed to load model: {str(e)}")
raise e
def predict(self, payload: Dict[str, Any], headers: Dict[str, str] = None) -> Dict[str, Any]:
"""
Make predictions on input data.
Args:
payload: Input data in the format:
{
"instances": [
{"text": "I love this product"},
{"text": "This is terrible"}
]
}
headers: HTTP headers (optional)
Returns:
Dictionary containing predictions:
{
"predictions": [
{
"text": "I love this product",
"sentiment": "positive",
"confidence": 0.89,
"scores": {"positive": 0.89, "negative": 0.05, "neutral": 0.06}
}
]
}
"""
# Validate model is ready
if not self.ready:
raise RuntimeError("Model is not ready. Please check logs.")
# Validate input format
if "instances" not in payload:
raise ValueError("Input must contain 'instances' field")
instances = payload["instances"]
if not isinstance(instances, list):
raise ValueError("'instances' must be a list")
logger.info(f"Processing {len(instances)} instances")
try:
# Extract text from instances
texts = []
for i, instance in enumerate(instances):
if isinstance(instance, dict):
if "text" not in instance:
raise ValueError(f"Instance {i} missing 'text' field")
texts.append(instance["text"])
elif isinstance(instance, str):
texts.append(instance)
else:
raise ValueError(f"Instance {i} must be string or dict with 'text' field")
# Make predictions
predictions = self.model.predict(texts)
probabilities = self.model.predict_proba(texts)
# Format results
results = []
for text, pred, probs in zip(texts, predictions, probabilities):
# Determine sentiment based on prediction
if pred >= 0.7:
sentiment = "positive"
elif pred <= 0.3:
sentiment = "negative"
else:
sentiment = "neutral"
# Get confidence (highest probability)
confidence = float(np.max(probs))
                # Map class probabilities to sentiment labels using the
                # model's class order (the demo model has classes 0, 0.5, 1)
                class_order = self.model.classes_
                scores = {
                    self.sentiment_labels.get(float(cls), str(cls)): float(p)
                    for cls, p in zip(class_order, probs)
                }
result = {
"text": text,
"sentiment": sentiment,
"confidence": confidence,
"prediction_score": float(pred),
"scores": scores
}
results.append(result)
logger.info(f"Successfully processed {len(results)} predictions")
return {"predictions": results}
except Exception as e:
logger.error(f"Prediction failed: {str(e)}")
raise e
def preprocess(self, payload: Dict[str, Any], headers: Dict[str, str] = None) -> Dict[str, Any]:
"""
Optional: Preprocess input before prediction.
Override this method if you need custom preprocessing.
"""
return payload
def postprocess(self, result: Dict[str, Any], headers: Dict[str, str] = None) -> Dict[str, Any]:
"""
Optional: Postprocess prediction results.
Override this method if you need custom postprocessing.
"""
return result
if __name__ == "__main__":
# Start the model server
logger.info("Starting KServe ModelServer...")
model = SentimentPredictor("sentiment-model")
ModelServer().start([model])
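Before containerizing anything, it can help to smoke-test the predictor as a plain local process. A minimal sketch, assuming you already ran sentiment_model.py and that the model file sits where the predictor expects it (the kserve ModelServer listens on port 8080 by default):
# Hypothetical local smoke test
python sentiment_model.py
sudo mkdir -p /mnt/models && sudo cp sentiment_model.joblib /mnt/models/
python predictor.py &
curl -X POST http://localhost:8080/v1/models/sentiment-model:predict \
  -H "Content-Type: application/json" \
  -d '{"instances": [{"text": "I love KServe"}]}'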
Step 3: Create Dockerfile
# Dockerfile
FROM python:3.9-slim
# Set working directory
WORKDIR /app
# Install system dependencies
# Install system dependencies (curl is needed by the HEALTHCHECK below
# and is not included in python:3.9-slim)
RUN apt-get update && apt-get install -y \
    gcc \
    curl \
    && rm -rf /var/lib/apt/lists/*
# Copy and install Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy application code
COPY predictor.py .
COPY sentiment_model.py .
# Create model directory
RUN mkdir -p /mnt/models
# Train and save model during build (for demo purposes)
# In production, you'd load from external storage
RUN python sentiment_model.py && \
cp sentiment_model.joblib /mnt/models/
# Expose port
EXPOSE 8080
# Health check
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
CMD curl -f http://localhost:8080/v1/models/sentiment-model || exit 1
# Start the server
CMD ["python", "predictor.py"]
Step 4: Requirements File
# requirements.txt
kserve>=0.11.0
scikit-learn>=1.3.0
joblib>=1.3.0
numpy>=1.24.0
pandas>=2.0.0
Step 5: Build and Load Image
# Build the Docker image
docker build -t sentiment-predictor:v1 .
# Load image into Kind cluster (for local development)
kind load docker-image sentiment-predictor:v1 --name kserve-demo
# Verify image is loaded
docker exec -it kserve-demo-control-plane crictl images | grep sentiment
Step 6: Deploy to KServe
# sentiment-inferenceservice.yaml
apiVersion: "serving.kserve.io/v1beta1"
kind: "InferenceService"
metadata:
name: "sentiment-predictor"
namespace: "default"
annotations:
# Disable Istio sidecar injection for simplicity
sidecar.istio.io/inject: "false"
spec:
predictor:
# Minimum and maximum number of replicas
minReplicas: 0 # Enable scale-to-zero
maxReplicas: 5 # Maximum auto-scale limit
containers:
- name: kserve-container
image: sentiment-predictor:v1
imagePullPolicy: IfNotPresent
ports:
- containerPort: 8080
protocol: TCP
# Environment variables
env:
- name: MODEL_NAME
value: "sentiment-model"
- name: STORAGE_URI
value: "file:///mnt/models"
# Resource requests and limits
resources:
requests:
cpu: "100m" # 0.1 CPU cores
memory: "256Mi" # 256 MB RAM
limits:
cpu: "500m" # 0.5 CPU cores
memory: "512Mi" # 512 MB RAM
# Liveness and readiness probes
livenessProbe:
httpGet:
path: /v1/models/sentiment-model
port: 8080
initialDelaySeconds: 30
periodSeconds: 10
timeoutSeconds: 5
failureThreshold: 3
readinessProbe:
httpGet:
path: /v1/models/sentiment-model
port: 8080
initialDelaySeconds: 10
periodSeconds: 5
timeoutSeconds: 5
failureThreshold: 3
Step 7: Deploy and Test
# Deploy the InferenceService
kubectl apply -f sentiment-inferenceservice.yaml
# Wait for deployment to be ready
kubectl wait --for=condition=Ready inferenceservice/sentiment-predictor --timeout=300s
# Check the status
kubectl get inferenceservice sentiment-predictor
kubectl describe inferenceservice sentiment-predictor
# Get the service URL
export SERVICE_URL=$(kubectl get inferenceservice sentiment-predictor -o jsonpath='{.status.url}')
echo "Service URL: $SERVICE_URL"
Step 8: Comprehensive Testing
# Test 1: Simple prediction
curl -X POST $SERVICE_URL/v1/models/sentiment-model:predict \
-H "Content-Type: application/json" \
-d '{
"instances": [
{"text": "I love KServe! It makes ML deployment so easy."},
{"text": "This tutorial is confusing and poorly written."},
{"text": "The weather is okay today."}
]
}'
# Test 2: Edge cases
curl -X POST $SERVICE_URL/v1/models/sentiment-model:predict \
-H "Content-Type: application/json" \
-d '{
"instances": [
{"text": ""},
{"text": "😊😊😊"},
{"text": "This is a very long text that contains multiple sentences. Some are positive, some are negative, and some are neutral. How will the model handle this complex input?"}
]
}'
# Test 3: Load testing (optional)
for i in {1..10}; do
curl -X POST $SERVICE_URL/v1/models/sentiment-model:predict \
-H "Content-Type: application/json" \
-d '{"instances": [{"text": "Load test message '$i'"}]}' &
done
wait
Python Test Client
# test_client.py
import requests
import json
import time
from typing import List, Dict
class SentimentClient:
def __init__(self, service_url: str):
self.service_url = service_url.rstrip('/')
self.predict_url = f"{self.service_url}/v1/models/sentiment-model:predict"
def predict(self, texts: List[str]) -> Dict:
"""Send prediction request to KServe endpoint"""
payload = {
"instances": [{"text": text} for text in texts]
}
try:
response = requests.post(
self.predict_url,
json=payload,
headers={"Content-Type": "application/json"},
timeout=30
)
response.raise_for_status()
return response.json()
except requests.exceptions.RequestException as e:
print(f"Request failed: {e}")
return None
def test_sentiment_analysis(self):
"""Run comprehensive tests"""
test_cases = [
# Positive examples
"I absolutely love this product!",
"Amazing quality and fast delivery.",
"Best purchase I've made this year.",
# Negative examples
"Terrible experience, waste of money.",
"Poor quality, broke after one day.",
"Customer service was horrible.",
# Neutral examples
"The product is okay.",
"It works as expected.",
"Average quality for the price.",
# Edge cases
"",
"😊😊😊 👍",
"This is a very long review with mixed sentiments. The product quality is excellent and I'm very satisfied with the purchase. However, the shipping was delayed and the packaging was damaged. Overall, it's a decent product but there's room for improvement in logistics."
]
print("🧪 Testing Sentiment Analysis Model")
print("=" * 50)
start_time = time.time()
result = self.predict(test_cases)
end_time = time.time()
if result:
print(f"✅ Request completed in {end_time - start_time:.2f} seconds")
print(f"📊 Processed {len(test_cases)} texts")
print("\n📋 Results:")
for i, prediction in enumerate(result["predictions"]):
text = prediction["text"]
sentiment = prediction["sentiment"]
confidence = prediction["confidence"]
# Truncate long texts for display
display_text = text[:50] + "..." if len(text) > 50 else text
print(f"{i+1:2d}. '{display_text}'")
print(f" → {sentiment} (confidence: {confidence:.2f})")
print()
else:
print("❌ Test failed")
if __name__ == "__main__":
import os
# Get service URL from environment or use default
service_url = os.getenv('SERVICE_URL', 'http://localhost:8080')
client = SentimentClient(service_url)
client.test_sentiment_analysis()
5. Advanced Features
Auto-scaling Configuration
KServe automatically scales your model based on traffic, but you can fine-tune the behavior:
# advanced-inferenceservice.yaml
apiVersion: "serving.kserve.io/v1beta1"
kind: "InferenceService"
metadata:
name: "sentiment-predictor-advanced"
annotations:
# Auto-scaling configuration
autoscaling.knative.dev/target: "10" # Target concurrent requests per pod
autoscaling.knative.dev/maxScale: "10" # Maximum number of pods
autoscaling.knative.dev/minScale: "1" # Minimum number of pods (disable scale-to-zero)
autoscaling.knative.dev/scale-down-delay: "0s" # Delay before scaling down
# Note: the scale-to-zero grace period is a cluster-wide setting in the
# config-autoscaler ConfigMap, not a per-service annotation
autoscaling.knative.dev/window: "60s" # Time window for auto-scaling decisions
spec:
predictor:
containers:
- name: kserve-container
image: sentiment-predictor:v1
resources:
requests:
cpu: "200m"
memory: "512Mi"
limits:
cpu: "1000m"
memory: "1Gi"
Canary Deployments (Traffic Splitting)
Deploy a new version and gradually shift traffic:
# canary-deployment.yaml
# Update the existing InferenceService in place: point the predictor at the
# new image and set canaryTrafficPercent. KServe automatically keeps the
# previous revision serving the remaining 90% of traffic; you do not declare
# both versions in the spec.
apiVersion: "serving.kserve.io/v1beta1"
kind: "InferenceService"
metadata:
  name: "sentiment-predictor"
spec:
  predictor:
    canaryTrafficPercent: 10 # Send 10% of traffic to the new revision
    containers:
    - name: kserve-container
      image: sentiment-predictor:v2 # New version
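Once the canary looks healthy, promotion is just another spec change. A sketch using kubectl patch (editing the YAML and re-applying works the same way); setting canaryTrafficPercent back to 0 shifts all traffic back to the previous revision:
# Promote: send 100% of traffic to the new revision
kubectl patch inferenceservice sentiment-predictor --type=merge \
  -p '{"spec":{"predictor":{"canaryTrafficPercent":100}}}'

# Roll back: return all traffic to the previous (stable) revision
kubectl patch inferenceservice sentiment-predictor --type=merge \
  -p '{"spec":{"predictor":{"canaryTrafficPercent":0}}}'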
Model Transformation Pipeline
Add pre/post-processing to your model:
# transformer.py
from kserve import Model
import json
import re
from typing import Dict, Any
class SentimentTransformer(Model):
def __init__(self, name: str, predictor_host: str):
super().__init__(name)
self.predictor_host = predictor_host
def preprocess(self, payload: Dict[str, Any], headers: Dict[str, str] = None) -> Dict[str, Any]:
"""Clean and normalize text before prediction"""
instances = payload.get("instances", [])
cleaned_instances = []
for instance in instances:
text = instance.get("text", "")
# Text cleaning pipeline
cleaned_text = self._clean_text(text)
cleaned_instances.append({"text": cleaned_text})
return {"instances": cleaned_instances}
    def _clean_text(self, text: str) -> str:
        """Apply text cleaning transformations"""
        # Convert to lowercase
        text = text.lower()
        # Remove URLs first, before punctuation stripping mangles them
        text = re.sub(r'http\S+|www\S+', '', text)
        # Remove special characters but keep emoticons
        text = re.sub(r'[^\w\s😊😢😭👍👎❤️💔]', '', text)
        # Collapse extra whitespace left behind by the removals
        text = re.sub(r'\s+', ' ', text).strip()
        return text
def postprocess(self, payload: Dict[str, Any], headers: Dict[str, str] = None) -> Dict[str, Any]:
"""Add metadata and format response"""
predictions = payload.get("predictions", [])
enhanced_predictions = []
for pred in predictions:
# Add confidence categories
confidence = pred.get("confidence", 0)
if confidence > 0.8:
confidence_level = "high"
elif confidence > 0.6:
confidence_level = "medium"
else:
confidence_level = "low"
enhanced_pred = {
**pred,
"confidence_level": confidence_level,
"model_version": "v1.0",
"processed_at": "2024-01-01T00:00:00Z"
}
enhanced_predictions.append(enhanced_pred)
return {"predictions": enhanced_predictions}
Multi-Model Serving
Serve multiple models in one InferenceService:
# multi-model-service.yaml
apiVersion: "serving.kserve.io/v1beta1"
kind: "InferenceService"
metadata:
name: "multi-model-predictor"
spec:
predictor:
containers:
- name: kserve-container
image: multi-model-predictor:v1
env:
- name: MODELS
value: "sentiment,spam-detection,topic-classification"
6. Production Considerations
Security Best Practices
# secure-inferenceservice.yaml
apiVersion: "serving.kserve.io/v1beta1"
kind: "InferenceService"
metadata:
name: "secure-sentiment-predictor"
spec:
predictor:
containers:
- name: kserve-container
image: sentiment-predictor:v1
# Security context
securityContext:
runAsNonRoot: true
runAsUser: 1000
readOnlyRootFilesystem: true
allowPrivilegeEscalation: false
capabilities:
drop:
- ALL
# Resource limits to prevent DoS
resources:
requests:
cpu: "100m"
memory: "256Mi"
limits:
cpu: "500m"
memory: "512Mi"
# Health checks
livenessProbe:
httpGet:
path: /v1/models/sentiment-model
port: 8080
initialDelaySeconds: 30
periodSeconds: 10
readinessProbe:
httpGet:
path: /v1/models/sentiment-model
port: 8080
initialDelaySeconds: 10
periodSeconds: 5
Monitoring and Observability
# monitoring-config.yaml
# ServiceMonitor is a Prometheus Operator CRD, not a core v1 resource
apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
name: kserve-metrics
spec:
selector:
matchLabels:
serving.kserve.io/inferenceservice: sentiment-predictor
endpoints:
- port: metrics
interval: 30s
path: /metrics
Model Storage with External Sources
# s3-model-service.yaml
apiVersion: "serving.kserve.io/v1beta1"
kind: "InferenceService"
metadata:
name: "s3-model-predictor"
spec:
predictor:
model:
modelFormat:
name: sklearn
storageUri: "s3://my-bucket/models/sentiment-model/"
# S3 credentials (use secrets in production)
serviceAccountName: s3-access-sa
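The serviceAccountName above is where the storage-initializer gets its credentials from. A hedged sketch of that wiring (bucket, region, and key values are placeholders; the annotation keys follow KServe's S3 credential convention):
apiVersion: v1
kind: Secret
metadata:
  name: s3-credentials
  annotations:
    serving.kserve.io/s3-endpoint: "s3.amazonaws.com"
    serving.kserve.io/s3-region: "us-east-1"
    serving.kserve.io/s3-usehttps: "1"
type: Opaque
stringData:
  AWS_ACCESS_KEY_ID: "<access-key>"
  AWS_SECRET_ACCESS_KEY: "<secret-key>"
---
apiVersion: v1
kind: ServiceAccount
metadata:
  name: s3-access-sa
secrets:
- name: s3-credentials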
Load Testing Script
# load_test.py
import asyncio
import aiohttp
import time
import json
from typing import List
import statistics
class LoadTester:
def __init__(self, service_url: str):
self.service_url = f"{service_url}/v1/models/sentiment-model:predict"
self.results = []
async def send_request(self, session: aiohttp.ClientSession, text: str) -> dict:
"""Send a single prediction request"""
payload = {"instances": [{"text": text}]}
start_time = time.time()
try:
async with session.post(self.service_url, json=payload) as response:
result = await response.json()
end_time = time.time()
return {
"success": True,
"latency": end_time - start_time,
"status_code": response.status,
"response": result
}
except Exception as e:
end_time = time.time()
return {
"success": False,
"latency": end_time - start_time,
"error": str(e)
}
async def run_load_test(self, num_requests: int = 100, concurrency: int = 10):
"""Run load test with specified parameters"""
print(f"🚀 Starting load test: {num_requests} requests, {concurrency} concurrent")
# Create test data
test_texts = [
f"This is test message number {i} for load testing KServe"
for i in range(num_requests)
]
# Create semaphore to limit concurrency
semaphore = asyncio.Semaphore(concurrency)
async def bounded_request(session, text):
async with semaphore:
return await self.send_request(session, text)
# Run requests
start_time = time.time()
async with aiohttp.ClientSession() as session:
tasks = [bounded_request(session, text) for text in test_texts]
results = await asyncio.gather(*tasks)
end_time = time.time()
# Analyze results
self.analyze_results(results, end_time - start_time)
def analyze_results(self, results: List[dict], total_time: float):
"""Analyze and report load test results"""
successful_results = [r for r in results if r["success"]]
failed_results = [r for r in results if not r["success"]]
if successful_results:
latencies = [r["latency"] for r in successful_results]
print("\n📊 Load Test Results")
print("=" * 40)
print(f"Total requests: {len(results)}")
print(f"Successful: {len(successful_results)}")
print(f"Failed: {len(failed_results)}")
print(f"Success rate: {len(successful_results)/len(results)*100:.1f}%")
print(f"Total time: {total_time:.2f}s")
print(f"Requests/second: {len(results)/total_time:.1f}")
print()
print("📈 Latency Statistics:")
print(f" Average: {statistics.mean(latencies):.3f}s")
print(f" Median: {statistics.median(latencies):.3f}s")
print(f" Min: {min(latencies):.3f}s")
print(f" Max: {max(latencies):.3f}s")
print(f" 95th percentile: {sorted(latencies)[int(0.95 * len(latencies))]:.3f}s")
if failed_results:
print("\n❌ Failed Requests:")
for i, result in enumerate(failed_results[:5]): # Show first 5 failures
print(f" {i+1}. {result['error']}")
if __name__ == "__main__":
import os
service_url = os.getenv('SERVICE_URL', 'http://localhost:8080')
tester = LoadTester(service_url)
# Run load test
asyncio.run(tester.run_load_test(num_requests=50, concurrency=5))
7. Troubleshooting Guide
Common Issues and Solutions
Issue 1: Pod Stuck in Pending State
# Diagnosis
kubectl describe pod -l serving.kserve.io/inferenceservice=sentiment-predictor
# Common causes and solutions:
# 1. Insufficient resources
kubectl top nodes
kubectl describe nodes
# 2. Image pull issues
kubectl get events --sort-by=.metadata.creationTimestamp
# 3. Volume mount problems
kubectl describe pvc
Issue 2: Model Loading Failures
# Check model container logs
kubectl logs -l serving.kserve.io/inferenceservice=sentiment-predictor -c kserve-container
# Check init container logs (if using storage URI)
kubectl logs -l serving.kserve.io/inferenceservice=sentiment-predictor -c storage-initializer
# Debug storage access
kubectl exec -it <pod-name> -c kserve-container -- ls -la /mnt/models/
Issue 3: Scale-to-Zero Not Working
# Check KNative configuration
kubectl get configmap config-autoscaler -n knative-serving -o yaml
# Check InferenceService annotations
kubectl get inferenceservice sentiment-predictor -o yaml | grep -A 10 annotations
# Monitor scaling events
kubectl get events --field-selector involvedObject.name=sentiment-predictor
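If the cluster-wide setting turns out to be the culprit, scale-to-zero is controlled by a key in the Knative autoscaler ConfigMap. A sketch (this changes behavior for every Knative service on the cluster, so coordinate before flipping it):
kubectl patch configmap config-autoscaler -n knative-serving --type merge \
  -p '{"data":{"enable-scale-to-zero":"true"}}'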
Issue 4: Network Connectivity Problems
# Test internal service connectivity
kubectl run debug --image=curlimages/curl --rm -it -- \
curl -X POST http://sentiment-predictor-predictor-default.default.svc.cluster.local/v1/models/sentiment-model:predict \
-H "Content-Type: application/json" \
-d '{"instances": [{"text": "test"}]}'
# Check Istio configuration
kubectl get gateway
kubectl get virtualservice
Debugging Script
#!/bin/bash
# debug-kserve.sh
INFERENCE_SERVICE_NAME=${1:-sentiment-predictor}
NAMESPACE=${2:-default}
echo "🔍 Debugging KServe InferenceService: $INFERENCE_SERVICE_NAME"
echo "📍 Namespace: $NAMESPACE"
echo "=" * 50
# 1. Check InferenceService status
echo "📊 InferenceService Status:"
kubectl get inferenceservice $INFERENCE_SERVICE_NAME -n $NAMESPACE -o wide
echo -e "\n📝 InferenceService Details:"
kubectl describe inferenceservice $INFERENCE_SERVICE_NAME -n $NAMESPACE
# 2. Check related pods
echo -e "\n🔍 Related Pods:"
kubectl get pods -l serving.kserve.io/inferenceservice=$INFERENCE_SERVICE_NAME -n $NAMESPACE -o wide
# 3. Check recent events
echo -e "\n📅 Recent Events:"
kubectl get events -n $NAMESPACE --sort-by=.metadata.creationTimestamp | grep $INFERENCE_SERVICE_NAME | tail -10
# 4. Check logs
echo -e "\n📋 Pod Logs:"
PODS=$(kubectl get pods -l serving.kserve.io/inferenceservice=$INFERENCE_SERVICE_NAME -n $NAMESPACE -o jsonpath='{.items[*].metadata.name}')
for pod in $PODS; do
echo -e "\n--- Logs for $pod ---"
kubectl logs $pod -n $NAMESPACE -c kserve-container --tail=20
done
# 5. Check services
echo -e "\n🌐 Related Services:"
kubectl get services -l serving.kserve.io/inferenceservice=$INFERENCE_SERVICE_NAME -n $NAMESPACE
# 6. Check KNative resources
echo -e "\n⚡ KNative Resources:"
kubectl get ksvc -l serving.kserve.io/inferenceservice=$INFERENCE_SERVICE_NAME -n $NAMESPACE
echo -e "\n✅ Debug information collected!"
8. Best Practices
Model Development
Container Best Practices
# Use specific Python version
FROM python:3.9-slim

# Create non-root user
RUN useradd --create-home --shell /bin/bash kserve
USER kserve

# Pin dependency versions
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Use multi-stage builds for smaller images
FROM python:3.9-slim as runtime
COPY --from=builder /app /app
Model Versioning
# Include model metadata
class ModelMetadata:
    version = "1.2.3"
    training_date = "2024-01-01"
    performance_metrics = {
        "accuracy": 0.95,
        "f1_score": 0.92
    }
Input Validation
def validate_input(self, payload):
    if not isinstance(payload.get("instances"), list):
        raise ValueError("instances must be a list")
    for instance in payload["instances"]:
        if "text" not in instance:
            raise ValueError("Each instance must have 'text' field")
        if len(instance["text"]) > 1000:
            raise ValueError("Text length cannot exceed 1000 characters")
Production Deployment
Resource Planning
resources:
  requests:
    cpu: "100m"      # Start small
    memory: "256Mi"
  limits:
    cpu: "1000m"     # Allow bursting
    memory: "1Gi"
Health Checks
# Implement comprehensive health checks
def health_check(self):
    try:
        # Test model prediction
        test_result = self.model.predict(["health check"])
        return {"status": "healthy", "model_loaded": True}
    except Exception as e:
        return {"status": "unhealthy", "error": str(e)}
Monitoring
import time
import logging

class MetricsMiddleware:
    def __init__(self):
        self.request_count = 0
        self.total_latency = 0

    def track_request(self, start_time, end_time):
        self.request_count += 1
        self.total_latency += (end_time - start_time)
        avg_latency = self.total_latency / self.request_count
        logging.info(f"Avg latency: {avg_latency:.3f}s")
Security Checklist
- ✅ Run containers as non-root user
- ✅ Use read-only root filesystem
- ✅ Set resource limits to prevent DoS
- ✅ Validate all inputs thoroughly
- ✅ Use secrets for sensitive data
- ✅ Enable network policies
- ✅ Regular security scanning of images
- ✅ Implement proper authentication/authorization
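For the network-policy item, a minimal hedged sketch that only admits traffic from the Istio ingress and Knative system namespaces (the namespace labels are assumptions; adjust them to how your cluster labels its namespaces):
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: sentiment-predictor-ingress
  namespace: default
spec:
  podSelector:
    matchLabels:
      serving.kserve.io/inferenceservice: sentiment-predictor
  policyTypes:
  - Ingress
  ingress:
  - from:
    - namespaceSelector:
        matchLabels:
          kubernetes.io/metadata.name: istio-system
    - namespaceSelector:
        matchLabels:
          kubernetes.io/metadata.name: knative-serving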
Performance Optimization
Model Optimization
# Reduce memory by down-casting the fitted classifier's weight arrays.
# (model.get_params() only returns hyperparameters, so cast the learned
# attributes on the estimator itself.)
import numpy as np

def quantize_model(pipeline):
    clf = pipeline.named_steps['classifier']
    clf.coef_ = clf.coef_.astype(np.float32)
    clf.intercept_ = clf.intercept_.astype(np.float32)
    return pipeline
Batch Processing
def predict_batch(self, texts, batch_size=32):
    results = []
    for i in range(0, len(texts), batch_size):
        batch = texts[i:i+batch_size]
        batch_results = self.model.predict(batch)
        results.extend(batch_results)
    return results
Caching
from functools import lru_cache

# Cache repeated predictions for identical input texts.
# Note: lru_cache on an instance method holds a reference to self,
# which is acceptable for a single long-lived predictor object.
@lru_cache(maxsize=1000)
def cached_predict(self, text):
    return self.model.predict([text])[0]
Conclusion
You’ve now learned the essential 20% of KServe that covers 80% of real-world use cases:
- Core Concepts: InferenceService, Predictor, Auto-scaling
- Practical Skills: Building, deploying, and testing ML models
- Production Features: Security, monitoring, troubleshooting
- Advanced Patterns: Canary deployments, transformers, multi-model serving
Next Steps
- Practice: Deploy your own model using this tutorial
- Explore: Try different model frameworks (TensorFlow, PyTorch, XGBoost)
- Scale: Implement A/B testing and canary deployments
- Monitor: Set up comprehensive observability
- Optimize: Profile and tune your model performance
Resources for Deeper Learning
- KServe Official Documentation
- KNative Serving Concepts
- Istio Traffic Management
- Kubernetes Resource Management
Remember: Start simple, iterate quickly, and scale gradually. KServe handles the complexity so you can focus on building great ML models! 🚀