Skip to content

Commit 9d19462

Browse files
committed
add mcp server with find user tool
1 parent 167c8d9 commit 9d19462

File tree

5 files changed

+637
-4
lines changed

5 files changed

+637
-4
lines changed
Lines changed: 154 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,154 @@
1+
import { randomUUID } from 'node:crypto'
2+
import { type Transport } from '@modelcontextprotocol/sdk/shared/transport.js'
3+
import {
4+
type JSONRPCMessage,
5+
JSONRPCMessageSchema,
6+
} from '@modelcontextprotocol/sdk/types.js'
7+
8+
/**
9+
* Server transport for SSE using Standard Fetch: this will send messages over
10+
* an SSE connection and receive messages from HTTP POST requests.
11+
*/
12+
export class FetchSSEServerTransport implements Transport {
13+
#stream?: ReadableStreamDefaultController<string>
14+
#sessionId: string
15+
#endpoint: string
16+
17+
onclose?: () => void
18+
onerror?: (error: Error) => void
19+
onmessage?: (message: JSONRPCMessage) => void
20+
21+
/**
22+
* Creates a new SSE server transport, which will direct the client to POST
23+
* messages to the relative or absolute URL identified by `endpoint`.
24+
*/
25+
constructor(endpoint: string, sessionId?: string | null) {
26+
this.#endpoint = endpoint
27+
this.#sessionId = sessionId ?? randomUUID()
28+
}
29+
30+
/**
31+
* Starts processing messages on the transport.
32+
* This is called by the Server class and should not be called directly.
33+
*/
34+
async start(): Promise<void> {
35+
if (this.#stream) {
36+
throw new Error(
37+
'FetchSSEServerTransport already started! If using Server class, note that connect() calls start() automatically.',
38+
)
39+
}
40+
}
41+
42+
/**
43+
* Handles the initial SSE connection request.
44+
* This should be called from your Remix loader to establish the SSE stream.
45+
*/
46+
async handleSSERequest(request: Request): Promise<Response> {
47+
const stream = new ReadableStream<string>({
48+
start: (controller) => {
49+
this.#stream = controller
50+
51+
// Send headers
52+
controller.enqueue(': ping\n\n') // Keep connection alive
53+
54+
// Send the endpoint event
55+
controller.enqueue(
56+
`event: endpoint\ndata: ${encodeURI(
57+
this.#endpoint,
58+
)}?sessionId=${this.#sessionId}\n\n`,
59+
)
60+
61+
// Handle cleanup when the connection closes
62+
request.signal.addEventListener('abort', () => {
63+
controller.close()
64+
this.#stream = undefined
65+
this.onclose?.()
66+
})
67+
},
68+
cancel: () => {
69+
this.#stream = undefined
70+
this.onclose?.()
71+
},
72+
})
73+
74+
return new Response(stream, {
75+
headers: {
76+
'Content-Type': 'text/event-stream',
77+
'Cache-Control': 'no-cache',
78+
Connection: 'keep-alive',
79+
'Mcp-Session-Id': this.#sessionId,
80+
},
81+
})
82+
}
83+
84+
/**
85+
* Handles incoming POST messages.
86+
* This should be called from your Remix action to handle incoming messages.
87+
*/
88+
async handlePostMessage(request: Request): Promise<Response> {
89+
if (!this.#stream) {
90+
const message = 'SSE connection not established'
91+
return new Response(message, { status: 500 })
92+
}
93+
94+
let body: unknown
95+
try {
96+
const contentType = request.headers.get('content-type')
97+
if (contentType !== 'application/json') {
98+
throw new Error(`Unsupported content-type: ${contentType}`)
99+
}
100+
101+
body = await request.json()
102+
} catch (error) {
103+
this.onerror?.(error as Error)
104+
return new Response(String(error), { status: 400 })
105+
}
106+
107+
try {
108+
await this.handleMessage(body)
109+
} catch (error) {
110+
console.error(error)
111+
return new Response(`Invalid message: ${body}`, { status: 400 })
112+
}
113+
114+
return new Response('Accepted', { status: 202 })
115+
}
116+
117+
/**
118+
* Handle a client message, regardless of how it arrived.
119+
*/
120+
async handleMessage(message: unknown): Promise<void> {
121+
let parsedMessage: JSONRPCMessage
122+
try {
123+
parsedMessage = JSONRPCMessageSchema.parse(message)
124+
} catch (error) {
125+
this.onerror?.(error as Error)
126+
throw error
127+
}
128+
129+
this.onmessage?.(parsedMessage)
130+
}
131+
132+
async close(): Promise<void> {
133+
this.#stream?.close()
134+
this.#stream = undefined
135+
this.onclose?.()
136+
}
137+
138+
async send(message: JSONRPCMessage): Promise<void> {
139+
if (!this.#stream) {
140+
throw new Error('Not connected')
141+
}
142+
143+
// Send the message through the event stream
144+
this.#stream.enqueue(`event: message\ndata: ${JSON.stringify(message)}\n\n`)
145+
}
146+
147+
/**
148+
* Returns the session ID for this transport.
149+
* This can be used to route incoming POST requests.
150+
*/
151+
get sessionId(): string {
152+
return this.#sessionId
153+
}
154+
}

