Agent Identity Quickstart

Welcome to the Agent Identity Quickstart!

This guide walks you through establishing AI agent identities with Asgardeo, authenticating agents with their credentials, and integrating them with secure MCP servers, using Python and your choice of agent framework.

For the demonstration, let's build a math-capable agent that will interpret conversational queries and invoke specific arithmetic tools exposed by a secured Model Context Protocol (MCP) server to deliver precise calculations.

By the end of this guide, you will have:

  • An AI agent that authenticates with its agent credentials and obtains a token to access a secure MCP server (the AI agent acting on its own)
  • An AI agent that is delegated authorization by a user to access a secure MCP server (the agent acting On-Behalf-Of (OBO) a user)
  • A clear understanding of both authentication scenarios described in the Agent Authentication Guide

You do not need prior agent development experience. Everything you need is explained as you go.

Register an AI agent

To establish an identity for your AI agent, begin by registering it in Asgardeo.

  • Sign in to the Asgardeo console and go to Agents.
  • Click + New Agent.
  • Provide:
      • Name: A descriptive, human-readable name for your AI agent
      • Description (optional): The purpose and functionality of the agent

Example

Name: Math Assistant Agent

Description: An AI agent that invokes protected MCP tools to answer math-related questions.

  • Click Create to complete the registration.

After successful registration, your agent receives a unique Agent ID and an Agent Secret. The Agent Secret is shown only once, so store both values securely; you’ll need them later in this guide.

Configure an Application in Asgardeo

To allow your agent (or a user acting through the agent) to authenticate and connect to a secure MCP server, set up an MCP client application in Asgardeo.

  • In the Asgardeo console, navigate to Applications > New Application.
  • Select MCP Client Application and complete the wizard pop-up by providing a suitable name and an authorized redirect URL.

Example

Name: AgentAuthenticatorApp

Authorized redirect URL: http://localhost:6274/oauth/callback

Info

The authorized redirect URL is the location Asgardeo sends users to after a successful login, typically the address of the client application that connects to the MCP server. In this guide, the AI agent acts as the client and runs a lightweight OAuth 2.1 callback server at http://localhost:6274/oauth/callback to capture the authorization code, so that URL is used as the authorized redirect URL.

Make a note of the client-id from the Protocol tab of the registered application. You will need it during the Build an AI Agent section of this guide.
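
To make the role of the redirect URL concrete, the sketch below shows how a client might read the authorization code and state from the redirect it receives. It is only an illustration: parse_callback is a hypothetical helper (not part of the Asgardeo SDK), and the code and state values in the sample URL are made up.

```python
from urllib.parse import urlparse, parse_qs


def parse_callback(redirect_url: str) -> dict:
    """Extract the OAuth query parameters from a redirect URL."""
    parsed = urlparse(redirect_url)
    params = parse_qs(parsed.query)
    # parse_qs returns a list per parameter; take the first value or None.
    return {
        "path": parsed.path,
        "code": params.get("code", [None])[0],
        "state": params.get("state", [None])[0],
        "error": params.get("error", [None])[0],
    }


# Hypothetical redirect; the code and state values are invented.
example = parse_callback(
    "http://localhost:6274/oauth/callback?code=abc123&state=xyz789"
)
print(example["path"], example["code"], example["state"])
```

The oauth_callback.py server later in this guide applies the same parsing to the request path it receives.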

Run the MCP Server

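Your AI agent will call an MCP tool hosted on a secure MCP server. You can use your own secured MCP server or the one from the MCP Auth Server quickstart, which is served at http://127.0.0.1:8000/mcp.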

Build an AI Agent

Create a directory called agent-auth-quickstart by running the following commands.

  mkdir agent-auth-quickstart
  cd agent-auth-quickstart

Then set up and activate a Python virtual environment using the following commands.

macOS/Linux
python3 -m venv .venv
source .venv/bin/activate

Windows
python -m venv .venv
.venv\Scripts\activate

Pick your agent development framework and install the corresponding dependencies.

LangChain
pip install asgardeo asgardeo_ai langchain langchain-google-genai langchain-mcp-adapters python-dotenv

Google ADK
pip install asgardeo asgardeo_ai python-dotenv google-adk==1.20.0 google-genai==1.54.0

CrewAI
pip install asgardeo asgardeo_ai crewai google-genai==1.54.0 python-dotenv

Vercel AI SDK
pip install asgardeo asgardeo_ai python-dotenv vercel-ai-sdk==0.0.1.dev4

Create main.py that implements an AI agent which first obtains a valid access token from Asgardeo by authenticating itself. The agent then includes that token in the Authorization header (for example Authorization: Bearer <token>) when calling the MCP tool.

main.py (LangChain)
import os
import asyncio

from dotenv import load_dotenv
from pathlib import Path

from asgardeo import AsgardeoConfig, AsgardeoNativeAuthClient
from asgardeo_ai import AgentConfig, AgentAuthManager

from langchain_mcp_adapters.client import MultiServerMCPClient
from langchain.agents import create_agent
from langchain_google_genai import ChatGoogleGenerativeAI


# Load environment variables from .env file
load_dotenv()

ASGARDEO_CONFIG = AsgardeoConfig(
    base_url=os.getenv("ASGARDEO_BASE_URL"),
    client_id=os.getenv("CLIENT_ID"),
    redirect_uri=os.getenv("REDIRECT_URI")
)

AGENT_CONFIG = AgentConfig(
    agent_id=os.getenv("AGENT_ID"),
    agent_secret=os.getenv("AGENT_SECRET")
)


async def main():

    # Scenario 1: AI agent acting on its own using its own credentials to authenticate
    async with AgentAuthManager(ASGARDEO_CONFIG, AGENT_CONFIG) as auth_manager:
        # Get agent token
        agent_token = await auth_manager.get_agent_token(["openid"])


    # Connect to MCP Server with Authorization Header
    client = MultiServerMCPClient(
        {
            "mcp_server": {
                "transport": "streamable_http",
                "url": os.getenv("MCP_SERVER_URL"),
                "headers": {
                    "Authorization": f"Bearer {agent_token.access_token}"
                }
            }
        }
    )

    # LLM (Gemini) + LangChain Agent
    llm = ChatGoogleGenerativeAI(
        model="gemini-2.0-flash",
        temperature=0.9
    )

    tools = await client.get_tools()
    agent = create_agent(llm, tools)

    user_input = input("Enter your question: ")

    # Invoke the agent
    response = await agent.ainvoke(
        {"messages": [{"role": "user", "content": user_input}]}
    )

    print("Agent Response:", response["messages"][-1].content)


# Run app
if __name__ == "__main__":
    asyncio.run(main())

main.py (Google ADK)
import os
import asyncio
from pathlib import Path
from dotenv import load_dotenv

from asgardeo import AsgardeoConfig
from asgardeo_ai import AgentConfig, AgentAuthManager

from google.adk.agents.llm_agent import LlmAgent
from google.adk.runners import InMemoryRunner
from google.adk.tools.mcp_tool import McpToolset
from google.adk.tools.mcp_tool.mcp_session_manager import StreamableHTTPConnectionParams
from google.genai import types

# Load environment variables from .env file
load_dotenv()

ASGARDEO_CONFIG = AsgardeoConfig(
    base_url=os.getenv("ASGARDEO_BASE_URL"),
    client_id=os.getenv("CLIENT_ID"),
    redirect_uri=os.getenv("REDIRECT_URI")
)

AGENT_CONFIG = AgentConfig(
    agent_id=os.getenv("AGENT_ID"),
    agent_secret=os.getenv("AGENT_SECRET")
)

async def build_toolset():
    async with AgentAuthManager(ASGARDEO_CONFIG, AGENT_CONFIG) as auth_manager:
        # Get agent token
        agent_token = await auth_manager.get_agent_token(["openid"])

    # Connect to MCP Server with Auth Header
    return McpToolset(
        connection_params=StreamableHTTPConnectionParams(
            url= os.getenv("MCP_SERVER_URL"),
            headers={"Authorization": f"Bearer {agent_token.access_token}"}
        )
    )

async def main():

    mcp_toolset = await build_toolset()

    # Define LLM Agent (Gemini)
    agent = LlmAgent(
        model="gemini-2.0-flash",
        name="add_agent",
        description="Adds two numbers using an MCP server.",
        instruction="When the user asks to add numbers, call the MCP tool `add(a, b)`.",
        tools=[mcp_toolset],
    )

    # Setup runner + session
    runner = InMemoryRunner(agent, app_name="add_numbers_app")

    session = await runner.session_service.create_session(
        app_name="add_numbers_app",
        user_id="user"
    )

    question = input("Enter your question: ")

    try:
        async for event in runner.run_async(
                user_id="user",
                session_id=session.id,
                new_message=types.Content(
                    role="user",
                    parts=[types.Part(text=question)]
                ),
        ):
            if event.content and event.content.parts:
                text = event.content.parts[0].text
                if text:
                    print(text)

    finally:
        await mcp_toolset.close()
        await runner.close()

if __name__ == "__main__":
    asyncio.run(main())

main.py (CrewAI)
import os
import asyncio
from pathlib import Path
from dotenv import load_dotenv

from asgardeo import AsgardeoConfig
from asgardeo_ai import AgentConfig, AgentAuthManager

from crewai import Agent, Task, Crew
from crewai.mcp import MCPServerHTTP

# Load environment variables from .env file
load_dotenv()

ASGARDEO_CONFIG = AsgardeoConfig(
    base_url=os.getenv("ASGARDEO_BASE_URL"),
    client_id=os.getenv("CLIENT_ID"),
    redirect_uri=os.getenv("REDIRECT_URI")
)

AGENT_CONFIG = AgentConfig(
    agent_id=os.getenv("AGENT_ID"),
    agent_secret=os.getenv("AGENT_SECRET")
)

async def get_agent_token():
    # Asynchronously fetches the agent token from Asgardeo.
    async with AgentAuthManager(ASGARDEO_CONFIG, AGENT_CONFIG) as auth_manager:
        return await auth_manager.get_agent_token(["openid"])

def main():
    agent_token = asyncio.run(get_agent_token())

    while True:
        question = input("\nEnter your question (e.g., 'Add 45 and 99') or type 'exit' to quit: ")

        if question.lower() == "exit":
            print("Exiting the program. Goodbye!")
            break
        mcp_server = MCPServerHTTP(
            url=os.getenv("MCP_SERVER_URL"),
            headers={"Authorization": f"Bearer {agent_token.access_token}"},
            streamable=True
        )

        agent = Agent(
            role="Calculation Specialist",
            goal="Add two numbers accurately using an MCP server.",
            backstory="You are an intelligent agent that strictly uses the provided MCP tool 'add(a, b)' to compute the addition of numbers when requested by a user.",
            mcps=[mcp_server],
            llm=os.getenv("MODEL_NAME"), # Use 'gemini/gemini-1.5-flash' or similar
            verbose=False
        )

        task = Task(
            description=f"Address the user's request: '{question}'",
            expected_output="The exact calculated sum of the numbers based on the MCP tool execution.",
            agent=agent
        )

        crew = Crew(
            agents=[agent],
            tasks=[task]
        )

        result = crew.kickoff()

        print("\nAgent Response:", result.raw)

if __name__ == "__main__":
    main()

main.py (Vercel AI SDK)
import os
import asyncio

from dotenv import load_dotenv
from pathlib import Path

from asgardeo import AsgardeoConfig
from asgardeo_ai import AgentConfig, AgentAuthManager

import vercel_ai_sdk as ai

# Load environment variables from .env file
load_dotenv()

ASGARDEO_CONFIG = AsgardeoConfig(
    base_url=os.getenv("ASGARDEO_BASE_URL"),
    client_id=os.getenv("CLIENT_ID"),
    redirect_uri=os.getenv("REDIRECT_URI")
)

AGENT_CONFIG = AgentConfig(
    agent_id=os.getenv("AGENT_ID"),
    agent_secret=os.getenv("AGENT_SECRET")
)

async def my_agent(llm, messages, auth_token):

    tools = await ai.mcp.get_http_tools(
        os.getenv("MCP_SERVER_URL"),
        headers={
            "Authorization": f"Bearer {auth_token}"
        }
    )

    return await ai.stream_loop(llm, messages, tools=tools)


async def main():
    async with AgentAuthManager(ASGARDEO_CONFIG, AGENT_CONFIG) as auth_manager:
        agent_token = await auth_manager.get_agent_token(["openid"])

        google_key = os.getenv("GOOGLE_API_KEY", "")
        os.environ["OPENAI_API_KEY"] = google_key
        os.environ["OPENAI_BASE_URL"] = "https://generativelanguage.googleapis.com/v1beta/openai/"

        llm = ai.openai.OpenAIModel(
            model=os.getenv("MODEL_NAME")
        )

        while True:
            user_input = input("\nEnter your question (e.g., 'Add 45 and 99') or type 'exit' to quit: ")

            # Exit the loop if the user types "exit"
            if user_input.lower() == "exit":
                print("Exiting the program. Goodbye!")
                break

            messages = ai.make_messages(user=user_input)
            result = ai.run(my_agent, llm, messages, agent_token.access_token)

            print("\nAgent Response: ", end="")

            async for msg in result:
                if getattr(msg, "text_delta", None):
                    print(msg.text_delta, end="", flush=True)

            print()

if __name__ == "__main__":
    asyncio.run(main())

Add environment configuration by creating a .env file at the project root to hold the Asgardeo configuration:

.env
# Asgardeo OAuth2 Configuration
ASGARDEO_BASE_URL=https://api.asgardeo.io/t/<your-tenant>
CLIENT_ID=<your-client-id>
REDIRECT_URI=http://localhost:6274/oauth/callback

# Asgardeo Agent Credentials
AGENT_ID=<agent_id>
AGENT_SECRET=<agent_secret>

# Google Gemini API Key
GOOGLE_API_KEY=<google_api_key>

# MCP Server URL
MCP_SERVER_URL=<mcp_server_url>

# LLM model used by the agent (any supported model can be used).
MODEL_NAME="gemini-2.5-flash"

Important

  • Replace <your-tenant>, <your-client-id>, and the redirect URL with the values obtained from the Asgardeo console. The tenant name is visible in the console URL path (e.g., https://console.asgardeo.io/t/<your-tenant>), and the client ID can be found in the application's Protocol tab.

  • Add the Agent ID and Agent Secret from the Agent Registration step.

  • You’ll need a Google API key to use Gemini as your model. You can generate one from Google AI Studio.

  • Replace <mcp_server_url> with your MCP server’s URL. If you followed the MCP Auth Server quickstart, you can use: http://127.0.0.1:8000/mcp
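
Before running the agent, it can help to confirm that none of these values is missing or still a placeholder. The following is a minimal sketch of such a check; find_missing and REQUIRED_VARS are hypothetical names introduced here, and MODEL_NAME is left out because only some of the framework variants read it.

```python
import os

# Variable names taken from the .env file above.
REQUIRED_VARS = [
    "ASGARDEO_BASE_URL",
    "CLIENT_ID",
    "REDIRECT_URI",
    "AGENT_ID",
    "AGENT_SECRET",
    "GOOGLE_API_KEY",
    "MCP_SERVER_URL",
]


def find_missing(env=None, required=REQUIRED_VARS):
    """Return the names that are unset, empty, or still hold a <placeholder>."""
    env = os.environ if env is None else env
    missing = []
    for name in required:
        value = (env.get(name) or "").strip()
        # A value such as "https://api.asgardeo.io/t/<your-tenant>" was never filled in.
        if not value or ("<" in value and ">" in value):
            missing.append(name)
    return missing


if __name__ == "__main__":
    problems = find_missing()
    if problems:
        print("Set these variables in .env:", ", ".join(problems))
```

In main.py, a check like this would run right after load_dotenv(), so a configuration mistake is reported before any call to Asgardeo is made.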

Project Structure

Your project folder should now look like this:

├── main.py              # Your AI Agent
└── .env                 # Your Asgardeo configs

Run and Test with Authentication

Start your AI Agent by running the following command.

  python main.py

If authentication succeeds, your agent will prompt you for a question and securely invoke the MCP tool.

Enter your question: Can you add twenty two and twelve?
Agent Response: The sum of twenty two and twelve is 34.

If authentication fails, the MCP server responds with 401 Unauthorized, which surfaces in the client as:

httpx.HTTPStatusError: Client error '401 Unauthorized'

To test the setup without authentication, simply remove the Authorization header from your client configuration, as shown below:

LangChain
...
client = MultiServerMCPClient(
    {
        "mcp_server": {
            "transport": "streamable_http",
            "url": os.getenv("MCP_SERVER_URL")
        }
    }
)
...

Google ADK
...
return McpToolset(
    connection_params=StreamableHTTPConnectionParams(
        url=os.getenv("MCP_SERVER_URL")
    )
)
...

CrewAI
...
mcp_server = MCPServerHTTP(
    url=os.getenv("MCP_SERVER_URL"),
    streamable=True
)
...

Vercel AI SDK
...
tools = await ai.mcp.get_http_tools(
    os.getenv("MCP_SERVER_URL")
)
...
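
To see why dropping the header leads to a 401, the self-contained sketch below starts a toy HTTP server on a local port and sends one request without a bearer token and one with it. It is a simplified stand-in, not an MCP server: a real secured server validates the token issued by Asgardeo rather than comparing it to a fixed string, and EXPECTED_TOKEN, ProtectedHandler, status_for, and run_demo are all hypothetical names.

```python
import http.server
import threading
import urllib.error
import urllib.request

EXPECTED_TOKEN = "demo-token"  # hypothetical value, for illustration only

# Ignore any proxy settings so the request always goes to the local server.
_OPENER = urllib.request.build_opener(urllib.request.ProxyHandler({}))


class ProtectedHandler(http.server.BaseHTTPRequestHandler):
    def do_GET(self):
        # Accept the request only when the expected bearer token is present.
        if self.headers.get("Authorization") == f"Bearer {EXPECTED_TOKEN}":
            self.send_response(200)
            body = b"ok"
        else:
            self.send_response(401)
            self.send_header("WWW-Authenticate", "Bearer")
            body = b"unauthorized"
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, *args):
        pass  # keep the demo output quiet


def status_for(url, token=None):
    """Return the HTTP status of a GET request sent with or without a token."""
    request = urllib.request.Request(url)
    if token is not None:
        request.add_header("Authorization", f"Bearer {token}")
    try:
        with _OPENER.open(request, timeout=5) as response:
            return response.status
    except urllib.error.HTTPError as err:
        return err.code


def run_demo():
    """Return (status without token, status with token)."""
    server = http.server.HTTPServer(("127.0.0.1", 0), ProtectedHandler)
    thread = threading.Thread(target=server.serve_forever, daemon=True)
    thread.start()
    url = f"http://127.0.0.1:{server.server_address[1]}/"
    try:
        return status_for(url), status_for(url, EXPECTED_TOKEN)
    finally:
        server.shutdown()
        server.server_close()


if __name__ == "__main__":
    print(run_demo())
```

The first status corresponds to the unauthenticated configuration shown above, and the second to the original configuration that sends the Authorization header.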

Test the On-Behalf-Of (OBO) Flow

In the previous step, the AI agent authenticated itself using its own credentials. Now, let’s look at the scenario where the agent authenticates on behalf of a user.

This flow uses:

  • Authorization code issued after the user logs in
  • PKCE (Proof Key for Code Exchange) to ensure only your agent can securely exchange the authorization code for the OBO token
  • A final token exchange that produces an OBO token, representing the user

Your AI agent will then call the MCP server as the authenticated user.
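
As a rough illustration of the PKCE step, the sketch below derives a code challenge from a code verifier. In this guide the Asgardeo SDK produces these values for you through get_authorization_url_with_pkce, so you do not need this code; the helper names are hypothetical, and the use of the S256 method from RFC 7636 is an assumption about what happens under the hood.

```python
import base64
import hashlib
import secrets


def make_code_verifier(num_bytes: int = 32) -> str:
    """Return a random URL-safe verifier (43 characters for 32 random bytes)."""
    raw = secrets.token_bytes(num_bytes)
    return base64.urlsafe_b64encode(raw).rstrip(b"=").decode("ascii")


def make_code_challenge(code_verifier: str) -> str:
    """Derive the S256 challenge: BASE64URL(SHA256(verifier)) without padding."""
    digest = hashlib.sha256(code_verifier.encode("ascii")).digest()
    return base64.urlsafe_b64encode(digest).rstrip(b"=").decode("ascii")


if __name__ == "__main__":
    verifier = make_code_verifier()
    # The challenge goes into the authorization URL; the verifier stays with
    # the agent and is sent only when the authorization code is exchanged.
    print("challenge:", make_code_challenge(verifier))
```

Because only the agent holds the verifier, an authorization code intercepted on the redirect cannot be exchanged by another party.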

During the OBO flow, Asgardeo redirects back to your client application with an authorization code after the user logs in. To handle this, create a file named oauth_callback.py with the following implementation at the project root. This lightweight HTTP server listens for the redirect and captures authorization_code and state.

oauth_callback.py
import http.server
import socketserver
import threading
import asyncio
from urllib.parse import urlparse, parse_qs

class OAuthCallbackServer:
    def __init__(self, port: int = 6274, timeout: int = 120):
        self.port = port
        self.timeout = timeout
        self.auth_code = None
        self.state = None
        self._error = None
        self._httpd = None

    class _Handler(http.server.SimpleHTTPRequestHandler):
        parent = None

        def do_GET(self):
            url = urlparse(self.path)
            params = parse_qs(url.query)

            # OAuth error case
            if "error" in params:
                self.parent._error = params["error"][0]
                self.send_response(200)
                self.end_headers()
                self.wfile.write(b"Authorization cancelled or failed. You can close this window.")
                return

            # Success case
            if "code" in params:
                self.parent.auth_code = params["code"][0]
                self.parent.state = params.get("state", [None])[0]
                self.send_response(200)
                self.end_headers()
                self.wfile.write(b"Authentication successful. You can close this window.")
                return

            # Invalid callback
            if url.path != "/oauth/callback":
                self.parent._error = "Invalid Callback URL"
                self.send_response(200)
                self.end_headers()
                self.wfile.write(b"Invalid redirect. You can close this window.")
                return

    def start(self):
        handler = self._Handler
        handler.parent = self

        self._httpd = socketserver.TCPServer(("localhost", self.port), handler)
        thread = threading.Thread(target=self._httpd.serve_forever)
        thread.daemon = True
        thread.start()

    def stop(self):
        if self._httpd:
            self._httpd.shutdown()

    async def wait_for_code(self):
        """Returns (auth_code, state, error). auth_code is None if the flow was cancelled, failed, or timed out."""
        elapsed = 0
        while self.auth_code is None and self._error is None and elapsed < self.timeout:
            await asyncio.sleep(0.1)
            elapsed += 0.1

        return (self.auth_code, self.state, self._error)

Then, update main.py to perform the OBO flow. This will:

  • Authenticate the agent
  • Generate an authorization URL for the user
  • Capture the authorization code
  • Exchange the code + agent token for an OBO token
  • Call the MCP server using the OBO token

Here is the updated implementation:

main.py (LangChain)
import os
import webbrowser
import asyncio

from dotenv import load_dotenv
from pathlib import Path

from asgardeo import AsgardeoConfig, AsgardeoNativeAuthClient
from asgardeo_ai import AgentConfig, AgentAuthManager

from langchain_mcp_adapters.client import MultiServerMCPClient
from langchain.agents import create_agent
from langchain_google_genai import ChatGoogleGenerativeAI

from oauth_callback import OAuthCallbackServer


# Load environment variables from .env file
load_dotenv()

ASGARDEO_CONFIG = AsgardeoConfig(
    base_url=os.getenv("ASGARDEO_BASE_URL"),
    client_id=os.getenv("CLIENT_ID"),
    redirect_uri=os.getenv("REDIRECT_URI")
)

AGENT_CONFIG = AgentConfig(
    agent_id=os.getenv("AGENT_ID"),
    agent_secret=os.getenv("AGENT_SECRET")
)


async def main():

    # Perform OBO flow (authenticating on behalf of the user)
    async with AgentAuthManager(ASGARDEO_CONFIG, AGENT_CONFIG) as auth_manager:
        # Get agent token
        agent_token = await auth_manager.get_agent_token(["openid"])

        # Generate user authorization URL
        auth_url, state, code_verifier = auth_manager.get_authorization_url_with_pkce(["openid"])

        callback = OAuthCallbackServer(port=6274)
        callback.start()

        print(f"\nOpening browser for authentication...")
        webbrowser.open(auth_url)

        # Wait for redirect
        auth_code, returned_state, error = await callback.wait_for_code()
        callback.stop()

        if auth_code is None:
            print(f"Authorization failed or cancelled. Error: {error}")
            return

        print(f"Received auth_code={auth_code}")

        # Exchange auth code for user token (OBO flow)
        obo_token = await auth_manager.get_obo_token(auth_code, agent_token=agent_token, code_verifier=code_verifier)


    # Connect to MCP Server with Authorization Header
    client = MultiServerMCPClient(
        {
            "mcp_server": {
                "transport": "streamable_http",
                "url": os.getenv("MCP_SERVER_URL"),
                "headers": {
                    "Authorization": f"Bearer {obo_token.access_token}",
                }
            }
        }
    )

    # LLM (Gemini) + LangChain Agent
    llm = ChatGoogleGenerativeAI(
        model="gemini-2.0-flash",
        temperature=0.9
    )

    tools = await client.get_tools()
    agent = create_agent(llm, tools)

    user_input = input("Enter your question: ")

    # Invoke the agent
    response = await agent.ainvoke(
        {"messages": [{"role": "user", "content": user_input}]}
    )

    print("Agent Response:", response["messages"][-1].content)


# Run app
if __name__ == "__main__":
    asyncio.run(main())

main.py (Google ADK)
import os
import webbrowser
import asyncio
from pathlib import Path
from dotenv import load_dotenv

from asgardeo import AsgardeoConfig
from asgardeo_ai import AgentConfig, AgentAuthManager

from google.adk.agents.llm_agent import LlmAgent
from google.adk.runners import InMemoryRunner
from google.adk.tools.mcp_tool import McpToolset
from google.adk.tools.mcp_tool.mcp_session_manager import StreamableHTTPConnectionParams
from google.genai import types

from oauth_callback import OAuthCallbackServer

# Load environment variables from .env file
load_dotenv()

ASGARDEO_CONFIG = AsgardeoConfig(
    base_url=os.getenv("ASGARDEO_BASE_URL"),
    client_id=os.getenv("CLIENT_ID"),
    redirect_uri=os.getenv("REDIRECT_URI")
)

AGENT_CONFIG = AgentConfig(
    agent_id=os.getenv("AGENT_ID"),
    agent_secret=os.getenv("AGENT_SECRET")
)

# Perform OBO flow (authenticating on behalf of the user)
async def build_toolset():
    async with AgentAuthManager(ASGARDEO_CONFIG, AGENT_CONFIG) as auth_manager:
        # Get agent token
        agent_token = await auth_manager.get_agent_token(["openid"])

        # Generate user authorization URL
        auth_url, state, code_verifier = auth_manager.get_authorization_url_with_pkce(["openid"])

        callback = OAuthCallbackServer(port=6274)
        callback.start()

        print(f"\nOpening browser for authentication...")
        webbrowser.open(auth_url)

        # Wait for redirect
        auth_code, returned_state, error = await callback.wait_for_code()
        callback.stop()

        if auth_code is None:
            print(f"Authorization failed or cancelled. Error: {error}")
            return None

        print(f"Received auth_code={auth_code}")

        # Exchange auth code for user token (OBO flow)
        obo_token = await auth_manager.get_obo_token(auth_code, agent_token=agent_token, code_verifier=code_verifier)

    # Connect to MCP Server with Auth Header
    return McpToolset(
        connection_params=StreamableHTTPConnectionParams(
            url=os.getenv("MCP_SERVER_URL"),
            headers={"Authorization": f"Bearer {obo_token.access_token}"}
        )
    )

async def main():

    mcp_toolset = await build_toolset()

    if mcp_toolset is None:
        return

    # Define LLM Agent (Gemini)
    agent = LlmAgent(
        model="gemini-2.0-flash",
        name="add_agent",
        description="Adds two numbers using an MCP server.",
        instruction="When the user asks to add numbers, call the MCP tool `add(a, b)`.",
        tools=[mcp_toolset],
    )

    # Setup runner + session
    runner = InMemoryRunner(agent, app_name="add_numbers_app")

    session = await runner.session_service.create_session(
        app_name="add_numbers_app",
        user_id="user"
    )

    question = input("\nEnter your question (e.g., 'Add 45 and 99') or type 'exit' to quit: ")

    try:
        async for event in runner.run_async(
                user_id="user",
                session_id=session.id,
                new_message=types.Content(
                    role="user",
                    parts=[types.Part(text=question)]
                ),
        ):
            if event.content and event.content.parts:
                text = event.content.parts[0].text
                if text:
                    print(text)

    finally:
        await mcp_toolset.close()
        await runner.close()

if __name__ == "__main__":
    asyncio.run(main())

main.py (CrewAI)
import os
import asyncio
import sys
import webbrowser
from pathlib import Path
from dotenv import load_dotenv

from crewai import Agent, Task, Crew, Process
from crewai.tools import BaseTool
from crewai.mcp import MCPServerHTTP

from pydantic import Field

# Asgardeo / Identity imports
from asgardeo import AsgardeoConfig
from asgardeo_ai import AgentConfig, AgentAuthManager

from oauth_callback import OAuthCallbackServer

import warnings

# Suppress RuntimeWarning related to coroutine not awaited
warnings.filterwarnings("ignore", category=RuntimeWarning, message="coroutine '.*' was never awaited")

# Load environment variables from .env file
load_dotenv()

async def get_obo_token():
    # Handles the OAuth/OBO flow to get the user token.
    ASGARDEO_CONFIG = AsgardeoConfig(
        base_url=os.getenv("ASGARDEO_BASE_URL"),
        client_id=os.getenv("CLIENT_ID"),
        redirect_uri=os.getenv("REDIRECT_URI")
    )
    AGENT_CONFIG = AgentConfig(
        agent_id=os.getenv("AGENT_ID"),
        agent_secret=os.getenv("AGENT_SECRET")
    )

    async with AgentAuthManager(ASGARDEO_CONFIG, AGENT_CONFIG) as auth_manager:
        agent_token = await auth_manager.get_agent_token(["openid", "email"])
        auth_url, state, code_verifier = auth_manager.get_authorization_url_with_pkce(["openid", "email"])

        callback = OAuthCallbackServer(port=6274)
        callback.start()

        print(f"\nOpening browser for authentication...")
        webbrowser.open(auth_url)

        auth_code, _, error = await callback.wait_for_code()
        callback.stop()

        if not auth_code:
            raise Exception(f"Auth failed: {error}")

        obo_token = await auth_manager.get_obo_token(
            auth_code,
            agent_token=agent_token,
            code_verifier=code_verifier
        )
        return obo_token.access_token

# --- Main CrewAI Execution ---

async def main():
    # 1. Get the Token via OBO Flow
    try:
        access_token = await get_obo_token()
        print("Successfully obtained OBO Token.")
    except Exception as e:
        print(f"Failed to authenticate: {e}")
        return

    while True:
        # 2. Get the user question
        question = input("\nEnter your question (e.g., 'Add 45 and 99') or type 'exit' to quit: ")

        # Exit the loop if the user types "exit"
        if question.lower() == "exit":
            print("Exiting the program. Goodbye!")
            break
        # 3. Configure the MCP server for CrewAI
        # We map StreamableHTTPConnectionParams directly to MCPServerHTTP
        mcp_server = MCPServerHTTP(
            url=os.getenv("MCP_SERVER_URL"),
            headers={"Authorization": f"Bearer {access_token}"},
            streamable=True
        )

        # 4. Define the CrewAI Agent
        agent = Agent(
            role="Calculation Specialist",
            goal="Add two numbers accurately using an MCP server.",
            backstory="You are an intelligent agent that strictly uses the provided MCP tool 'add(a, b)' to compute the addition of numbers when requested by a user.",
            mcps=[mcp_server],
            llm=os.getenv("MODEL_NAME"), # Use 'gemini/gemini-1.5-flash' or similar
            verbose=False
        )

        # 5. Define the Task
        task = Task(
            description=f"Address the user's request: '{question}'",
            expected_output="The exact calculated sum of the numbers based on the MCP tool execution.",
            agent=agent
        )

        # 6. Setup and run the Crew
        crew = Crew(
            agents=[agent],
            tasks=[task]
        )

        result = crew.kickoff()

        print("\nAgent Response:", result.raw)

if __name__ == "__main__":
    asyncio.run(main())

main.py (Vercel AI SDK)
import os
import asyncio
import sys
import webbrowser

import vercel_ai_sdk as ai

from dotenv import load_dotenv
from pathlib import Path

from asgardeo import AsgardeoConfig
from asgardeo_ai import AgentConfig, AgentAuthManager

from oauth_callback import OAuthCallbackServer

# Load environment variables from .env file
load_dotenv()

ASGARDEO_CONFIG = AsgardeoConfig(
    base_url=os.getenv("ASGARDEO_BASE_URL"),
    client_id=os.getenv("CLIENT_ID"),
    redirect_uri=os.getenv("REDIRECT_URI")
)

AGENT_CONFIG = AgentConfig(
    agent_id=os.getenv("AGENT_ID"),
    agent_secret=os.getenv("AGENT_SECRET")
)

# 1. Define the agent logic (no decorators needed)
async def my_agent(llm, messages, auth_token):

    # Connect to MCP Server using the user's OBO token
    tools = await ai.mcp.get_http_tools(
        os.getenv("MCP_SERVER_URL"),
        headers={
            "Authorization": f"Bearer {auth_token}"
        }
    )

    # Execute the agent tool loop
    return await ai.stream_loop(llm, messages, tools=tools)


async def main():
    async with AgentAuthManager(ASGARDEO_CONFIG, AGENT_CONFIG) as auth_manager:
        agent_token = await auth_manager.get_agent_token(["openid", "email"])

        auth_url, state, code_verifier = auth_manager.get_authorization_url_with_pkce(["openid", "email"])

        callback = OAuthCallbackServer(port=6274)
        callback.start()

        print(f"\nOpening browser for authentication...")
        webbrowser.open(auth_url)

        auth_code, returned_state, error = await callback.wait_for_code()
        callback.stop()

        if auth_code is None:
            print(f"Authorization failed or cancelled. Error: {error}")
            return

        obo_token = await auth_manager.get_obo_token(auth_code, agent_token=agent_token, code_verifier=code_verifier)

        google_key = os.getenv("GOOGLE_API_KEY", "")
        os.environ["OPENAI_API_KEY"] = google_key
        os.environ["OPENAI_BASE_URL"] = "https://generativelanguage.googleapis.com/v1beta/openai/"

        llm = ai.openai.OpenAIModel(
            model=os.getenv("MODEL_NAME")
        )

        while True:
            user_input = input("\nEnter your question (e.g., 'Add 45 and 99') or type 'exit' to quit: ")

            # Exit the loop if the user types "exit"
            if user_input.lower() == "exit":
                print("Exiting the program. Goodbye!")
                break

            messages = ai.make_messages(user=user_input)
            result = ai.run(my_agent, llm, messages, obo_token.access_token)

            print("\nAgent Response: ", end="")

            # Stream the output token-by-token
            async for msg in result:
                if getattr(msg, "text_delta", None):
                    print(msg.text_delta, end="", flush=True)
            print()

if __name__ == "__main__":
    asyncio.run(main())
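
The listings above receive a state value from get_authorization_url_with_pkce and a returned state from wait_for_code, but they do not compare the two. Whether the SDK performs that comparison internally is not covered in this guide, so the sketch below shows one way a caller could check it. state_matches is a hypothetical helper, not part of the Asgardeo SDK.

```python
import hmac
from typing import Optional


def state_matches(expected_state: Optional[str], returned_state: Optional[str]) -> bool:
    """Return True only when both state values are present and identical."""
    if not expected_state or not returned_state:
        return False
    # compare_digest avoids revealing how many leading characters matched.
    return hmac.compare_digest(
        expected_state.encode("utf-8"), returned_state.encode("utf-8")
    )


if __name__ == "__main__":
    print(state_matches("xyz789", "xyz789"))
    print(state_matches("xyz789", "tampered"))
```

In main.py, such a check would sit right after wait_for_code() returns, stopping the flow before the token exchange when the states differ.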

Project Structure (OBO flow)

After adding OBO support, your project should look like this:

├── main.py              # AI agent with OBO authentication flow
├── oauth_callback.py    # Captures OAuth redirect from Asgardeo
└── .env                 # Environment configuration

Run and Test the OBO flow

Start your agent:

  python main.py

You will see output similar to the following, and your default browser will open and prompt you to log in:

    Opening browser for authentication...

Info

You need to create a test user in Asgardeo by following the instructions in the Onboard a User guide to try out the login feature.

After a successful login, return to the terminal. Your agent resumes automatically once it receives the authorization code and calls the MCP tool on behalf of the authenticated user.

    Successfully obtained OBO Token.

    Enter your question (e.g., 'Add 45 and 99') or type 'exit' to quit:

Your AI agent has now successfully performed an authenticated, user-authorized, On-Behalf-Of request to your MCP server.
