Enterprise deployments of Semantic Kernel require more than working code. You need secure authentication, managed Azure OpenAI endpoints, structured logging, telemetry for cost tracking, and filters for content safety. This section covers the patterns that bridge the gap between a working prototype and a production-grade AI system, with emphasis on Azure integration, identity management, observability, and governance.
1. Azure OpenAI Integration
Azure OpenAI provides the same models as OpenAI (GPT-4o, GPT-4 Turbo, and others) but with enterprise features: private networking, data residency, content filtering, and SLA guarantees. Semantic Kernel supports Azure OpenAI as a first-class connector.
import semantic_kernel as sk
from semantic_kernel.connectors.ai.open_ai import AzureChatCompletion

kernel = sk.Kernel()

# Configure Azure OpenAI (note the different parameters)
kernel.add_service(
    AzureChatCompletion(
        service_id="azure-chat",
        deployment_name="gpt-4o",  # Your deployment name
        endpoint="https://my-org.openai.azure.com/",
        api_key="...",  # Or use managed identity
        api_version="2024-06-01",
    )
)

# Usage is identical to OpenAI after this point
result = await kernel.invoke_prompt(
    prompt="Explain quantum computing in one paragraph.",
)
The key difference from the OpenAI connector is that Azure requires a deployment_name (the name you chose when deploying the model in Azure AI Studio) and an endpoint (your organization's Azure OpenAI resource URL). The api_version parameter pins you to a specific API version, which is important for reproducibility.
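Hardcoded endpoints and API versions tend to drift between environments. One way to keep deployments consistent is a small helper that reads these settings from environment variables; the variable names below are an illustrative convention, not something Semantic Kernel requires:

```python
import os

def azure_openai_settings() -> dict:
    """Read Azure OpenAI connection settings from environment variables.

    The variable names are illustrative conventions, not an SK requirement.
    """
    settings = {
        "deployment_name": os.environ["AZURE_OPENAI_DEPLOYMENT"],
        "endpoint": os.environ["AZURE_OPENAI_ENDPOINT"],
        # Pin a default API version for reproducibility, but allow override
        "api_version": os.environ.get("AZURE_OPENAI_API_VERSION", "2024-06-01"),
    }
    if not settings["endpoint"].startswith("https://"):
        raise ValueError("Azure OpenAI endpoint must use HTTPS")
    return settings
```

The returned dictionary can be unpacked straight into the AzureChatCompletion constructor, so a staging and a production deployment differ only in their environment, not their code.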
2. Azure Active Directory Authentication
API keys are convenient for development but risky for production. Azure OpenAI supports Azure Active Directory (Entra ID) authentication, which eliminates long-lived secrets and integrates with your organization's identity management.
from azure.identity import DefaultAzureCredential, get_bearer_token_provider

# DefaultAzureCredential tries multiple auth methods in order:
# environment variables, managed identity, Azure CLI, and others
credential = DefaultAzureCredential()

# Create a token provider for the Azure OpenAI scope
token_provider = get_bearer_token_provider(
    credential,
    "https://cognitiveservices.azure.com/.default",
)

kernel.add_service(
    AzureChatCompletion(
        service_id="azure-chat",
        deployment_name="gpt-4o",
        endpoint="https://my-org.openai.azure.com/",
        ad_token_provider=token_provider,  # No API key needed
    )
)
DefaultAzureCredential automatically works across environments. In local development, it uses your Azure CLI login. In Azure App Service or AKS, it uses the managed identity assigned to your resource. This means the same code runs everywhere without configuration changes.
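If you want tighter control than DefaultAzureCredential's full probe order, azure-identity also lets you build an explicit chain. A sketch that prefers managed identity in production and falls back to the developer's Azure CLI login locally (a configuration fragment, not runnable without an Azure environment):

```python
from azure.identity import (
    AzureCliCredential,
    ChainedTokenCredential,
    ManagedIdentityCredential,
)

# An explicit chain: try the managed identity first (production),
# then fall back to the developer's Azure CLI login (local).
credential = ChainedTokenCredential(
    ManagedIdentityCredential(),
    AzureCliCredential(),
)
```

An explicit chain fails faster and more predictably than the full default probe, which can matter for cold-start latency in serverless hosts.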
3. Kernel Filters for Governance
Filters are hooks that execute before and after every function invocation. They let you implement cross-cutting concerns such as content moderation, input validation, cost tracking, and audit logging without modifying individual functions.
from semantic_kernel.filters import (
    FilterTypes,
    FunctionInvocationContext,
    PromptRenderContext,
)
from semantic_kernel.functions import FunctionResult

# A filter that logs every function call
async def logging_filter(
    context: FunctionInvocationContext,
    next_handler,
):
    func_name = f"{context.function.plugin_name}.{context.function.name}"
    print(f"[INVOKE] {func_name} with args: {context.arguments}")
    # Call the actual function
    await next_handler(context)
    print(f"[RESULT] {func_name} returned: {context.result}")

# A filter that blocks sensitive content
async def content_safety_filter(
    context: FunctionInvocationContext,
    next_handler,
):
    # Check input for disallowed content
    input_text = str(context.arguments)
    blocked_terms = ["social security", "credit card number"]
    for term in blocked_terms:
        if term.lower() in input_text.lower():
            # The result must be a FunctionResult, not a bare string
            context.result = FunctionResult(
                function=context.function.metadata,
                value="I cannot process requests involving sensitive personal information.",
            )
            return  # Skip the actual function
    await next_handler(context)

# Register filters on the kernel
kernel.add_filter(FilterTypes.FUNCTION_INVOCATION, logging_filter)
kernel.add_filter(FilterTypes.FUNCTION_INVOCATION, content_safety_filter)
Filters form a pipeline (similar to middleware in web frameworks). Each filter calls next_handler to proceed to the next filter or the actual function. If a filter does not call next_handler, the chain is short-circuited.
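The pipeline mechanics can be illustrated without SK at all. The sketch below (with a hypothetical Context class and toy filters) composes handlers the same way a middleware stack does and shows the short-circuit path:

```python
import asyncio

class Context:
    """A stand-in for SK's FunctionInvocationContext (illustrative only)."""
    def __init__(self, value):
        self.value = value
        self.result = None
        self.log = []

async def outer(ctx, next_handler):
    ctx.log.append("outer:before")
    await next_handler(ctx)
    ctx.log.append("outer:after")

async def guard(ctx, next_handler):
    # Short-circuit: set a result and skip the rest of the chain
    if "blocked" in ctx.value:
        ctx.result = "denied"
        return
    await next_handler(ctx)

async def run_pipeline(filters, function, ctx):
    """Compose filters so each one's next_handler is the filter after it,
    ending at the real function."""
    async def terminal(c):
        c.result = function(c.value)

    handler = terminal
    for f in reversed(filters):
        # Bind f and the current handler explicitly to avoid late binding
        handler = (lambda flt, nxt: lambda c: flt(c, nxt))(f, handler)
    await handler(ctx)
    return ctx
```

Running `run_pipeline([outer, guard], str.upper, Context("hello"))` executes outer, then guard, then the function; with a "blocked" input, guard returns early and the function never runs, while outer's "after" step still executes on the way back out.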
4. Prompt Render Filters
Prompt render filters wrap template rendering: after the filter calls the next handler, the rendered prompt is available for inspection and modification before it is sent to the LLM. They are ideal for prompt sanitization, PII redaction, and prompt injection detection.
import re

async def pii_redaction_filter(
    context: PromptRenderContext,
    next_handler,
):
    """Redact email addresses and phone numbers from prompts."""
    # Render first: rendered_prompt is populated by the next handler
    await next_handler(context)
    rendered = context.rendered_prompt

    # Redact emails
    rendered = re.sub(
        r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}',
        '[EMAIL REDACTED]',
        rendered,
    )
    # Redact phone numbers (US format)
    rendered = re.sub(
        r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b',
        '[PHONE REDACTED]',
        rendered,
    )
    # The modified prompt is what gets sent to the LLM
    context.rendered_prompt = rendered

kernel.add_filter(FilterTypes.PROMPT_RENDERING, pii_redaction_filter)
Layer your filters by responsibility: one for logging, one for safety, one for PII redaction. This keeps each filter simple, testable, and independently maintainable. You can enable or disable individual filters per environment (for example, skip verbose logging in production).
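That per-environment toggling can be expressed as a small selector that decides which filters get registered at startup; the names below are placeholders for the filter callables defined earlier:

```python
def select_filters(environment: str) -> list[str]:
    """Decide which filters to register for a given environment.

    The strings are placeholders for the filter callables defined earlier
    (content_safety_filter, pii_redaction_filter, logging_filter).
    """
    enabled = ["content_safety", "pii_redaction"]  # always on
    if environment != "production":
        # Verbose logging only outside production
        enabled.insert(0, "verbose_logging")
    return enabled
```

At startup, you would loop over the selected filters and register each with the kernel, keeping the environment decision in one place instead of scattered across registration calls.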
5. Telemetry and OpenTelemetry Integration
Semantic Kernel emits telemetry data compatible with OpenTelemetry (OTel), the industry standard for observability. You can capture traces, metrics, and logs from every kernel operation and send them to your monitoring backend.
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import ConsoleSpanExporter, SimpleSpanProcessor

# Set up OpenTelemetry tracing
provider = TracerProvider()
provider.add_span_processor(
    SimpleSpanProcessor(ConsoleSpanExporter())
)
trace.set_tracer_provider(provider)

# Enable SK telemetry via environment variables
import os
os.environ["SEMANTICKERNEL_EXPERIMENTAL_GENAI_ENABLE_OTEL_DIAGNOSTICS"] = "true"

# Now every kernel.invoke() call generates OTel spans:
# - Function invocation spans (name, duration, status)
# - LLM call spans (model, tokens used, latency)
# - Memory operation spans (collection, result count)
In production, replace ConsoleSpanExporter with exporters for Azure Monitor, Datadog, Jaeger, or your preferred observability platform. The telemetry data lets you track token consumption, identify slow functions, and debug failures in multi-step plans.
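As one concrete example, the azure-monitor-opentelemetry distro can replace the manual provider setup when your backend is Application Insights. A configuration sketch; the connection string below is a placeholder, and in practice it is usually supplied via the APPLICATIONINSIGHTS_CONNECTION_STRING environment variable:

```python
from azure.monitor.opentelemetry import configure_azure_monitor

# One call wires traces, metrics, and logs to Application Insights.
# The connection string here is a placeholder value.
configure_azure_monitor(
    connection_string="InstrumentationKey=00000000-0000-0000-0000-000000000000",
)
```

After this call, the SK spans described above flow into Application Insights alongside the rest of your application's telemetry, so LLM latency shows up in the same dashboards as your HTTP requests.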
6. Cost Tracking and Token Budgets
LLM API calls cost money. In enterprise settings, you need to track spending per user, per tenant, or per feature. SK's filter system makes this straightforward to implement.
from collections import defaultdict

class CostTracker:
    """Tracks token usage across all kernel invocations."""

    # Approximate costs per 1K tokens (input / output)
    PRICING = {
        "gpt-4o": (0.0025, 0.01),
        "gpt-4-turbo": (0.01, 0.03),
        "text-embedding-3-small": (0.00002, 0.0),
    }

    def __init__(self):
        self.usage = defaultdict(lambda: {"input": 0, "output": 0, "cost": 0.0})

    async def track_filter(self, context, next_handler):
        await next_handler(context)
        if context.result is None:
            return  # An earlier filter short-circuited the call

        # Extract token usage from the result metadata (the exact shape
        # depends on the connector and SDK version)
        metadata = context.result.metadata or {}
        usage = metadata.get("usage", {})
        model = metadata.get("model", "unknown")
        input_tokens = usage.get("prompt_tokens", 0)
        output_tokens = usage.get("completion_tokens", 0)
        self.usage[model]["input"] += input_tokens
        self.usage[model]["output"] += output_tokens

        # Calculate cost
        if model in self.PRICING:
            in_rate, out_rate = self.PRICING[model]
            cost = (input_tokens * in_rate + output_tokens * out_rate) / 1000
            self.usage[model]["cost"] += cost

    def report(self):
        for model, data in self.usage.items():
            print(f"{model}: {data['input']}in + {data['output']}out = ${data['cost']:.4f}")

tracker = CostTracker()
kernel.add_filter(FilterTypes.FUNCTION_INVOCATION, tracker.track_filter)
Token counts from the API are the authoritative source of cost data. Do not estimate token counts using string length or word counts; the relationship between text length and tokens varies by model and language. Always use the values returned in the API response metadata.
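The "budgets" half of this section's heading can be enforced with a small ledger that a filter charges from the same API-reported counts. A sketch, independent of SK:

```python
class TokenBudget:
    """A hard per-user or per-tenant token ceiling (illustrative)."""

    def __init__(self, limit_tokens: int):
        self.limit = limit_tokens
        self.used = 0

    def charge(self, prompt_tokens: int, completion_tokens: int) -> None:
        """Record API-reported token counts; raise once over budget."""
        self.used += prompt_tokens + completion_tokens
        if self.used > self.limit:
            raise RuntimeError(
                f"token budget exceeded: {self.used}/{self.limit}"
            )

    @property
    def remaining(self) -> int:
        return max(self.limit - self.used, 0)
```

A filter would call charge() right after extracting the usage metadata, turning a budget overrun into an explicit refusal rather than a surprise invoice at month's end.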
7. Retry Policies and Resilience
Production AI services experience transient failures: rate limits, network timeouts, and service outages. SK's HTTP pipeline supports configurable retry policies.
import asyncio

import httpx
from openai import AsyncAzureOpenAI
from semantic_kernel.connectors.ai.open_ai import AzureChatCompletion

# Configure timeouts and connection limits via an httpx client
http_client = httpx.AsyncClient(
    timeout=httpx.Timeout(30.0, connect=10.0),
    limits=httpx.Limits(
        max_connections=20,
        max_keepalive_connections=10,
    ),
)

# The connector does not take an httpx client directly; wrap it in an
# AsyncAzureOpenAI client and hand that to the connector instead
async_client = AsyncAzureOpenAI(
    azure_endpoint="https://my-org.openai.azure.com/",
    azure_ad_token_provider=token_provider,
    api_version="2024-06-01",
    http_client=http_client,
)

kernel.add_service(
    AzureChatCompletion(
        service_id="azure-chat",
        deployment_name="gpt-4o",
        async_client=async_client,
    )
)

# For automatic retries on 429 (rate limit) responses,
# use a filter that catches exceptions and retries:
async def retry_filter(context, next_handler, max_retries=3):
    for attempt in range(max_retries):
        try:
            await next_handler(context)
            return
        except Exception as e:
            if "429" in str(e) and attempt < max_retries - 1:
                wait = 2 ** attempt  # Exponential backoff
                await asyncio.sleep(wait)
            else:
                raise

kernel.add_filter(FilterTypes.FUNCTION_INVOCATION, retry_filter)
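A plain 2 ** attempt backoff retries every throttled client on the same schedule, which can re-trigger the rate limit in lockstep. Adding full jitter spreads retries out; a sketch, independent of SK:

```python
import random

def backoff_schedule(max_retries: int, base: float = 1.0,
                     cap: float = 30.0) -> list[float]:
    """Exponential backoff with full jitter.

    Each delay is drawn uniformly from [0, min(cap, base * 2**attempt)],
    which decorrelates retries across clients and avoids thundering herds.
    """
    return [random.uniform(0, min(cap, base * 2 ** attempt))
            for attempt in range(max_retries)]
```

Inside the retry filter, you would sleep for `backoff_schedule(max_retries)[attempt]` instead of the fixed `2 ** attempt`; the cap keeps worst-case waits bounded.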
8. Putting It Together: Enterprise-Ready Kernel
The following example combines all the patterns from this section into a factory function that produces a production-ready kernel.
def create_enterprise_kernel(config) -> sk.Kernel:
    """Create a fully configured enterprise kernel."""
    kernel = sk.Kernel()

    # 1. Azure AD authentication
    credential = DefaultAzureCredential()
    token_provider = get_bearer_token_provider(
        credential, "https://cognitiveservices.azure.com/.default"
    )

    # 2. Azure OpenAI service
    kernel.add_service(
        AzureChatCompletion(
            service_id="chat",
            deployment_name=config.deployment_name,
            endpoint=config.azure_endpoint,
            ad_token_provider=token_provider,
        )
    )

    # 3. Register plugins
    kernel.add_plugin(MathPlugin(), plugin_name="Math")
    kernel.add_plugin(
        WeatherPlugin(api_key=config.weather_key),
        plugin_name="Weather",
    )

    # 4. Add governance filters
    tracker = CostTracker()
    kernel.add_filter(FilterTypes.FUNCTION_INVOCATION, logging_filter)
    kernel.add_filter(FilterTypes.FUNCTION_INVOCATION, content_safety_filter)
    kernel.add_filter(FilterTypes.FUNCTION_INVOCATION, tracker.track_filter)
    kernel.add_filter(FilterTypes.PROMPT_RENDERING, pii_redaction_filter)

    return kernel
This factory function produces a kernel with managed identity authentication, cost tracking, content safety, PII redaction, and structured logging. Each concern is modular and independently testable. In a web application, you would call this factory once at startup and share the kernel across request handlers, or create a new kernel per request scope if you need per-request configuration.
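The call-once-at-startup advice can be captured with a generic memoizing wrapper, shown here without SK dependencies so it stands alone:

```python
from functools import lru_cache

def make_singleton(factory):
    """Wrap a zero-argument factory so it runs at most once.

    In an application, factory would be a closure over your config that
    calls create_enterprise_kernel; every handler then shares one kernel.
    """
    @lru_cache(maxsize=1)
    def get():
        return factory()
    return get
```

For example, `get_kernel = make_singleton(lambda: create_enterprise_kernel(config))` builds the kernel lazily on first use and returns the same instance on every subsequent call; if you need per-request configuration instead, skip the wrapper and call the factory inside each request scope.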