app/routes/mcp+/index.ts

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
import { type Route } from './+types/index.ts'
2+
import { connect, getTransport } from './mcp.server.ts'
3+
4+
export async function loader({ request }: Route.LoaderArgs) {
5+
const url = new URL(request.url)
6+
const sessionId = url.searchParams.get('sessionId')
7+
const transport = await connect(sessionId)
8+
return transport.handleSSERequest(request)
9+
}
10+
11+
export async function action({ request }: Route.ActionArgs) {
12+
const url = new URL(request.url)
13+
const sessionId = url.searchParams.get('sessionId')
14+
if (!sessionId) {
15+
return new Response('No session ID', { status: 400 })
16+
}
17+
const transport = await getTransport(sessionId)
18+
if (!transport) {
19+
return new Response('Not found', { status: 404 })
20+
}
21+
return transport.handlePostMessage(request)
22+
}

app/routes/mcp+/mcp.server.ts

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
import { McpServer } from '@modelcontextprotocol/sdk/server/mcp.js'
2+
import { type CallToolResult } from '@modelcontextprotocol/sdk/types.js'
3+
import { searchUsers } from '@prisma/client/sql'
4+
import { z } from 'zod'
5+
import { prisma } from '#app/utils/db.server.ts'
6+
import { getSignedGetRequestInfo } from '#app/utils/storage.server.ts'
7+
import { FetchSSEServerTransport } from './fetch-transport.server.ts'
8+
9+
export const server = new McpServer(
10+
{
11+
name: 'epic-mcp-a25d',
12+
version: '1.0.0',
13+
},
14+
{
15+
capabilities: {
16+
tools: {},
17+
},
18+
},
19+
)
20+
21+
server.tool(
22+
'Find User',
23+
{ query: z.string().describe('The query to search for') },
24+
async ({ query }) => {
25+
const like = `%${query ?? ''}%`
26+
const users = await prisma.$queryRawTyped(searchUsers(like))
27+
28+
const content: CallToolResult['content'] = []
29+
for (const user of users) {
30+
content.push({
31+
type: 'text',
32+
text: `${user.name} (${user.username})`,
33+
})
34+
35+
if (user.imageObjectKey) {
36+
content.push({
37+
type: 'image',
38+
data: await getUserBase64Image(user.imageObjectKey),
39+
mimeType: 'image/png',
40+
})
41+
}
42+
}
43+
44+
return { content }
45+
},
46+
)
47+
48+
async function getUserBase64Image(imageObjectKey: string) {
49+
const { url: signedUrl, headers: signedHeaders } =
50+
getSignedGetRequestInfo(imageObjectKey)
51+
const response = await fetch(signedUrl, { headers: signedHeaders })
52+
const blob = await response.blob()
53+
const buffer = await blob.arrayBuffer()
54+
return Buffer.from(buffer).toString('base64')
55+
}
56+
57+
const transports = new Map<string, FetchSSEServerTransport>()
58+
59+
export async function connect(sessionId?: string | null) {
60+
const transport = new FetchSSEServerTransport('/mcp', sessionId)
61+
transport.onclose = () => {
62+
transports.delete(transport.sessionId)
63+
}
64+
await server.connect(transport)
65+
transports.set(transport.sessionId, transport)
66+
return transport
67+
}
68+
69+
export async function getTransport(sessionId: string) {
70+
return transports.get(sessionId)
71+
}

0 commit comments

Comments
 (0)