Building Conversational AI with LLMs and Agents
Appendix P: Semantic Kernel: Enterprise AI Orchestration

Enterprise Patterns: Azure OpenAI, Authentication, and Logging

Big Picture

Enterprise deployments of Semantic Kernel require more than just working code. You need secure authentication, managed Azure OpenAI endpoints, structured logging, telemetry for cost tracking, and filters for content safety. This section covers the patterns that bridge the gap between a working prototype and a production-grade AI system, with emphasis on Azure integration, identity management, observability, and governance.

1. Azure OpenAI Integration

Azure OpenAI provides the same models as OpenAI (GPT-4o, GPT-4 Turbo, and others) but with enterprise features: private networking, data residency, content filtering, and SLA guarantees. Semantic Kernel supports Azure OpenAI as a first-class connector.

import semantic_kernel as sk
from semantic_kernel.connectors.ai.open_ai import AzureChatCompletion

kernel = sk.Kernel()

# Configure Azure OpenAI (note the different parameters)
kernel.add_service(
    AzureChatCompletion(
        service_id="azure-chat",
        deployment_name="gpt-4o",              # Your deployment name
        endpoint="https://my-org.openai.azure.com/",
        api_key="...",                          # Or use managed identity
        api_version="2024-06-01",
    )
)

# Usage is identical to OpenAI after this point
result = await kernel.invoke_prompt(
    prompt="Explain quantum computing in one paragraph.",
)

The key difference from the OpenAI connector is that Azure requires a deployment_name (the name you chose when deploying the model in Azure AI Studio) and an endpoint (your organization's Azure OpenAI resource URL). The api_version parameter pins you to a specific API version, which is important for reproducibility.
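Rather than hard-coding these values, you can load them from environment variables, which also keeps secrets out of source control. A minimal sketch follows; the variable names (`AZURE_OPENAI_ENDPOINT` and friends) are a common convention, not something Semantic Kernel requires:

```python
import os

def load_azure_openai_config() -> dict:
    """Read Azure OpenAI settings from environment variables.

    The variable names here are illustrative conventions,
    not an SK requirement.
    """
    required = {
        "endpoint": "AZURE_OPENAI_ENDPOINT",
        "deployment_name": "AZURE_OPENAI_DEPLOYMENT",
        "api_version": "AZURE_OPENAI_API_VERSION",
    }
    config = {}
    missing = []
    for key, var in required.items():
        value = os.environ.get(var)
        if value:
            config[key] = value
        else:
            missing.append(var)
    if missing:
        # Fail fast at startup rather than on the first LLM call
        raise RuntimeError(f"Missing environment variables: {missing}")
    return config
```

Failing fast on missing configuration surfaces deployment mistakes at startup instead of mid-request.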

2. Azure Active Directory Authentication

API keys are convenient for development but risky for production. Azure OpenAI supports Azure Active Directory (Entra ID) authentication, which eliminates long-lived secrets and integrates with your organization's identity management.

from azure.identity import DefaultAzureCredential, get_bearer_token_provider

# DefaultAzureCredential tries multiple auth methods in order, including
# environment variables, managed identity, and Azure CLI login
credential = DefaultAzureCredential()

# Create a token provider for the Azure OpenAI scope
token_provider = get_bearer_token_provider(
    credential,
    "https://cognitiveservices.azure.com/.default",
)

kernel.add_service(
    AzureChatCompletion(
        service_id="azure-chat",
        deployment_name="gpt-4o",
        endpoint="https://my-org.openai.azure.com/",
        ad_token_provider=token_provider,  # No API key needed
    )
)
Key Insight

DefaultAzureCredential automatically works across environments. In local development, it uses your Azure CLI login. In Azure App Service or AKS, it uses the managed identity assigned to your resource. This means the same code runs everywhere without configuration changes.
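The chain-of-sources behavior is easy to model in plain Python. The `CredentialChain` class below is a hypothetical sketch of the mechanic, not the real azure.identity implementation: try each source in order and return the first token obtained.

```python
class CredentialChain:
    """Illustrative model of DefaultAzureCredential's strategy:
    try each credential source in order; the first one that can
    produce a token wins. (Not the real azure.identity code.)"""

    def __init__(self, *sources):
        self.sources = sources  # (name, callable) pairs

    def get_token(self):
        errors = []
        for name, source in self.sources:
            try:
                return name, source()
            except Exception as exc:
                # Remember why this source failed and fall through
                errors.append(f"{name}: {exc}")
        raise RuntimeError(
            "No credential source succeeded: " + "; ".join(errors)
        )

# In local development the "environment" source fails and the
# "azure-cli" source supplies the token, mirroring the real behavior.
def env_source():
    raise RuntimeError("AZURE_CLIENT_SECRET not set")

def cli_source():
    return "token-from-azure-cli"

chain = CredentialChain(("environment", env_source), ("azure-cli", cli_source))
winner, token = chain.get_token()
```

In production the managed identity source succeeds first instead, so the same chain yields a different winner with no code changes.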

3. Kernel Filters for Governance

Filters are hooks that execute before and after every function invocation. They let you implement cross-cutting concerns such as content moderation, input validation, cost tracking, and audit logging without modifying individual functions.

from semantic_kernel.filters import (
    FunctionInvocationContext,
    PromptRenderContext,
)

# A filter that logs every function call
async def logging_filter(
    context: FunctionInvocationContext,
    next_handler,
):
    func_name = f"{context.function.plugin_name}.{context.function.name}"
    print(f"[INVOKE] {func_name} with args: {context.arguments}")

    # Call the actual function
    await next_handler(context)

    print(f"[RESULT] {func_name} returned: {context.result}")

from semantic_kernel.functions import FunctionResult

# A filter that blocks sensitive content
async def content_safety_filter(
    context: FunctionInvocationContext,
    next_handler,
):
    # Check input for disallowed content
    input_text = str(context.arguments)
    blocked_terms = ["social security", "credit card number"]

    for term in blocked_terms:
        if term.lower() in input_text.lower():
            # Overriding context.result short-circuits the pipeline
            context.result = FunctionResult(
                function=context.function.metadata,
                value="I cannot process requests involving sensitive personal information.",
            )
            return  # Skip the actual function

    await next_handler(context)

# Register filters on the kernel
from semantic_kernel.filters import FilterTypes

kernel.add_filter(FilterTypes.FUNCTION_INVOCATION, logging_filter)
kernel.add_filter(FilterTypes.FUNCTION_INVOCATION, content_safety_filter)

Filters form a pipeline (similar to middleware in web frameworks). Each filter calls next_handler to proceed to the next filter or the actual function. If a filter does not call next_handler, the chain is short-circuited.
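The pipeline mechanics can be sketched without SK at all. The hypothetical `build_pipeline` helper below composes filters the same way: each filter receives the context plus a handle to the rest of the chain, and declining to call it short-circuits everything inside.

```python
import asyncio

def build_pipeline(filters, terminal):
    """Compose filters so each wraps the next; the innermost
    handler is `terminal` (the actual function)."""
    async def call_at(index, ctx):
        if index < len(filters):
            # Hand the filter a callable that resumes the chain
            await filters[index](ctx, lambda c: call_at(index + 1, c))
        else:
            await terminal(ctx)

    async def invoke(ctx):
        await call_at(0, ctx)

    return invoke

async def demo():
    order = []

    async def outer(ctx, next_handler):
        order.append("outer:before")
        await next_handler(ctx)
        order.append("outer:after")

    async def blocker(ctx, next_handler):
        if ctx.get("blocked"):
            ctx["result"] = "blocked"
            return  # short-circuit: terminal never runs
        await next_handler(ctx)

    async def actual_function(ctx):
        order.append("function")
        ctx["result"] = "ran"

    pipeline = build_pipeline([outer, blocker], actual_function)

    ctx = {}
    await pipeline(ctx)
    first_order = list(order)

    ctx2 = {"blocked": True}
    await pipeline(ctx2)
    return first_order, ctx["result"], ctx2["result"]

order_trace, result, blocked_result = asyncio.run(demo())
# order_trace: ['outer:before', 'function', 'outer:after']
```

Note that `outer` still runs its "after" step even when `blocker` short-circuits: outer filters always see the final result, whoever produced it.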

4. Prompt Render Filters

Prompt render filters execute after a template is rendered but before it is sent to the LLM. They are ideal for prompt sanitization, PII redaction, and prompt injection detection.

import re
from semantic_kernel.filters import FilterTypes

async def pii_redaction_filter(
    context: PromptRenderContext,
    next_handler,
):
    """Redact email addresses and phone numbers from rendered prompts."""
    # Render the template first: next_handler populates rendered_prompt
    await next_handler(context)
    rendered = context.rendered_prompt or ""

    # Redact emails
    rendered = re.sub(
        r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}',
        '[EMAIL REDACTED]',
        rendered,
    )
    # Redact phone numbers (US format)
    rendered = re.sub(
        r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b',
        '[PHONE REDACTED]',
        rendered,
    )

    # The redacted prompt is what the LLM actually receives
    context.rendered_prompt = rendered

kernel.add_filter(FilterTypes.PROMPT_RENDERING, pii_redaction_filter)
Tip

Layer your filters by responsibility: one for logging, one for safety, one for PII redaction. This keeps each filter simple, testable, and independently deployable. You can enable or disable individual filters based on the environment (for example, skip verbose logging in production).
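One way to wire up per-environment toggling is a small registry keyed by filter name, driven by an environment variable. The helper and the `APP_ENABLED_FILTERS` variable name below are illustrative, not an SK convention:

```python
import os

def enabled_filters(registry, env_var="APP_ENABLED_FILTERS", default=None):
    """Return (name, filter) pairs enabled for this environment.

    `registry` maps filter names to callables. If the env var is unset,
    fall back to `default` (or everything in the registry).
    Example value: APP_ENABLED_FILTERS="safety,pii"
    """
    raw = os.environ.get(env_var)
    if raw is None:
        names = default if default is not None else list(registry)
    else:
        names = [n.strip() for n in raw.split(",") if n.strip()]
    unknown = [n for n in names if n not in registry]
    if unknown:
        # A typo in the env var should be loud, not a silent no-op
        raise ValueError(f"Unknown filters: {unknown}")
    return [(n, registry[n]) for n in names]
```

At startup you would iterate the returned pairs and register each callable on the kernel, so production and staging can run different filter sets from the same code.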

5. Telemetry and OpenTelemetry Integration

Semantic Kernel emits telemetry data compatible with OpenTelemetry (OTel), the industry standard for observability. You can capture traces, metrics, and logs from every kernel operation and send them to your monitoring backend.

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import ConsoleSpanExporter, SimpleSpanProcessor

# Set up OpenTelemetry tracing
provider = TracerProvider()
provider.add_span_processor(
    SimpleSpanProcessor(ConsoleSpanExporter())
)
trace.set_tracer_provider(provider)

# Enable SK telemetry via environment variables
import os
os.environ["SEMANTICKERNEL_EXPERIMENTAL_GENAI_ENABLE_OTEL_DIAGNOSTICS"] = "true"

# Now every kernel.invoke() call generates OTel spans:
# - Function invocation spans (name, duration, status)
# - LLM call spans (model, tokens used, latency)
# - Memory operation spans (collection, result count)

In production, replace ConsoleSpanExporter with exporters for Azure Monitor, Datadog, Jaeger, or your preferred observability platform. The telemetry data lets you track token consumption, identify slow functions, and debug failures in multi-step plans.

6. Cost Tracking and Token Budgets

LLM API calls cost money. In enterprise settings, you need to track spending per user, per tenant, or per feature. SK's filter system makes this straightforward to implement.

from collections import defaultdict

class CostTracker:
    """Tracks token usage across all kernel invocations."""

    # Approximate costs per 1K tokens (input / output)
    PRICING = {
        "gpt-4o": (0.0025, 0.01),
        "gpt-4-turbo": (0.01, 0.03),
        "text-embedding-3-small": (0.00002, 0.0),
    }

    def __init__(self):
        self.usage = defaultdict(lambda: {"input": 0, "output": 0, "cost": 0.0})

    async def track_filter(self, context, next_handler):
        await next_handler(context)

        # Extract token usage from metadata
        metadata = context.result.metadata or {}
        usage = metadata.get("usage", {})
        model = metadata.get("model", "unknown")

        input_tokens = usage.get("prompt_tokens", 0)
        output_tokens = usage.get("completion_tokens", 0)

        self.usage[model]["input"] += input_tokens
        self.usage[model]["output"] += output_tokens

        # Calculate cost
        if model in self.PRICING:
            in_rate, out_rate = self.PRICING[model]
            cost = (input_tokens * in_rate + output_tokens * out_rate) / 1000
            self.usage[model]["cost"] += cost

    def report(self):
        for model, data in self.usage.items():
            print(f"{model}: {data['input']}in + {data['output']}out = ${data['cost']:.4f}")

from semantic_kernel.filters import FilterTypes

tracker = CostTracker()
kernel.add_filter(FilterTypes.FUNCTION_INVOCATION, tracker.track_filter)
Warning

Token counts from the API are the authoritative source of cost data. Do not estimate token counts using string length or word counts; the relationship between text length and tokens varies by model and language. Always use the values returned in the API response metadata.
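Tracking alone does not stop a runaway feature from burning through budget. A minimal enforcement layer can charge each tenant's usage against a cap and refuse further calls past the limit; the `TokenBudget` class below is an illustrative sketch, not a built-in SK feature:

```python
class TokenBudget:
    """Per-tenant token budget (illustrative, not an SK feature).

    Charge actual token counts from API response metadata against
    a cap; raise once a tenant would exceed it.
    """

    def __init__(self, limit_tokens: int):
        self.limit = limit_tokens
        self.used: dict[str, int] = {}

    def charge(self, tenant: str, tokens: int) -> int:
        """Record usage; raise if the tenant would exceed its budget.

        Returns the number of tokens remaining."""
        total = self.used.get(tenant, 0) + tokens
        if total > self.limit:
            raise RuntimeError(
                f"Token budget exceeded for {tenant}: {total}/{self.limit}"
            )
        self.used[tenant] = total
        return self.limit - total

budget = TokenBudget(limit_tokens=10_000)
budget.charge("tenant-a", 4_000)               # 6,000 remaining
remaining = budget.charge("tenant-a", 5_000)   # 1,000 remaining
# budget.charge("tenant-a", 2_000) would now raise RuntimeError
```

In practice you would call `charge` from a function invocation filter (after extracting `prompt_tokens` and `completion_tokens` from the result metadata, as CostTracker does) so over-budget tenants are cut off automatically.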

7. Retry Policies and Resilience

Production AI services experience transient failures: rate limits, network timeouts, and service outages. SK's HTTP pipeline supports configurable retry policies.

from semantic_kernel.connectors.ai.open_ai import AzureChatCompletion
from openai import AsyncAzureOpenAI
import httpx

# Configure timeouts and connection pooling on the HTTP client
http_client = httpx.AsyncClient(
    timeout=httpx.Timeout(30.0, connect=10.0),
    limits=httpx.Limits(
        max_connections=20,
        max_keepalive_connections=10,
    ),
)

# The OpenAI SDK client owns the HTTP pipeline; build it with the
# custom httpx client, then hand it to the SK connector
client = AsyncAzureOpenAI(
    azure_endpoint="https://my-org.openai.azure.com/",
    azure_ad_token_provider=token_provider,
    api_version="2024-06-01",
    max_retries=3,      # SDK-level retries with backoff
    http_client=http_client,
)

kernel.add_service(
    AzureChatCompletion(
        service_id="azure-chat",
        deployment_name="gpt-4o",
        async_client=client,
    )
)

# For automatic retries on 429 (rate limit) responses at the
# function level, use a filter that catches exceptions and retries:
import asyncio
from semantic_kernel.filters import FilterTypes

async def retry_filter(context, next_handler, max_retries=3):
    for attempt in range(max_retries):
        try:
            await next_handler(context)
            return
        except Exception as e:
            if "429" in str(e) and attempt < max_retries - 1:
                wait = 2 ** attempt  # Exponential backoff: 1s, 2s, 4s, ...
                await asyncio.sleep(wait)
            else:
                raise

kernel.add_filter(FilterTypes.FUNCTION_INVOCATION, retry_filter)
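The plain 2 ** attempt schedule has a known weakness: clients rate-limited at the same moment all retry at the same moment. Adding jitter spreads the retries out. This small helper sketches the common "full jitter" variant:

```python
import random

def backoff_delay(attempt: int, base: float = 1.0, cap: float = 30.0) -> float:
    """Full-jitter exponential backoff.

    Sleep a random amount between 0 and min(cap, base * 2**attempt),
    so simultaneously throttled clients do not retry in lockstep.
    """
    return random.uniform(0.0, min(cap, base * (2 ** attempt)))

# attempt 0 -> up to 1s, attempt 1 -> up to 2s, attempt 5+ -> capped at 30s
delays = [backoff_delay(a) for a in range(6)]
```

In the retry filter above, you would replace `wait = 2 ** attempt` with `wait = backoff_delay(attempt)`.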

8. Putting It Together: Enterprise-Ready Kernel

The following example combines all the patterns from this section into a factory function that produces a production-ready kernel.

def create_enterprise_kernel(config) -> sk.Kernel:
    """Create a fully configured enterprise kernel."""
    kernel = sk.Kernel()

    # 1. Azure AD authentication
    credential = DefaultAzureCredential()
    token_provider = get_bearer_token_provider(
        credential, "https://cognitiveservices.azure.com/.default"
    )

    # 2. Azure OpenAI service
    kernel.add_service(
        AzureChatCompletion(
            service_id="chat",
            deployment_name=config.deployment_name,
            endpoint=config.azure_endpoint,
            ad_token_provider=token_provider,
        )
    )

    # 3. Register plugins
    kernel.add_plugin(MathPlugin(), plugin_name="Math")
    kernel.add_plugin(
        WeatherPlugin(api_key=config.weather_key),
        plugin_name="Weather",
    )

    # 4. Add governance filters
    tracker = CostTracker()
    kernel.add_filter(FilterTypes.FUNCTION_INVOCATION, logging_filter)
    kernel.add_filter(FilterTypes.FUNCTION_INVOCATION, content_safety_filter)
    kernel.add_filter(FilterTypes.FUNCTION_INVOCATION, tracker.track_filter)
    kernel.add_filter(FilterTypes.PROMPT_RENDERING, pii_redaction_filter)

    return kernel

This factory function produces a kernel with managed identity authentication, cost tracking, content safety, PII redaction, and structured logging. Each concern is modular and independently testable. In a web application, you would call this factory once at startup and share the kernel across request handlers, or create a new kernel per request scope if you need per-request configuration.
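One simple way to implement the call-once-at-startup pattern is to memoize the factory; the sketch below uses a stand-in builder (`build_kernel` is a placeholder for `create_enterprise_kernel(config)`):

```python
from functools import lru_cache

def build_kernel():
    """Stand-in for create_enterprise_kernel(config)."""
    return object()  # placeholder for a configured sk.Kernel()

@lru_cache(maxsize=1)
def get_kernel():
    """Build the kernel on first use, then return the same instance
    for every subsequent call in this process."""
    return build_kernel()
```

Request handlers call `get_kernel()` instead of the factory; the first call pays the construction cost and every later call reuses the instance. If you need per-request configuration instead, drop the cache and call the factory inside the request scope.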