Agent Identity Quickstart¶
Welcome to the Agent Identity Quickstart!
This guide walks you through registering an AI agent identity in Asgardeo, authenticating the agent with its credentials, and connecting it to a secured MCP server from Python, using the agent framework of your choice.
As a demonstration, you will build a math agent that interprets conversational queries and invokes arithmetic tools exposed by a secured Model Context Protocol (MCP) server.
By the end of this guide, you will have:
- An AI agent that authenticates with its agent credentials and obtains a token to access a secured MCP server (the agent acting on its own)
- An AI agent that is delegated authorization by a user to access a secured MCP server (the agent acting On-Behalf-Of (OBO) a user)
- A clear understanding of both authentication scenarios described in the Agent Authentication Guide
You do not need prior agent development experience. Everything you need is explained as you go.
Register an AI agent¶
To establish an identity for your AI agent, begin by registering it in Asgardeo.
- Sign in to the Asgardeo console and go to Agents.
- Click + New Agent.
- Provide:
  - Name: A descriptive, human-readable name for your AI agent
  - Description (optional): Purpose and functionality of the agent
Example
Name: Math Assistant Agent
Description: An AI agent that invokes protected MCP tools to answer math-related questions.
- Click Create to complete the registration.
After successful registration, your agent will receive a unique Agent ID and an Agent Secret, which is shown only once. Make sure to store them securely, as you’ll need them later in this guide.
Configure an Application in Asgardeo¶
To allow your agent (or a user acting through the agent) to authenticate and connect to a secured MCP server, an MCP client application must be set up in Asgardeo.
- In the Asgardeo console, navigate to Applications > New Application.
- Select MCP Client Application and complete the wizard pop-up by providing a suitable name and an authorized redirect URL.
Example
Name: AgentAuthenticatorApp
Authorized redirect URL: http://localhost:6274/oauth/callback
Info
The authorized redirect URL defines the location Asgardeo sends users to after a successful login, typically the address of the client application that connects to the MCP server.
In this guide, the AI agent acts as the client and runs a lightweight OAuth 2.1 callback server at http://localhost:6274/oauth/callback to capture the authorization code, so this guide uses that URL as the authorized redirect URL.
Make a note of the client ID shown in the Protocol tab of the registered application. You will need it in the Build an AI Agent section of this guide.
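The redirect URL registered in Asgardeo has to match the address the agent's local callback listener binds to. As a minimal sketch (the helper name `callback_settings` is hypothetical and not part of any Asgardeo SDK), the host, port, and path can be derived from the redirect URL with the standard library, so the values are defined in one place:

```python
from urllib.parse import urlparse


def callback_settings(redirect_uri):
    """Split a redirect URI into the host, port, and path a local listener needs."""
    parts = urlparse(redirect_uri)
    if parts.scheme not in ("http", "https") or not parts.hostname:
        raise ValueError(f"Not an absolute HTTP(S) URL: {redirect_uri!r}")
    # Fall back to the scheme's default port when none is given explicitly
    port = parts.port or (443 if parts.scheme == "https" else 80)
    return parts.hostname, port, parts.path or "/"


host, port, path = callback_settings("http://localhost:6274/oauth/callback")
print(host, port, path)  # prints: localhost 6274 /oauth/callback
```

If the port or path in the registered URL and the listener differ, the browser redirect after login will not reach the agent.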
Run the MCP Server¶
Your AI agent will call an MCP tool hosted on a secure MCP server. You can:
- Follow the MCP Auth Server Quickstart to set one up quickly (Recommended), or
- Use your own MCP server secured with Asgardeo
Build an AI Agent¶
Create a directory called agent-auth-quickstart by running the following commands.
mkdir agent-auth-quickstart
cd agent-auth-quickstart
Then set up and activate a Python virtual environment using the commands for your operating system.
macOS/Linux
python3 -m venv .venv
source .venv/bin/activate
Windows
python -m venv .venv
.venv\Scripts\activate
Pick your agent development framework and install the corresponding dependencies.
LangChain
pip install asgardeo asgardeo_ai langchain langchain-google-genai langchain-mcp-adapters python-dotenv
Google ADK
pip install asgardeo asgardeo_ai python-dotenv google-adk==1.20.0 google-genai==1.54.0
CrewAI
pip install asgardeo asgardeo_ai crewai google-genai==1.54.0 python-dotenv
Vercel AI SDK
pip install asgardeo asgardeo_ai python-dotenv vercel-ai-sdk==0.0.1.dev4
Create a main.py file that implements the AI agent. The agent first authenticates itself with Asgardeo to obtain an access token, then sends that token in the Authorization header (for example, Authorization: Bearer <token>) when calling the MCP tool. Use the version that matches your framework.
LangChain

import asyncio
import os

from dotenv import load_dotenv
from asgardeo import AsgardeoConfig
from asgardeo_ai import AgentConfig, AgentAuthManager
from langchain_mcp_adapters.client import MultiServerMCPClient
from langchain.agents import create_agent
from langchain_google_genai import ChatGoogleGenerativeAI

# Load environment variables from .env file
load_dotenv()

ASGARDEO_CONFIG = AsgardeoConfig(
    base_url=os.getenv("ASGARDEO_BASE_URL"),
    client_id=os.getenv("CLIENT_ID"),
    redirect_uri=os.getenv("REDIRECT_URI")
)

AGENT_CONFIG = AgentConfig(
    agent_id=os.getenv("AGENT_ID"),
    agent_secret=os.getenv("AGENT_SECRET")
)


async def main():
    # Scenario 1: AI agent acting on its own using its own credentials to authenticate
    async with AgentAuthManager(ASGARDEO_CONFIG, AGENT_CONFIG) as auth_manager:
        # Get agent token
        agent_token = await auth_manager.get_agent_token(["openid"])

    # Connect to MCP Server with Authorization Header
    client = MultiServerMCPClient(
        {
            "mcp_server": {
                "transport": "streamable_http",
                "url": os.getenv("MCP_SERVER_URL"),
                "headers": {
                    "Authorization": f"Bearer {agent_token.access_token}"
                }
            }
        }
    )

    # LLM (Gemini) + LangChain Agent
    llm = ChatGoogleGenerativeAI(
        model="gemini-2.0-flash",
        temperature=0.9
    )
    tools = await client.get_tools()
    agent = create_agent(llm, tools)

    user_input = input("Enter your question: ")

    # Invoke the agent
    response = await agent.ainvoke(
        {"messages": [{"role": "user", "content": user_input}]}
    )
    print("Agent Response:", response["messages"][-1].content)


# Run app
if __name__ == "__main__":
    asyncio.run(main())
Google ADK

import asyncio
import os

from dotenv import load_dotenv
from asgardeo import AsgardeoConfig
from asgardeo_ai import AgentConfig, AgentAuthManager
from google.adk.agents.llm_agent import LlmAgent
from google.adk.runners import InMemoryRunner
from google.adk.tools.mcp_tool import McpToolset
from google.adk.tools.mcp_tool.mcp_session_manager import StreamableHTTPConnectionParams
from google.genai import types

# Load environment variables from .env file
load_dotenv()

ASGARDEO_CONFIG = AsgardeoConfig(
    base_url=os.getenv("ASGARDEO_BASE_URL"),
    client_id=os.getenv("CLIENT_ID"),
    redirect_uri=os.getenv("REDIRECT_URI")
)

AGENT_CONFIG = AgentConfig(
    agent_id=os.getenv("AGENT_ID"),
    agent_secret=os.getenv("AGENT_SECRET")
)


async def build_toolset():
    async with AgentAuthManager(ASGARDEO_CONFIG, AGENT_CONFIG) as auth_manager:
        # Get agent token
        agent_token = await auth_manager.get_agent_token(["openid"])

    # Connect to MCP Server with Auth Header
    return McpToolset(
        connection_params=StreamableHTTPConnectionParams(
            url=os.getenv("MCP_SERVER_URL"),
            headers={"Authorization": f"Bearer {agent_token.access_token}"}
        )
    )


async def main():
    mcp_toolset = await build_toolset()

    # Define LLM Agent (Gemini)
    agent = LlmAgent(
        model="gemini-2.0-flash",
        name="add_agent",
        description="Adds two numbers using an MCP server.",
        instruction="When the user asks to add numbers, call the MCP tool `add(a, b)`.",
        tools=[mcp_toolset],
    )

    # Setup runner + session
    runner = InMemoryRunner(agent, app_name="add_numbers_app")
    session = await runner.session_service.create_session(
        app_name="add_numbers_app",
        user_id="user"
    )

    question = input("Enter your question: ")

    try:
        async for event in runner.run_async(
            user_id="user",
            session_id=session.id,
            new_message=types.Content(
                role="user",
                parts=[types.Part(text=question)]
            ),
        ):
            if event.content and event.content.parts:
                text = event.content.parts[0].text
                if text:
                    print(text)
    finally:
        # Release the MCP connection and the runner even if the run fails
        await mcp_toolset.close()
        await runner.close()


if __name__ == "__main__":
    asyncio.run(main())
CrewAI

import asyncio
import os

from dotenv import load_dotenv
from asgardeo import AsgardeoConfig
from asgardeo_ai import AgentConfig, AgentAuthManager
from crewai import Agent, Task, Crew
from crewai.mcp import MCPServerHTTP

# Load environment variables from .env file
load_dotenv()

ASGARDEO_CONFIG = AsgardeoConfig(
    base_url=os.getenv("ASGARDEO_BASE_URL"),
    client_id=os.getenv("CLIENT_ID"),
    redirect_uri=os.getenv("REDIRECT_URI")
)

AGENT_CONFIG = AgentConfig(
    agent_id=os.getenv("AGENT_ID"),
    agent_secret=os.getenv("AGENT_SECRET")
)


async def get_agent_token():
    # Asynchronously fetches the agent token from Asgardeo.
    async with AgentAuthManager(ASGARDEO_CONFIG, AGENT_CONFIG) as auth_manager:
        return await auth_manager.get_agent_token(["openid"])


def main():
    agent_token = asyncio.run(get_agent_token())

    while True:
        question = input("\nEnter your question (e.g., 'Add 45 and 99') or type 'exit' to quit: ")
        if question.lower() == "exit":
            print("Exiting the program. Goodbye!")
            break

        # Connect to MCP Server with Authorization Header
        mcp_server = MCPServerHTTP(
            url=os.getenv("MCP_SERVER_URL"),
            headers={"Authorization": f"Bearer {agent_token.access_token}"},
            streamable=True
        )

        agent = Agent(
            role="Calculation Specialist",
            goal="Add two numbers accurately using an MCP server.",
            backstory="You are an intelligent agent that strictly uses the provided MCP tool 'add(a, b)' to compute the addition of numbers when requested by a user.",
            mcps=[mcp_server],
            llm=os.getenv("MODEL_NAME"),  # Use 'gemini/gemini-1.5-flash' or similar
            verbose=False
        )

        task = Task(
            description=f"Address the user's request: '{question}'",
            expected_output="The exact calculated sum of the numbers based on the MCP tool execution.",
            agent=agent
        )

        crew = Crew(
            agents=[agent],
            tasks=[task]
        )

        result = crew.kickoff()
        print("\nAgent Response:", result.raw)


if __name__ == "__main__":
    main()
Vercel AI SDK

import asyncio
import os

from dotenv import load_dotenv
from asgardeo import AsgardeoConfig
from asgardeo_ai import AgentConfig, AgentAuthManager
import vercel_ai_sdk as ai

# Load environment variables from .env file
load_dotenv()

ASGARDEO_CONFIG = AsgardeoConfig(
    base_url=os.getenv("ASGARDEO_BASE_URL"),
    client_id=os.getenv("CLIENT_ID"),
    redirect_uri=os.getenv("REDIRECT_URI")
)

AGENT_CONFIG = AgentConfig(
    agent_id=os.getenv("AGENT_ID"),
    agent_secret=os.getenv("AGENT_SECRET")
)


async def my_agent(llm, messages, auth_token):
    # Connect to MCP Server with Authorization Header
    tools = await ai.mcp.get_http_tools(
        os.getenv("MCP_SERVER_URL"),
        headers={
            "Authorization": f"Bearer {auth_token}"
        }
    )
    return await ai.stream_loop(llm, messages, tools=tools)


async def main():
    async with AgentAuthManager(ASGARDEO_CONFIG, AGENT_CONFIG) as auth_manager:
        agent_token = await auth_manager.get_agent_token(["openid"])

    # Use Gemini through its OpenAI-compatible endpoint
    google_key = os.getenv("GOOGLE_API_KEY", "")
    os.environ["OPENAI_API_KEY"] = google_key
    os.environ["OPENAI_BASE_URL"] = "https://generativelanguage.googleapis.com/v1beta/openai/"

    llm = ai.openai.OpenAIModel(
        model=os.getenv("MODEL_NAME")
    )

    while True:
        user_input = input("\nEnter your question (e.g., 'Add 45 and 99') or type 'exit' to quit: ")

        # Exit the loop if the user types "exit"
        if user_input.lower() == "exit":
            print("Exiting the program. Goodbye!")
            break

        messages = ai.make_messages(user=user_input)
        result = ai.run(my_agent, llm, messages, agent_token.access_token)

        print("\nAgent Response: ", end="")
        async for msg in result:
            if getattr(msg, "text_delta", None):
                print(msg.text_delta, end="", flush=True)
        print()


if __name__ == "__main__":
    asyncio.run(main())
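The looping variants above fetch the agent token once and reuse it for every question, so a long-running session could eventually send an expired token. The following is a minimal sketch of one way to handle that, not part of the Asgardeo SDK: `TokenCache` and its `fetch` callable are hypothetical, and it assumes the token lifetime in seconds is known to the caller (whether the SDK's token object exposes an expiry field is not confirmed here).

```python
import time


class TokenCache:
    """Reuse a token until shortly before it expires, then fetch a new one."""

    def __init__(self, fetch, skew_seconds=30, clock=time.monotonic):
        # `fetch` is a hypothetical callable returning (access_token, lifetime_seconds)
        self._fetch = fetch
        self._skew = skew_seconds
        self._clock = clock
        self._token = None
        self._expires_at = 0.0

    def get(self):
        now = self._clock()
        # Refresh slightly early so a token does not expire mid-request
        if self._token is None or now >= self._expires_at - self._skew:
            self._token, lifetime = self._fetch()
            self._expires_at = now + lifetime
        return self._token
```

In an async agent, `fetch` would need to wrap the awaitable token call, or the class could be adapted with an `async def get`.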
Add environment configuration by creating a .env file at the project root to hold the Asgardeo configuration:
# Asgardeo OAuth2 Configuration
ASGARDEO_BASE_URL=https://api.asgardeo.io/t/<your-tenant>
CLIENT_ID=<your-client-id>
REDIRECT_URI=http://localhost:6274/oauth/callback
# Asgardeo Agent Credentials
AGENT_ID=<agent_id>
AGENT_SECRET=<agent_secret>
# Google Gemini API Key
GOOGLE_API_KEY=<google_api_key>
# MCP Server URL
MCP_SERVER_URL=<mcp_server_url>
# LLM model used by the agent (any supported model can be used).
MODEL_NAME="gemini-2.5-flash"
Important
- Replace <your-tenant>, <your-client-id>, and the redirect URL with the values obtained from the Asgardeo console. The tenant name is visible in the console URL path (e.g., https://console.asgardeo.io/t/<your-tenant>), and the client ID can be found in the application's Protocol tab.
- Add the Agent ID and Agent Secret from the Agent Registration step.
- You’ll need a Google API key to use Gemini as your model. You can generate one from Google AI Studio.
- Replace <mcp_server_url> with your MCP server’s URL. If you followed the MCP Auth Server quickstart, you can use: http://127.0.0.1:8000/mcp
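A missing or unreplaced value in .env tends to surface later as a confusing authentication error. As a minimal sketch (the helper `find_missing_settings` is hypothetical; the variable names are the ones from the .env file above), a start-up check could report settings that are unset, blank, or still contain a `<placeholder>`:

```python
import os
import re

# Variable names taken from the .env file in this guide
REQUIRED_VARS = [
    "ASGARDEO_BASE_URL",
    "CLIENT_ID",
    "REDIRECT_URI",
    "AGENT_ID",
    "AGENT_SECRET",
    "GOOGLE_API_KEY",
    "MCP_SERVER_URL",
]

# Matches template placeholders such as <your-tenant>
PLACEHOLDER = re.compile(r"<[^<>]+>")


def find_missing_settings(env=None):
    """Return the required names that are unset, blank, or still hold a placeholder."""
    env = os.environ if env is None else env
    missing = []
    for name in REQUIRED_VARS:
        value = (env.get(name) or "").strip()
        if not value or PLACEHOLDER.search(value):
            missing.append(name)
    return missing
```

Calling `find_missing_settings()` after `load_dotenv()` and exiting with a clear message when the list is non-empty keeps configuration mistakes separate from genuine authentication failures.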
Project Structure¶
Your project folder should now look like this:
├── main.py # Your AI Agent
└── .env # Your Asgardeo configs
Run and Test with Authentication¶
Start your AI Agent by running the following command.
python main.py
If authentication succeeds, your agent will prompt you for a question and securely invoke the MCP tool.
Enter your question: Can you add twenty two and twelve?
Agent Response: The sum of twenty two and twelve is 34.
If authentication fails, the MCP server rejects the request and the agent raises an error similar to:
httpx.HTTPStatusError: Client error '401 Unauthorized'
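When a 401 appears, it can help to look at what the token actually contains. The sketch below assumes the access token is a JWT, which depends on how the application is configured in Asgardeo (an opaque token cannot be decoded this way). The helper names are hypothetical, and the code deliberately does not verify the signature, so it is suitable for debugging only.

```python
import base64
import json
import time


def decode_jwt_claims(token):
    """Decode the payload of a JWT WITHOUT verifying its signature (debugging only)."""
    parts = token.split(".")
    if len(parts) != 3:
        raise ValueError("Token does not look like a JWT (expected three dot-separated parts)")
    payload = parts[1]
    payload += "=" * (-len(payload) % 4)  # restore the stripped base64url padding
    return json.loads(base64.urlsafe_b64decode(payload))


def is_expired(claims, now=None):
    """True when the `exp` claim is in the past; False when `exp` is absent."""
    now = time.time() if now is None else now
    return "exp" in claims and claims["exp"] <= now
```

Checking `exp` and `aud` in the decoded claims against the current time and the audience the MCP server expects usually narrows down why a token was rejected.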
To test the setup without authentication, simply remove the Authorization header from your client configuration, as shown below:
LangChain
...
client = MultiServerMCPClient(
    {
        "mcp_server": {
            "transport": "streamable_http",
            "url": os.getenv("MCP_SERVER_URL")
        }
    }
)
...
Google ADK
...
return McpToolset(
    connection_params=StreamableHTTPConnectionParams(
        url=os.getenv("MCP_SERVER_URL")
    )
)
...
CrewAI
...
mcp_server = MCPServerHTTP(
    url=os.getenv("MCP_SERVER_URL"),
    streamable=True
)
...
Vercel AI SDK
...
tools = await ai.mcp.get_http_tools(
    os.getenv("MCP_SERVER_URL"),
)
...
Test the On-Behalf-Of (OBO) Flow¶
In the previous step, the AI agent authenticated itself using its own credentials. Now, let’s look at the scenario where the agent authenticates on behalf of a user.
This flow uses:
- Authorization code issued after the user logs in
- PKCE (Proof Key for Code Exchange) to ensure only your agent can securely exchange the authorization code for the OBO token
- A final token exchange that produces an OBO token, representing the user
Your AI agent will then call the MCP server as the authenticated user.
During the OBO flow, Asgardeo redirects back to your client application with an authorization code after the user logs in.
To handle this, create a file named oauth_callback.py with the following implementation at the project root. This lightweight HTTP server listens for the redirect and captures authorization_code and state.
oauth_callback.py

import asyncio
import http.server
import threading
from urllib.parse import parse_qs, urlparse


class OAuthCallbackServer:
    """Minimal local HTTP server that captures the OAuth redirect from Asgardeo."""

    def __init__(self, port: int = 6274, timeout: int = 120):
        self.port = port
        self.timeout = timeout
        self.auth_code = None
        self.state = None
        self._error = None
        self._httpd = None

    class _Handler(http.server.BaseHTTPRequestHandler):
        parent = None  # Set to the owning OAuthCallbackServer in start()

        def _respond(self, status: int, message: bytes):
            self.send_response(status)
            self.send_header("Content-Type", "text/plain; charset=utf-8")
            self.end_headers()
            self.wfile.write(message)

        def do_GET(self):
            url = urlparse(self.path)
            params = parse_qs(url.query)

            # Ignore requests for any other path (for example /favicon.ico)
            if url.path != "/oauth/callback":
                self._respond(404, b"Not found.")
                return

            # OAuth error case
            if "error" in params:
                self.parent._error = params["error"][0]
                self._respond(200, b"Authorization cancelled or failed. You can close this window.")
                return

            # Success case: store state first, because wait_for_code() polls auth_code
            if "code" in params:
                self.parent.state = params.get("state", [None])[0]
                self.parent.auth_code = params["code"][0]
                self._respond(200, b"Authentication successful. You can close this window.")
                return

            # Callback without a code or an error
            self.parent._error = "Invalid callback: missing authorization code"
            self._respond(400, b"Invalid redirect. You can close this window.")

    def start(self):
        handler = self._Handler
        handler.parent = self
        self._httpd = http.server.HTTPServer(("localhost", self.port), handler)
        thread = threading.Thread(target=self._httpd.serve_forever, daemon=True)
        thread.start()

    def stop(self):
        if self._httpd:
            self._httpd.shutdown()
            self._httpd.server_close()  # Release the port

    async def wait_for_code(self):
        """Return (auth_code, state, error).

        auth_code is None if the user cancelled, an error occurred, or the wait timed out.
        """
        elapsed = 0
        while self.auth_code is None and self._error is None and elapsed < self.timeout:
            await asyncio.sleep(0.1)
            elapsed += 0.1
        return (self.auth_code, self.state, self._error)
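The callback server hands back the `state` value it received along with the authorization code. Comparing it with the `state` that was generated for the authorization URL guards against a redirect that did not originate from your own login request. A minimal sketch follows, assuming both values are plain strings; the helper name `state_matches` is hypothetical and not part of the SDK.

```python
import hmac


def state_matches(expected_state, returned_state):
    """Compare the state sent in the request with the state received on the redirect."""
    if not expected_state or not returned_state:
        return False
    # compare_digest avoids leaking how many leading characters matched
    return hmac.compare_digest(expected_state.encode(), returned_state.encode())
```

A caller would stop the flow before exchanging the code when `state_matches(state, returned_state)` is false.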
Then update main.py to perform the OBO flow. The updated agent will:
- Authenticate the agent
- Generate an authorization URL for the user
- Capture the authorization code
- Exchange the code + agent token for an OBO token
- Call the MCP server using the OBO token
Here is the updated implementation:
LangChain

import asyncio
import os
import webbrowser

from dotenv import load_dotenv
from asgardeo import AsgardeoConfig
from asgardeo_ai import AgentConfig, AgentAuthManager
from langchain_mcp_adapters.client import MultiServerMCPClient
from langchain.agents import create_agent
from langchain_google_genai import ChatGoogleGenerativeAI

from oauth_callback import OAuthCallbackServer

# Load environment variables from .env file
load_dotenv()

ASGARDEO_CONFIG = AsgardeoConfig(
    base_url=os.getenv("ASGARDEO_BASE_URL"),
    client_id=os.getenv("CLIENT_ID"),
    redirect_uri=os.getenv("REDIRECT_URI")
)

AGENT_CONFIG = AgentConfig(
    agent_id=os.getenv("AGENT_ID"),
    agent_secret=os.getenv("AGENT_SECRET")
)


async def main():
    # Perform OBO flow (authenticating on behalf of the user)
    async with AgentAuthManager(ASGARDEO_CONFIG, AGENT_CONFIG) as auth_manager:
        # Get agent token
        agent_token = await auth_manager.get_agent_token(["openid"])

        # Generate user authorization URL
        auth_url, state, code_verifier = auth_manager.get_authorization_url_with_pkce(["openid"])

        callback = OAuthCallbackServer(port=6274)
        callback.start()

        print("\nOpening browser for authentication...")
        webbrowser.open(auth_url)

        # Wait for redirect
        auth_code, returned_state, error = await callback.wait_for_code()
        callback.stop()

        if auth_code is None:
            print(f"Authorization failed or cancelled. Error: {error}")
            return

        # Do not print the code itself: it is a short-lived credential
        print("Received authorization code.")

        # Exchange auth code for user token (OBO flow)
        obo_token = await auth_manager.get_obo_token(auth_code, agent_token=agent_token, code_verifier=code_verifier)

    # Connect to MCP Server with Authorization Header
    client = MultiServerMCPClient(
        {
            "mcp_server": {
                "transport": "streamable_http",
                "url": os.getenv("MCP_SERVER_URL"),
                "headers": {
                    "Authorization": f"Bearer {obo_token.access_token}",
                }
            }
        }
    )

    # LLM (Gemini) + LangChain Agent
    llm = ChatGoogleGenerativeAI(
        model="gemini-2.0-flash",
        temperature=0.9
    )
    tools = await client.get_tools()
    agent = create_agent(llm, tools)

    user_input = input("Enter your question: ")

    # Invoke the agent
    response = await agent.ainvoke(
        {"messages": [{"role": "user", "content": user_input}]}
    )
    print("Agent Response:", response["messages"][-1].content)


# Run app
if __name__ == "__main__":
    asyncio.run(main())
Google ADK

import asyncio
import os
import webbrowser

from dotenv import load_dotenv
from asgardeo import AsgardeoConfig
from asgardeo_ai import AgentConfig, AgentAuthManager
from google.adk.agents.llm_agent import LlmAgent
from google.adk.runners import InMemoryRunner
from google.adk.tools.mcp_tool import McpToolset
from google.adk.tools.mcp_tool.mcp_session_manager import StreamableHTTPConnectionParams
from google.genai import types

from oauth_callback import OAuthCallbackServer

# Load environment variables from .env file
load_dotenv()

ASGARDEO_CONFIG = AsgardeoConfig(
    base_url=os.getenv("ASGARDEO_BASE_URL"),
    client_id=os.getenv("CLIENT_ID"),
    redirect_uri=os.getenv("REDIRECT_URI")
)

AGENT_CONFIG = AgentConfig(
    agent_id=os.getenv("AGENT_ID"),
    agent_secret=os.getenv("AGENT_SECRET")
)


# Perform OBO flow (authenticating on behalf of the user)
async def build_toolset():
    async with AgentAuthManager(ASGARDEO_CONFIG, AGENT_CONFIG) as auth_manager:
        # Get agent token
        agent_token = await auth_manager.get_agent_token(["openid"])

        # Generate user authorization URL
        auth_url, state, code_verifier = auth_manager.get_authorization_url_with_pkce(["openid"])

        callback = OAuthCallbackServer(port=6274)
        callback.start()

        print("\nOpening browser for authentication...")
        webbrowser.open(auth_url)

        # Wait for redirect
        auth_code, returned_state, error = await callback.wait_for_code()
        callback.stop()

        if auth_code is None:
            print(f"Authorization failed or cancelled. Error: {error}")
            return None

        # Do not print the code itself: it is a short-lived credential
        print("Received authorization code.")

        # Exchange auth code for user token (OBO flow)
        obo_token = await auth_manager.get_obo_token(auth_code, agent_token=agent_token, code_verifier=code_verifier)

    # Connect to MCP Server with Auth Header
    return McpToolset(
        connection_params=StreamableHTTPConnectionParams(
            url=os.getenv("MCP_SERVER_URL"),
            headers={"Authorization": f"Bearer {obo_token.access_token}"}
        )
    )


async def main():
    mcp_toolset = await build_toolset()
    if mcp_toolset is None:
        return

    # Define LLM Agent (Gemini)
    agent = LlmAgent(
        model="gemini-2.0-flash",
        name="add_agent",
        description="Adds two numbers using an MCP server.",
        instruction="When the user asks to add numbers, call the MCP tool `add(a, b)`.",
        tools=[mcp_toolset],
    )

    # Setup runner + session
    runner = InMemoryRunner(agent, app_name="add_numbers_app")
    session = await runner.session_service.create_session(
        app_name="add_numbers_app",
        user_id="user"
    )

    question = input("\nEnter your question (e.g., 'Add 45 and 99'): ")

    try:
        async for event in runner.run_async(
            user_id="user",
            session_id=session.id,
            new_message=types.Content(
                role="user",
                parts=[types.Part(text=question)]
            ),
        ):
            if event.content and event.content.parts:
                text = event.content.parts[0].text
                if text:
                    print(text)
    finally:
        # Release the MCP connection and the runner even if the run fails
        await mcp_toolset.close()
        await runner.close()


if __name__ == "__main__":
    asyncio.run(main())
CrewAI

import asyncio
import os
import warnings
import webbrowser

from dotenv import load_dotenv
from crewai import Agent, Task, Crew
from crewai.mcp import MCPServerHTTP

# Asgardeo / Identity imports
from asgardeo import AsgardeoConfig
from asgardeo_ai import AgentConfig, AgentAuthManager

from oauth_callback import OAuthCallbackServer

# Suppress RuntimeWarning related to coroutine not awaited
warnings.filterwarnings("ignore", category=RuntimeWarning, message="coroutine '.*' was never awaited")

# Load environment variables from .env file
load_dotenv()


async def get_obo_token():
    # Handles the OAuth/OBO flow to get the user token.
    ASGARDEO_CONFIG = AsgardeoConfig(
        base_url=os.getenv("ASGARDEO_BASE_URL"),
        client_id=os.getenv("CLIENT_ID"),
        redirect_uri=os.getenv("REDIRECT_URI")
    )

    AGENT_CONFIG = AgentConfig(
        agent_id=os.getenv("AGENT_ID"),
        agent_secret=os.getenv("AGENT_SECRET")
    )

    async with AgentAuthManager(ASGARDEO_CONFIG, AGENT_CONFIG) as auth_manager:
        agent_token = await auth_manager.get_agent_token(["openid", "email"])
        auth_url, state, code_verifier = auth_manager.get_authorization_url_with_pkce(["openid", "email"])

        callback = OAuthCallbackServer(port=6274)
        callback.start()

        print("\nOpening browser for authentication...")
        webbrowser.open(auth_url)

        auth_code, _, error = await callback.wait_for_code()
        callback.stop()

        if not auth_code:
            raise RuntimeError(f"Auth failed: {error}")

        obo_token = await auth_manager.get_obo_token(
            auth_code,
            agent_token=agent_token,
            code_verifier=code_verifier
        )
        return obo_token.access_token


# --- Main CrewAI Execution ---
async def main():
    # 1. Get the Token via OBO Flow
    try:
        access_token = await get_obo_token()
        print("Successfully obtained OBO Token.")
    except Exception as e:
        print(f"Failed to authenticate: {e}")
        return

    while True:
        # 2. Get the user question
        question = input("\nEnter your question (e.g., 'Add 45 and 99') or type 'exit' to quit: ")

        # Exit the loop if the user types "exit"
        if question.lower() == "exit":
            print("Exiting the program. Goodbye!")
            break

        # 3. Configure the MCP server for CrewAI, passing the user's OBO token
        mcp_server = MCPServerHTTP(
            url=os.getenv("MCP_SERVER_URL"),
            headers={"Authorization": f"Bearer {access_token}"},
            streamable=True
        )

        # 4. Define the CrewAI Agent
        agent = Agent(
            role="Calculation Specialist",
            goal="Add two numbers accurately using an MCP server.",
            backstory="You are an intelligent agent that strictly uses the provided MCP tool 'add(a, b)' to compute the addition of numbers when requested by a user.",
            mcps=[mcp_server],
            llm=os.getenv("MODEL_NAME"),  # Use 'gemini/gemini-1.5-flash' or similar
            verbose=False
        )

        # 5. Define the Task
        task = Task(
            description=f"Address the user's request: '{question}'",
            expected_output="The exact calculated sum of the numbers based on the MCP tool execution.",
            agent=agent
        )

        # 6. Setup and run the Crew
        crew = Crew(
            agents=[agent],
            tasks=[task]
        )

        result = crew.kickoff()
        print("\nAgent Response:", result.raw)


if __name__ == "__main__":
    asyncio.run(main())
Vercel AI SDK

import asyncio
import os
import webbrowser

import vercel_ai_sdk as ai
from dotenv import load_dotenv
from asgardeo import AsgardeoConfig
from asgardeo_ai import AgentConfig, AgentAuthManager

from oauth_callback import OAuthCallbackServer

# Load environment variables from .env file
load_dotenv()

ASGARDEO_CONFIG = AsgardeoConfig(
    base_url=os.getenv("ASGARDEO_BASE_URL"),
    client_id=os.getenv("CLIENT_ID"),
    redirect_uri=os.getenv("REDIRECT_URI")
)

AGENT_CONFIG = AgentConfig(
    agent_id=os.getenv("AGENT_ID"),
    agent_secret=os.getenv("AGENT_SECRET")
)


# 1. Define the agent logic (no decorators needed)
async def my_agent(llm, messages, auth_token):
    # Connect to MCP Server using the user's OBO token
    tools = await ai.mcp.get_http_tools(
        os.getenv("MCP_SERVER_URL"),
        headers={
            "Authorization": f"Bearer {auth_token}"
        }
    )
    # Execute the agent tool loop
    return await ai.stream_loop(llm, messages, tools=tools)


async def main():
    async with AgentAuthManager(ASGARDEO_CONFIG, AGENT_CONFIG) as auth_manager:
        agent_token = await auth_manager.get_agent_token(["openid", "email"])
        auth_url, state, code_verifier = auth_manager.get_authorization_url_with_pkce(["openid", "email"])

        callback = OAuthCallbackServer(port=6274)
        callback.start()

        print("\nOpening browser for authentication...")
        webbrowser.open(auth_url)

        auth_code, returned_state, error = await callback.wait_for_code()
        callback.stop()

        if auth_code is None:
            print(f"Authorization failed or cancelled. Error: {error}")
            return

        obo_token = await auth_manager.get_obo_token(auth_code, agent_token=agent_token, code_verifier=code_verifier)

    # Use Gemini through its OpenAI-compatible endpoint
    google_key = os.getenv("GOOGLE_API_KEY", "")
    os.environ["OPENAI_API_KEY"] = google_key
    os.environ["OPENAI_BASE_URL"] = "https://generativelanguage.googleapis.com/v1beta/openai/"

    llm = ai.openai.OpenAIModel(
        model=os.getenv("MODEL_NAME")
    )

    while True:
        user_input = input("\nEnter your question (e.g., 'Add 45 and 99') or type 'exit' to quit: ")

        # Exit the loop if the user types "exit"
        if user_input.lower() == "exit":
            print("Exiting the program. Goodbye!")
            break

        messages = ai.make_messages(user=user_input)
        result = ai.run(my_agent, llm, messages, obo_token.access_token)

        print("\nAgent Response: ", end="")
        # Stream the output token-by-token
        async for msg in result:
            if getattr(msg, "text_delta", None):
                print(msg.text_delta, end="", flush=True)
        print()


if __name__ == "__main__":
    asyncio.run(main())
Project Structure (OBO flow)¶
After adding OBO support, your project should look like this:
├── main.py # AI agent with OBO authentication flow
├── oauth_callback.py # Captures OAuth redirect from Asgardeo
└── .env # Environment configuration
Run and Test the OBO flow¶
Start your agent:
python main.py
You will see an output similar to this and your default browser will open, prompting you to log in:
Opening browser for authentication...
Info
You need to create a test user in Asgardeo by following the instructions in the Onboard a User guide to try out the login feature.
After successful login, return to the terminal. Your agent will automatically resume once it receives the authorization code and call the MCP tool on behalf of the authenticated user.
Successfully obtained OBO Token.
Enter your question (e.g., 'Add 45 and 99') or type 'exit' to quit:
Your AI agent has now successfully performed an authenticated, user-authorized, On-Behalf-Of request to your MCP server.