Skip to content

Commit ee3a197

Browse files
authored
add jsonrpc serialization to rpc (#4623)
1 parent a07dab2 commit ee3a197

File tree

11 files changed

+460
-65
lines changed

11 files changed

+460
-65
lines changed

.changeset/blue-files-stare.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@effect/rpc": patch
3+
---
4+
5+
add RpcSerialization.jsonRpc & RpcSerialization.ndJsonRpc

.changeset/pink-games-peel.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@effect/rpc": patch
3+
---
4+
5+
support primitives in rpc payloads

packages/cluster/src/EntityProxy.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ export const toRpcGroup = <Rpcs extends Rpc.Any, const Prefix extends string = "
6363
payloadSchema.make = (input: any, options?: Schema.MakeOptions) => {
6464
return oldMake({
6565
entityId: input.entityId,
66-
payload: parentRpc.payloadSchema.make(input.payload, options)
66+
payload: parentRpc.payloadSchema.make ? parentRpc.payloadSchema.make(input.payload, options) : input.payload
6767
}, options)
6868
}
6969
const rpc = Rpc.make(`${prefix}${parentRpc._tag}`, {

packages/cluster/src/ShardManager.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -382,7 +382,7 @@ export const makeClientRpc: Effect.Effect<
382382
unregister: (address) => client.Unregister({ address }),
383383
notifyUnhealthyRunner: (address) => client.NotifyUnhealthyRunner({ address }),
384384
getAssignments: client.GetAssignments(),
385-
shardingEvents: client.ShardingEvents({}, { asMailbox: true }),
385+
shardingEvents: client.ShardingEvents(void 0, { asMailbox: true }),
386386
getTime: client.GetTime()
387387
})
388388
})

