Merged

Commits
90 commits
a76ff24  changes (20001020ycx, Oct 7, 2025)
697f366  changes for session states (20001020ycx, Oct 7, 2025)
70dff32  linting (20001020ycx, Oct 8, 2025)
0d8b06a  finalize get_instructions (20001020ycx, Oct 8, 2025)
c5ac54e  manual pass (20001020ycx, Oct 9, 2025)
d326f76  fine functionality wise (20001020ycx, Oct 9, 2025)
37f2c41  queryresult ready for pr (20001020ycx, Oct 9, 2025)
47d0301  sessionState ready for pr (20001020ycx, Oct 9, 2025)
2201266  another pass (20001020ycx, Oct 9, 2025)
aa37f1a  delete ai doc (20001020ycx, Oct 9, 2025)
a3076b6  pass for minor files (20001020ycx, Oct 9, 2025)
d3d7830  basically okay with functionality (20001020ycx, Oct 9, 2025)
ec0f2bc  fix uv formating and testing (20001020ycx, Oct 9, 2025)
21e0386  complete test review (20001020ycx, Oct 9, 2025)
edf68e4  adding multi-threading test (20001020ycx, Oct 9, 2025)
3d2be57  passing all tests (20001020ycx, Oct 10, 2025)
e1c1b3b  test docstring (20001020ycx, Oct 10, 2025)
7844653  Merge branch 'main' of github.com:y-scope/clp into 10-4-mcp-server-im… (20001020ycx, Oct 10, 2025)
97e7f23  fix docstring (20001020ycx, Oct 10, 2025)
d3ccf3e  pass on docstring (20001020ycx, Oct 10, 2025)
15315ec  addressing code rabbit (20001020ycx, Oct 10, 2025)
8ce8a38  addressing 2nd round of rabbit review (20001020ycx, Oct 10, 2025)
6faeba9  docstring fixes (20001020ycx, Oct 10, 2025)
466e242  add paging metadata to documentation (20001020ycx, Oct 10, 2025)
cccd8ec  nits (20001020ycx, Oct 10, 2025)
b7c3b87  update concurrency model docstring (20001020ycx, Oct 10, 2025)
9c6b881  supress linter (20001020ycx, Oct 11, 2025)
184c94e  Update components/clp-mcp-server/clp_mcp_server/server/session_manage… (20001020ycx, Oct 12, 2025)
07aa3f8  Update components/clp-mcp-server/clp_mcp_server/server/session_manage… (20001020ycx, Oct 12, 2025)
1c2d810  Update components/clp-mcp-server/clp_mcp_server/server/session_manage… (20001020ycx, Oct 12, 2025)
02c706f  Update components/clp-mcp-server/clp_mcp_server/server/session_manage… (20001020ycx, Oct 12, 2025)
da0b2e3  var renaming (20001020ycx, Oct 12, 2025)
142573a  Merge branch '10-4-mcp-server-implementation' of github.com:20001020y… (20001020ycx, Oct 12, 2025)
6f426ca  Update components/clp-mcp-server/clp_mcp_server/server/session_manage… (20001020ycx, Oct 12, 2025)
c8816a5  Merge branch '10-4-mcp-server-implementation' of github.com:20001020y… (20001020ycx, Oct 12, 2025)
702837c  address easy to fix comments (20001020ycx, Oct 12, 2025)
e267ec4  resolving conflicts (20001020ycx, Oct 12, 2025)
ed7765b  fix strings (20001020ycx, Oct 12, 2025)
9e73062  missed leading space (20001020ycx, Oct 12, 2025)
01c6a05  wrong docstring (20001020ycx, Oct 12, 2025)
67d183e  fix errors by latest change (20001020ycx, Oct 12, 2025)
ce744d7  trivial comment addressing (20001020ycx, Oct 14, 2025)
4fc5cc3  adding fixure test for unit test (20001020ycx, Oct 14, 2025)
23797fd  fix unit test var (20001020ycx, Oct 14, 2025)
0644fb7  fix rabbits change (20001020ycx, Oct 14, 2025)
662b47d  fix rabbit (20001020ycx, Oct 14, 2025)
1a53ad3  adding lock to enforce thread-safety (20001020ycx, Oct 14, 2025)
862aa42  done with impl (20001020ycx, Oct 14, 2025)
b79f681  sync up with chaoyue about async single thread concurrency (20001020ycx, Oct 15, 2025)
134591c  Update components/clp-mcp-server/clp_mcp_server/server/session_manage… (20001020ycx, Oct 16, 2025)
50e5db3  Update components/clp-mcp-server/clp_mcp_server/server/session_manage… (20001020ycx, Oct 16, 2025)
1cad6b0  Update components/clp-mcp-server/clp_mcp_server/server/session_manage… (20001020ycx, Oct 16, 2025)
2683bcf  Update components/clp-mcp-server/clp_mcp_server/server/server.py (20001020ycx, Oct 16, 2025)
22de748  code review format update (20001020ycx, Oct 16, 2025)
c934ac7  implement async garbage collector (20001020ycx, Oct 16, 2025)
4a2bd41  Update components/clp-mcp-server/clp_mcp_server/server/server.py (20001020ycx, Oct 16, 2025)
ad11e90  Update components/clp-mcp-server/clp_mcp_server/server/session_manage… (20001020ycx, Oct 16, 2025)
a1979d6  Update components/clp-mcp-server/clp_mcp_server/server/session_manage… (20001020ycx, Oct 16, 2025)
2193824  Update components/clp-mcp-server/clp_mcp_server/server/session_manage… (20001020ycx, Oct 16, 2025)
d6a7944  fix docstring (20001020ycx, Oct 16, 2025)
00f9503  Merge branch '10-4-mcp-server-implementation' of github.com:20001020y… (20001020ycx, Oct 16, 2025)
9678909  Update components/clp-mcp-server/clp_mcp_server/server/session_manage… (20001020ycx, Oct 16, 2025)
5af4233  Update components/clp-mcp-server/clp_mcp_server/server/session_manage… (20001020ycx, Oct 16, 2025)
392af88  Update components/clp-mcp-server/clp_mcp_server/server/session_manage… (20001020ycx, Oct 16, 2025)
07f17a5  done with docstring comment (20001020ycx, Oct 16, 2025)
d4f6015  Merge branch '10-4-mcp-server-implementation' of github.com:20001020y… (20001020ycx, Oct 16, 2025)
540e78d  done with async garbage collector (20001020ycx, Oct 16, 2025)
c480b33  move checking to session (20001020ycx, Oct 16, 2025)
c411ae0  fix docstring (20001020ycx, Oct 16, 2025)
2601008  Minor fix. (LinZhihao-723, Oct 16, 2025)
e38f988  Merge branch '10-4-mcp-server-implementation' of https://github.com/2… (LinZhihao-723, Oct 16, 2025)
08c80b3  Add instruction on how to run unit tests. (LinZhihao-723, Oct 16, 2025)
1de6a46  fix unit tests (20001020ycx, Oct 16, 2025)
86184c9  Merge branch '10-4-mcp-server-implementation' of github.com:20001020y… (20001020ycx, Oct 16, 2025)
a2a13b8  Minor fix. (LinZhihao-723, Oct 16, 2025)
8a5c204  Minor renaming. (LinZhihao-723, Oct 16, 2025)
9caab35  hoist (20001020ycx, Oct 16, 2025)
f7a6579  Merge branch '10-4-mcp-server-implementation' of github.com:20001020y… (20001020ycx, Oct 16, 2025)
a54b019  update docstring of session manager class (20001020ycx, Oct 16, 2025)
046b6f6  fix ttl seconds (20001020ycx, Oct 16, 2025)
ebad693  Update unit tests. (LinZhihao-723, Oct 16, 2025)
fa8dcc5  Merge branch '10-4-mcp-server-implementation' of https://github.com/2… (LinZhihao-723, Oct 16, 2025)
5a12084  System Prompt (20001020ycx, Oct 16, 2025)
acf99a9  Merge branch '10-4-mcp-server-implementation' of github.com:20001020y… (20001020ycx, Oct 16, 2025)
3b7daca  Refactor page schema. (LinZhihao-723, Oct 16, 2025)
7e7b321  Merge branch '10-4-mcp-server-implementation' of https://github.com/2… (LinZhihao-723, Oct 16, 2025)
2b3f13a  Fix formating issue. (LinZhihao-723, Oct 16, 2025)
c206f7b  Fix type annotation. (LinZhihao-723, Oct 16, 2025)
f882168  Merge branch 'main' into 10-4-mcp-server-implementation (20001020ycx, Oct 16, 2025)
8e67e3d  Merge branch 'main' into 10-4-mcp-server-implementation (LinZhihao-723, Oct 17, 2025)

27 changes: 27 additions & 0 deletions components/clp-mcp-server/clp_mcp_server/server/constants.py
@@ -0,0 +1,27 @@
"""Constants for CLP MCP server."""


class CLPMcpConstants:
"""Constants for the CLP MCP Server."""

CLEAN_UP_SECONDS = 600 # 10 minutes
ITEM_PER_PAGE = 10
MAX_CACHED_RESULTS = 1000
SESSION_TTL_MINUTES = 60

SERVER_NAME = "clp-mcp-server"
SYSTEM_PROMPT = (
"You are an AI assistant that helps users query a log database using KQL "
"(Kibana Query Language). When given a user query, you should generate a KQL "
"query that accurately captures the user's intent. The KQL query should be as "
"specific as possible to minimize the number of log messages returned. "
"You should also consider the following guidelines when generating KQL queries: "
"- Use specific field names and values to narrow down the search. "
"- Avoid using wildcards (*) unless absolutely necessary, as they can lead to "
"large result sets. - Use logical operators (AND, OR, NOT) to combine multiple "
"conditions. - Consider the time range of the logs you are searching. If the "
"user specifies a time range, include it in the KQL query. - If the user query "
"is ambiguous or lacks detail, ask clarifying questions to better understand "
"their intent before generating the KQL query. - Always ensure that the "
"generated KQL query is syntactically correct and can be executed without errors. "
Member:

I will probably help polish this prompt later, but a few style-wise suggestions:

  • Can u ensure the line wrap happens only when the 100-char limit would be exceeded? The current wrapping is kind of random.
  • We usually leave the space at the start of the new line. So, instead of "hello "\n"world", we should do "hello"\n" world" (see the short sketch after this list).
  • Do u think adding \n to the prompt would make it more readable? Or is it the same from the AI's view?
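
A short sketch of that wrapping convention, using a placeholder constant and text trimmed from the prompt above (illustrative only, not part of this PR):

# Illustrative only: break a line only when the 100-char limit would be exceeded,
# and carry the separating space onto the start of the continuation line.
EXAMPLE_PROMPT = (
    "You are an AI assistant that helps users query a log database using KQL"
    " (Kibana Query Language). When given a user query, you should generate a KQL"
    " query that accurately captures the user's intent."
)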

Contributor Author:

  • Sure, I will update it to make it more readable.
  • I think I get your point; submitted a version with all the formatting fixed.

But I would say don't spend too much time polishing the prompt: we will have a separate PR just to prune the documentation for the AI project once we have all the features in (see future work in the description).

Member (@junhaoliao, Oct 12, 2025):

Shall we use a triple-quoted string instead to make this more readable to humans? E.g.,

SYSTEM_PROMPT = """You are an AI assistant that helps users query a log database using KQL (Kibana Query Language). 
When given a user query, you should generate a KQL query that accurately captures the user's intent. 
The KQL query should be as specific as possible to minimize the number of log messages returned.

You should also consider the following guidelines when generating KQL queries:
- Use specific field names and values to narrow down the search.
- Avoid using wildcards (*) unless absolutely necessary, as they can lead to large result sets.
- Use logical operators (AND, OR, NOT) to combine multiple conditions.
- Consider the time range of the logs you are searching. If the user specifies a time range, include it in the KQL query.
- If the user query is ambiguous or lacks detail, ask clarifying questions to better understand their intent before generating the KQL query.
- Always ensure that the generated KQL query is syntactically correct and can be executed without errors.
"""

Contributor Author:

I see, right, I will make the update.

)
59 changes: 29 additions & 30 deletions components/clp-mcp-server/clp_mcp_server/server/server.py
@@ -2,25 +2,10 @@

from typing import Any

from fastmcp import FastMCP
from fastmcp import Context, FastMCP


class ProtocolConstant:
"""Constants for the CLP MCP Server."""

SERVER_NAME = "clp-mcp-server"

# Tool names
TOOL_HELLO_WORLD = "hello_world"
TOOL_GET_SERVER_INFO = "get_server_info"

@classmethod
def get_capabilities(cls) -> list[str]:
"""
Gets the capabilities of the server.
:return: A list of tool names supported by the server.
"""
return [cls.TOOL_HELLO_WORLD, cls.TOOL_GET_SERVER_INFO]
from .constants import CLPMcpConstants
from .session_manager import SessionManager


def create_mcp_server() -> FastMCP:
@@ -31,22 +16,36 @@ def create_mcp_server() -> FastMCP:
:raise: Propagates `FastMCP.__init__`'s exceptions.
:raise: Propagates `FastMCP.tool`'s exceptions.
"""
mcp = FastMCP(name=ProtocolConstant.SERVER_NAME)
mcp = FastMCP(name=CLPMcpConstants.SERVER_NAME)

session_manager = SessionManager(CLPMcpConstants.SESSION_TTL_MINUTES)

@mcp.tool()
def get_server_info() -> dict[str, Any]:
@mcp.tool
def get_instructions(ctx: Context) -> str:
"""
Gets the MCP server's information.
Gets a pre-defined “system prompt” that guides the LLM behavior.
This function must be invoked before any other `FastMCP.tool`.

:return: The server's information with a list of capabilities.
:param ctx: The `FastMCP` context containing the metadata of the underlying MCP session.
:return: A string of “system prompt”.
"""
return {
"name": ProtocolConstant.SERVER_NAME,
"capabilities": ProtocolConstant.get_capabilities(),
"status": "running",
}
session = session_manager.get_or_create_session(ctx.session_id)
session.ran_instructions = True
return CLPMcpConstants.SYSTEM_PROMPT

@mcp.tool
def get_nth_page(page_index: int, ctx: Context) -> dict[str, Any]:
"""
Retrieves the n-th page of a paginated response from the previous query.

:param page_index: Zero-based index, e.g., 0 for the first page
:param ctx: The `FastMCP` context containing the metadata of the underlying MCP session.
:return: On success, dictionary containing paged log entries and pagination metadata.
On error, dictionary with ``{"Error": "error message describing the failure"}``.
"""
return session_manager.get_nth_page(ctx.session_id, page_index)

@mcp.tool()
@mcp.tool
def hello_world(name: str = "clp-mcp-server user") -> dict[str, Any]:
"""
Provides a simple hello world greeting.
@@ -56,7 +55,7 @@ def hello_world(name: str = "clp-mcp-server user") -> dict[str, Any]:
"""
return {
"message": f"Hello World, {name.strip()}!",
"server": ProtocolConstant.SERVER_NAME,
"server": CLPMcpConstants.SERVER_NAME,
"status": "running",
}

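For a quick sanity check of the tools above, a minimal sketch using FastMCP's in-memory client is shown below. It assumes FastMCP 2.x exposes fastmcp.Client with call_tool(name, arguments) and that the package is importable as clp_mcp_server; treat it as an illustration rather than part of this PR.

import asyncio

from fastmcp import Client

from clp_mcp_server.server.server import create_mcp_server


async def main() -> None:
    server = create_mcp_server()
    # The client connects to the server instance in memory, which is handy for
    # local checks and unit tests.
    async with Client(server) as client:
        # The server expects get_instructions to be called first; it returns the
        # system prompt and marks the session as initialized.
        instructions = await client.call_tool("get_instructions", {})
        print(instructions)

        greeting = await client.call_tool("hello_world", {"name": "clp"})
        print(greeting)


if __name__ == "__main__":
    asyncio.run(main())
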
206 changes: 206 additions & 0 deletions components/clp-mcp-server/clp_mcp_server/server/session_manager.py
@@ -0,0 +1,206 @@
"""Session management for CLP MCP Server."""

import threading
import time
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone
from typing import Any

from paginate import Page

from .constants import CLPMcpConstants


@dataclass(frozen=True)
class QueryResult:
"""Cached results from previous query's response."""

total_results: list[str]
Member:

What do u mean by "total" here?

Contributor Author:

My oversight: this is the full response we get from the previous query. In the QueryResult class, we paginate the result and feed it back to the AI rather than simply returning the full result.

As said, this should be updated to cached_response.

Member:

If you just look at this single object, it doesn't have to know it's a cache entry. I think:

  • How about renaming the class to QueryResultPaginator, which matches what u state.
  • You can drop "total" then, and probably name it result_log_entries. This is more explicit about what's stored inside this list (which tbh isn't clear from what's presented in the existing code).

Contributor Author:

Adopted the second point. For the first point, how about PaginatedQueryResult? QueryResultPaginator sounds like a utility for pagination (the paginate library I'm actually importing) rather than the class where we actually store the paginated result.

items_per_page: int

_total_pages: int = field(init=False, repr=False)

def __post_init__(self) -> None:
Member:

I don't fully get it. Why do we need __post_init__? You can do it with __init__?

Contributor Author:

This is actually a question for you. Functionality-wise, nothing prevents us from using __init__, so I wonder what the most Pythonic way of doing this is. I declare @dataclass(frozen=True), so __init__ is handled by Python; still, I need some initialization code, which is why I put it in __post_init__. I learned/copied this from the integration tests.

Member:

  • I don't think using a dataclass is absolutely required.
  • The current implementation using object.__setattr__ is like a hack. __post_init__ is supposed to be only for validation, not for creating a new member variable at runtime.

I'd say u should use __init__ and make this a normal class in this case (a sketch of that variant follows this thread).

Contributor Author:

I see, done.
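
A minimal sketch of the plain-class variant discussed above (an illustration of the suggestion, not the code that was merged; it relies on the CLPMcpConstants import already present in this module):

# Illustrative sketch only: a regular class whose __init__ validates the input and
# computes the page count, avoiding the frozen-dataclass object.__setattr__ hack.
class QueryResult:
    """Cached results from the previous query's response."""

    def __init__(self, total_results: list[str], items_per_page: int) -> None:
        if len(total_results) > CLPMcpConstants.MAX_CACHED_RESULTS:
            err_msg = (
                f"QueryResult exceeds maximum allowed cached results:"
                f" {len(total_results)} > {CLPMcpConstants.MAX_CACHED_RESULTS}."
            )
            raise ValueError(err_msg)
        self.total_results = total_results
        self.items_per_page = items_per_page
        # Ceiling division: the number of pages needed to hold all results.
        self._total_pages = (len(total_results) + items_per_page - 1) // items_per_page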

"""Validates that cached results don't exceed MAX_CACHED_RESULTS."""
if len(self.total_results) > CLPMcpConstants.MAX_CACHED_RESULTS:
err_msg = (
f"QueryResult exceeds maximum allowed cached results: "
f"{len(self.total_results)} > {CLPMcpConstants.MAX_CACHED_RESULTS}. "
)
raise ValueError(err_msg)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Exceeding MAX_CACHED_RESULTS should not crash tool flow.

Raising ValueError here bubbles out of SessionManager.cache_query_result causing tool failure. PR states the server caches up to 1,000 entries; prefer truncation or graceful error from SessionManager.

Minimal change aligning with objectives: pre-cap in SessionManager to avoid raising here.

@@ def cache_query_result(self, session_id: str, query_results: list[str]) -> dict[str, Any]:
-        session = self.get_or_create_session(session_id)
+        session = self.get_or_create_session(session_id)
         if session.ran_instructions is False:
             return {
                 "Error": "Please call get_instructions() first "
                 "to understand how to use this MCP server."
             }
-
-        session.cache_query_result(results=query_results)
+        # Cap results to MAX_CACHED_RESULTS to avoid overflow/ValueError
+        max_n = CLPMcpConstants.MAX_CACHED_RESULTS
+        capped = query_results[:max_n] if len(query_results) > max_n else query_results
+        try:
+            session.cache_query_result(results=capped)
+        except ValueError as e:
+            # Defensive: still return structured error if upstream changes or other checks fail
+            return {"Error": str(e)}

Alternatively, truncate in QueryResult.__post_init__ via object.__setattr__ instead of raising. Based on PR objectives.

Committable suggestion skipped: line range outside the PR's diff.

object.__setattr__(
self,
"_total_pages",
(len(self.total_results) + self.items_per_page - 1) // self.items_per_page,
)

def get_page(self, page_number: int) -> Page | None:
"""
Gets a specific page from the cached response.

:param page_number: One-based indexing, e.g., 1 for the first page
:return: Page object or None if page number is out of bounds
"""
if page_number > self._total_pages or page_number <= 0:
return None

return Page(
self.total_results,
page=page_number,
items_per_page=self.items_per_page,
)
Contributor:

⚠️ Potential issue | 🟠 Major

Allow empty result sets to return page 1.

When total_results is empty, _total_pages becomes 0, so requesting the first page returns None and every caller receives “Page index is out of bounds.” even though the query succeeded. Please special-case the 0-results scenario so that page 1 returns an empty Page (with the expected pagination metadata) instead of an error.

🤖 Prompt for AI Agents
In components/clp-mcp-server/clp_mcp_server/server/session_manager.py around
lines 55 to 62, the current bounds check returns None when _total_pages == 0 so
requesting page 1 fails; adjust the logic to special-case empty results: if
self._total_pages == 0 then allow page_number == 1 and return an empty Page with
total_results (0), page=1 and items_per_page=self.items_per_page; otherwise keep
the existing bounds check (return None when page_number <= 0 or page_number >
self._total_pages).
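
One possible shape of that special case, assuming paginate.Page accepts an empty collection and relying on the Page import at the top of this module (a sketch, not the committed fix):

# Illustrative sketch only: let page 1 of an empty result set return an empty Page
# (with its pagination metadata) instead of None.
def get_page(self, page_number: int) -> Page | None:
    if self._total_pages == 0:
        if page_number == 1:
            return Page(self.total_results, page=1, items_per_page=self.items_per_page)
        return None
    if page_number <= 0 or page_number > self._total_pages:
        return None
    return Page(self.total_results, page=page_number, items_per_page=self.items_per_page)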



@dataclass
class SessionState:
"""State of a single user session."""

session_id: str
items_per_page: int
session_ttl_minutes: int
last_accessed: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
cached_query_result: QueryResult | None = None
ran_instructions: bool = False

def cache_query_result(
self,
results: list[str],
) -> None:
"""
Caches the latest query result of the session.

:param results: Complete log entries to cache
"""
self.cached_query_result = QueryResult(
total_results=results, items_per_page=self.items_per_page
)

def get_page_data(self, page_number: int) -> dict[str, Any]:
"""
Gets page data in a dictionary format.

:param page_number: One-based indexing, e.g., 1 for the first page
:return: On success, dictionary containing paged log entries and pagination metadata.
On error, dictionary with ``{"Error": "error message describing the failure"}``.
"""
if self.cached_query_result is None:
return {"Error": "No previous paginated response in this session."}

page = self.cached_query_result.get_page(page_number)
if page is None:
return {"Error": "Page index is out of bounds."}

return {
"items": list(page),
"total_pages": page.page_count,
"total_items": page.item_count,
"items_per_page": page.items_per_page,
"has_next": page.next_page is not None,
"has_previous": page.previous_page is not None,
}

def is_expired(self) -> bool:
""":return: whether the session has expired."""
time_diff = datetime.now(timezone.utc) - self.last_accessed
return time_diff > timedelta(minutes=self.session_ttl_minutes)

def update_access_time(self) -> None:
"""Updates the last accessed timestamp."""
self.last_accessed = datetime.now(timezone.utc)


class SessionManager:
"""Session manager for all user sessions."""

def __init__(self, session_ttl_minutes: int) -> None:
"""
Initializes the SessionManager and starts background cleanup thread.

:param session_ttl_minutes: Session time-to-live in minutes.
"""
self.session_ttl_minutes = session_ttl_minutes
# `sessions` is a shared variable, as there may be multiple sessions attached to the MCP server.
# Session state is NOT a shared variable because each session is accessed by only one
# connection at a time, and API calls for a single session are synchronous.
self._sessions_lock = threading.Lock()
self.sessions: dict[str, SessionState] = {}
self._cleanup_thread = threading.Thread(target=self._cleanup_loop, daemon=True)
self._cleanup_thread.start()

def _cleanup_loop(self) -> None:
"""Cleans up all expired sessions periodically in a separate cleanup thread."""
while True:
time.sleep(CLPMcpConstants.CLEAN_UP_SECONDS)
self.cleanup_expired_sessions()

def cleanup_expired_sessions(self) -> None:
"""Cleans up all expired sessions."""
with self._sessions_lock:
expired_sessions = [
sid for sid, session in self.sessions.items() if session.is_expired()
]

for sid in expired_sessions:
del self.sessions[sid]

def get_or_create_session(self, session_id: str) -> SessionState:
"""
Gets an existing session or creates a new one.

:param session_id: Unique identifier for the session
:return: The SessionState object for the given session_id
"""
with self._sessions_lock:
if session_id in self.sessions and self.sessions[session_id].is_expired():
del self.sessions[session_id]

if session_id not in self.sessions:
self.sessions[session_id] = SessionState(
session_id, CLPMcpConstants.ITEM_PER_PAGE, self.session_ttl_minutes
)

session = self.sessions[session_id]

session.update_access_time()
return session

def cache_query_result(self, session_id: str, query_results: list[str]) -> dict[str, Any]:
"""
Caches query results for a session and returns the first page.

:param session_id: Unique identifier for the session
:param query_results: Complete log entries to cache
:return: On success, dictionary containing the first page of log entries and
pagination metadata. On error, dictionary with
``{"Error": "error message describing the failure"}``.
"""
session = self.get_or_create_session(session_id)
if session.ran_instructions is False:
return {
"Error": "Please call get_instructions() first"
"to understand how to use this MCP server."
}

session.cache_query_result(results=query_results)

return session.get_page_data(1)
Contributor:

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

Add error handling for results exceeding MAX_CACHED_RESULTS.

Line 201 calls session.cache_query_result() which internally creates a QueryResult that can raise ValueError if results exceed MAX_CACHED_RESULTS. The PR objectives state the server "caches up to 1,000 log entries," implying graceful handling rather than tool failure.

Past reviews suggested either capping results before caching or adding try/except here to return a structured error. Without this, the tool crashes on large result sets.

Consider applying this diff to cap results before caching:

     def cache_query_result(self, session_id: str, query_results: list[str]) -> dict[str, Any]:
         """
         Caches query results for a session and returns the first page and the paging metadata.

         :param session_id: Unique identifier for the session.
         :param query_results: Complete log entries from previous query for caching.
         :return: On success, dictionary containing the first page of log entries and
         pagination metadata. On error, dictionary with
         ``{"Error": "error message describing the failure"}``.
         """
         session = self.get_or_create_session(session_id)
         if session.ran_instructions is False:
             return {
                 "Error": "Please call get_instructions() first "
                 "to understand how to use this MCP server."
             }

-        session.cache_query_result(results=query_results)
+        # Cap results to MAX_CACHED_RESULTS to prevent ValueError
+        max_results = CLPMcpConstants.MAX_CACHED_RESULTS
+        capped_results = query_results[:max_results] if len(query_results) > max_results else query_results
+        try:
+            session.cache_query_result(results=capped_results)
+        except ValueError as e:
+            # Defensive: return structured error if validation fails
+            return {"Error": str(e)}

         return session.get_page_data(1)

Alternatively, verify if callers are expected to validate result size upstream.


🏁 Script executed:

#!/usr/bin/env bash
rg -n "def cache_query_result" -C3 --type py
rg -n "MAX_CACHED_RESULTS" -C3 --type py
rg -n "class QueryResult" -C5 --type py

Length of output: 5560


🏁 Script executed:

#!/usr/bin/env bash
rg -n "cache_query_result" -C2 --type py components/clp-mcp-server/clp_mcp_server/server

Length of output: 1515


Add error handling for oversized query results in SessionManager.cache_query_result

Wrap the call to session.cache_query_result(results=query_results) (session_manager.py:201) in a try/except ValueError to return a structured error, or pre-cap query_results to CLPMcpConstants.MAX_CACHED_RESULTS before invoking.

🤖 Prompt for AI Agents
In components/clp-mcp-server/clp_mcp_server/server/session_manager.py around
lines 201-203, the call to session.cache_query_result(results=query_results)
must handle oversized result errors: either wrap that call in a try/except
ValueError that logs the exception and returns a standardized error
response/object (e.g., {"error": "too many results", "code":
"MAX_CACHED_EXCEEDED"}) instead of letting the exception propagate, or pre-cap
query_results to CLPMcpConstants.MAX_CACHED_RESULTS (e.g., slice to the max
length) before calling session.cache_query_result so it cannot raise; after
either fix continue to return session.get_page_data(1) for the successful path.


def get_nth_page(self, session_id: str, page_index: int) -> dict[str, Any]:
"""
Retrieves the n-th page of a paginated response from the previous query.

:param session_id: Unique identifier for the session
:param page_index: Zero-based index, e.g., 0 for the first page
:return: On success, dictionary containing paged log entries and pagination metadata.
On error, dictionary with ``{"Error": "error message describing the failure"}``.
"""
session = self.get_or_create_session(session_id)
if session.ran_instructions is False:
return {
"Error": "Please call get_instructions() first"
"to understand how to use this MCP server."
}

page_number = page_index + 1 # Convert zero-based to one-based
return session.get_page_data(page_number)
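
To make the intended call sequence concrete, a small usage sketch of the manager defined in this file (the session ID here is hypothetical and used only for illustration):

from clp_mcp_server.server.constants import CLPMcpConstants
from clp_mcp_server.server.session_manager import SessionManager

SESSION_ID = "example-session"  # Hypothetical ID; real IDs come from the MCP context.

manager = SessionManager(session_ttl_minutes=CLPMcpConstants.SESSION_TTL_MINUTES)

# A session must call get_instructions() first; the server marks that by setting
# ran_instructions on the session state.
session = manager.get_or_create_session(SESSION_ID)
session.ran_instructions = True

# Cache 25 fake log entries; the first page plus paging metadata comes back.
first_page = manager.cache_query_result(SESSION_ID, [f"log line {i}" for i in range(25)])
print(first_page["total_pages"], first_page["has_next"])  # 3 True (with 10 items per page)

# Fetch the second page using the zero-based index expected by get_nth_page.
second_page = manager.get_nth_page(SESSION_ID, page_index=1)
print(second_page["items"])  # Entries 10-19
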
22 changes: 12 additions & 10 deletions components/clp-mcp-server/pyproject.toml
@@ -6,11 +6,12 @@ authors = [{name = "YScope Inc.", email = "dev@yscope.com"}]
readme = "README.md"
requires-python = ">=3.10"
dependencies = [
"aiomysql>=0.2.0",
"click>=8.3.0",
"fastmcp>=2.12.4",
"pymongo>=4.15.1",
"aiomysql>=0.2.0",
"msgpack>=1.1.1",
"paginate>=0.5.7",
"pymongo>=4.15.1",
]

[project.scripts]
@@ -23,10 +24,11 @@ build-backend = "hatchling.build"
[dependency-groups]
dev = [
"mypy>=1.16.0",
"ruff>=0.11.12",
"pytest>=8.4.1",
"pytest-env>=1.1.5",
"pytest-asyncio>=1.2.0",
"pytest-env>=1.1.5",
"pytest-repeat>=0.9.4",
"ruff>=0.11.12",
]

[tool.hatch.metadata]
@@ -69,12 +71,12 @@ ignore = [
]
isort.order-by-type = false

[tool.ruff.lint.per-file-ignores]
"tests/**" = [
"S101", # Allow usage of pytest `assert`
"TC003", # Ignore performance overhead of imports only used for type checking
]

[tool.ruff.format]
docstring-code-format = true
docstring-code-line-length = 100

[tool.ruff.lint.per-file-ignores]
"tests/test_clp_connector.py" = [
"INP001", # Allow implicit namespace package for tests
"S101", # Allow "assert" in test cases
]
1 change: 1 addition & 0 deletions components/clp-mcp-server/tests/__init__.py
@@ -0,0 +1 @@
"""Test Package for CLP MCP Server."""