packages/cluster/test/Sharding.test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -479,7 +479,7 @@ describe.concurrent("Sharding", () => {
479479
const driver = yield* MessageStorage.MemoryDriver
480480
const makeClient = yield* TestEntity.client
481481
const client = makeClient("1")
482-
const result = yield* client.Never({}, { discard: true })
482+
const result = yield* client.Never(void 0, { discard: true })
483483
expect(result).toEqual(void 0)
484484
yield* TestClock.adjust(1)
485485
expect(driver.journal.length).toEqual(1)

packages/platform-node/test/RpcServer.test.ts

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,13 @@ describe("RpcServer", () => {
3535
Layer.provide([NodeHttpServer.layerTest, RpcSerialization.layerMsgPack])
3636
)
3737
)
38+
e2eSuite(
39+
"e2e http jsonrpc",
40+
HttpNdjsonClient.pipe(
41+
Layer.provideMerge(HttpNdjsonServer),
42+
Layer.provide([NodeHttpServer.layerTest, RpcSerialization.layerNdJsonRpc()])
43+
)
44+
)
3845

3946
// websocket
4047
const HttpWsServer = HttpRouter.Default.serve().pipe(
@@ -72,6 +79,13 @@ describe("RpcServer", () => {
7279
Layer.provide([NodeHttpServer.layerTest, RpcSerialization.layerMsgPack])
7380
)
7481
)
82+
e2eSuite(
83+
"e2e ws jsonrpc",
84+
HttpWsClient.pipe(
85+
Layer.provideMerge(HttpWsServer),
86+
Layer.provide([NodeHttpServer.layerTest, RpcSerialization.layerJsonRpc()])
87+
)
88+
)
7589

7690
// tcp
7791
const TcpServer = RpcLive.pipe(
@@ -102,6 +116,13 @@ describe("RpcServer", () => {
102116
Layer.provide([NodeHttpServer.layerTest, RpcSerialization.layerMsgPack])
103117
)
104118
)
119+
e2eSuite(
120+
"e2e tcp jsonrpc",
121+
TcpClient.pipe(
122+
Layer.provideMerge(TcpServer),
123+
Layer.provide([NodeHttpServer.layerTest, RpcSerialization.layerNdJsonRpc()])
124+
)
125+
)
105126

106127
// worker
107128
const WorkerClient = UsersClient.layer.pipe(

packages/rpc/src/Rpc.ts

Lines changed: 10 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ export const isRpc = (u: unknown): u is Rpc<any, any, any> => Predicate.hasPrope
4545
*/
4646
export interface Rpc<
4747
in out Tag extends string,
48-
out Payload extends AnyStructSchema = Schema.Struct<{}>,
48+
out Payload extends AnySchema = typeof Schema.Void,
4949
out Success extends Schema.Schema.Any = typeof Schema.Void,
5050
out Error extends Schema.Schema.All = typeof Schema.Never,
5151
out Middleware extends RpcMiddleware.TagClassAny = never
@@ -154,7 +154,7 @@ export interface AnyWithProps {
154154
readonly [TypeId]: TypeId
155155
readonly _tag: string
156156
readonly key: string
157-
readonly payloadSchema: AnyStructSchema
157+
readonly payloadSchema: AnySchema
158158
readonly successSchema: Schema.Schema.Any
159159
readonly errorSchema: Schema.Schema.All
160160
readonly annotations: Context_.Context<never>
@@ -287,10 +287,8 @@ export type PayloadConstructor<R> = R extends Rpc<
287287
infer _Success,
288288
infer _Error,
289289
infer _Middleware
290-
> ?
291-
Schema.Struct.Constructor<_Payload["fields"]> extends infer T ?
292-
[keyof T] extends [never] ? void | {} : Schema.Simplify<T>
293-
: never
290+
> ? _Payload extends { readonly make: (params: infer P, ...rest: infer _Rest) => infer _ } ? P
291+
: _Payload["Type"]
294292
: never
295293

296294
/**
@@ -548,7 +546,7 @@ const Proto = {
548546

549547
const makeProto = <
550548
const Tag extends string,
551-
Payload extends AnyStructSchema,
549+
Payload extends Schema.Schema.Any,
552550
Success extends Schema.Schema.Any,
553551
Error extends Schema.Schema.All,
554552
Middleware extends RpcMiddleware.TagClassAny
@@ -567,15 +565,13 @@ const makeProto = <
567565
return Rpc as any
568566
}
569567

570-
const constEmptyStruct = Schema.Struct({})
571-
572568
/**
573569
* @since 1.0.0
574570
* @category constructors
575571
*/
576572
export const make = <
577573
const Tag extends string,
578-
Payload extends AnyStructSchema | Schema.Struct.Fields = Schema.Struct<{}>,
574+
Payload extends Schema.Schema.Any | Schema.Struct.Fields = typeof Schema.Void,
579575
Success extends Schema.Schema.Any = typeof Schema.Void,
580576
Error extends Schema.Schema.All = typeof Schema.Never,
581577
const Stream extends boolean = false
@@ -607,7 +603,7 @@ export const make = <
607603
? options?.payload as any
608604
: options?.payload
609605
? Schema.Struct(options?.payload as any)
610-
: constEmptyStruct
606+
: Schema.Void
611607
}
612608
return makeProto({
613609
_tag: tag,
@@ -628,22 +624,21 @@ export const make = <
628624
* @since 1.0.0
629625
* @category constructors
630626
*/
631-
export interface AnyStructSchema extends Pipeable {
627+
export interface AnySchema extends Pipeable {
632628
readonly [Schema.TypeId]: any
633-
readonly make: any
634629
readonly Type: any
635630
readonly Encoded: any
636631
readonly Context: any
632+
readonly make?: (params: any, ...rest: ReadonlyArray<any>) => any
637633
readonly ast: AST.AST
638-
readonly fields: Schema.Struct.Fields
639634
readonly annotations: any
640635
}
641636

642637
/**
643638
* @since 1.0.0
644639
* @category constructors
645640
*/
646-
export interface AnyTaggedRequestSchema extends AnyStructSchema {
641+
export interface AnyTaggedRequestSchema extends AnySchema {
647642
readonly _tag: string
648643
readonly success: Schema.Schema.Any
649644
readonly failure: Schema.Schema.All

packages/rpc/src/RpcClient.ts

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -182,14 +182,15 @@ export const makeNoSerialization: <Rpcs extends Rpc.Any, E>(
182182
const onRequest = (rpc: Rpc.AnyWithProps) => {
183183
const isStream = RpcSchema.isStreamSchema(rpc.successSchema)
184184
const middleware = getRpcClientMiddleware(rpc)
185-
return (payload: any, opts?: {
185+
return (payload_: any, opts?: {
186186
readonly asMailbox?: boolean | undefined
187187
readonly streamBufferSize?: number | undefined
188188
readonly headers?: Headers.Input | undefined
189189
readonly context?: Context.Context<never> | undefined
190190
readonly discard?: boolean | undefined
191191
}) => {
192192
const headers = opts?.headers ? Headers.fromInput(opts.headers) : Headers.empty
193+
const payload = payload_ ?? undefined
193194
if (!isStream) {
194195
const effect = Effect.useSpan(
195196
`${spanPrefix}.${rpc._tag}`,
@@ -199,7 +200,7 @@ export const makeNoSerialization: <Rpcs extends Rpc.Any, E>(
199200
rpc,
200201
middleware,
201202
span,
202-
"make" in rpc.payloadSchema ? rpc.payloadSchema.make(payload ?? {}) : {},
203+
rpc.payloadSchema.make ? rpc.payloadSchema.make(payload) : payload,
203204
headers,
204205
opts?.context ?? Context.empty(),
205206
opts?.discard ?? false
@@ -211,7 +212,7 @@ export const makeNoSerialization: <Rpcs extends Rpc.Any, E>(
211212
onStreamRequest(
212213
rpc,
213214
middleware,
214-
payload ? rpc.payloadSchema.make(payload) : {},
215+
rpc.payloadSchema.make ? rpc.payloadSchema.make(payload) : payload,
215216
headers,
216217
opts?.streamBufferSize ?? 16,
217218
opts?.context ?? Context.empty()
@@ -712,7 +713,7 @@ export const makeProtocolHttp = (client: HttpClient.HttpClient): Effect.Effect<
712713

713714
const parser = serialization.unsafeMake()
714715

715-
const encoded = parser.encode(request)
716+
const encoded = parser.encode(request)!
716717
const body = typeof encoded === "string" ?
717718
HttpBody.text(encoded, serialization.contentType) :
718719
HttpBody.uint8Array(encoded, serialization.contentType)
@@ -798,7 +799,7 @@ export const makeProtocolSocket = (options?: {
798799

799800
let parser = serialization.unsafeMake()
800801

801-
const pinger = yield* makePinger(write(parser.encode(constPing)))
802+
const pinger = yield* makePinger(write(parser.encode(constPing)!))
802803

803804
yield* Effect.suspend(() => {
804805
parser = serialization.unsafeMake()
@@ -861,7 +862,9 @@ export const makeProtocolSocket = (options?: {
861862

862863
return {
863864
send(request) {
864-
return Effect.orDie(write(parser.encode(request)))
865+
const encoded = parser.encode(request)
866+
if (encoded === undefined) return Effect.void
867+
return Effect.orDie(write(encoded))
865868
},
866869
supportsAck: true,
867870
supportsTransferables: false

0 commit comments

Comments
 (0)