Skip to content

feat: add some grpc utils #403

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ toolchain go1.22.7

require (
dario.cat/mergo v1.0.1
github.com/ClickHouse/ch-go v0.65.1
github.com/ClickHouse/clickhouse-go/v2 v2.32.2
github.com/IBM/sarama v1.45.1
github.com/ThreeDotsLabs/watermill v1.4.4
Expand Down Expand Up @@ -70,12 +71,12 @@ require (
go.uber.org/dig v1.18.1
go.uber.org/fx v1.23.0
go.uber.org/zap v1.27.0
google.golang.org/grpc v1.71.0
)

require (
filippo.io/edwards25519 v1.1.0 // indirect
github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161 // indirect
github.com/ClickHouse/ch-go v0.65.1 // indirect
github.com/Microsoft/go-winio v0.6.2 // indirect
github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5 // indirect
github.com/ajg/form v1.5.1 // indirect
Expand Down Expand Up @@ -209,7 +210,6 @@ require (
golang.org/x/tools v0.30.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20250218202821-56aae31c358a // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20250218202821-56aae31c358a // indirect
google.golang.org/grpc v1.71.0 // indirect
google.golang.org/protobuf v1.36.5 // indirect
gopkg.in/go-jose/go-jose.v2 v2.6.3 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
Expand Down
106 changes: 106 additions & 0 deletions grpcserver/serverport.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package grpcserver

import (
"context"

"github.com/formancehq/go-libs/v3/logging"
"github.com/formancehq/go-libs/v3/serverport"
"go.uber.org/fx"
"google.golang.org/grpc"
)

const serverPortDiscr = "grpc"

func ContextWithServerInfo(ctx context.Context) context.Context {
return serverport.ContextWithServerInfo(ctx, serverPortDiscr)

Check warning on line 15 in grpcserver/serverport.go

View check run for this annotation

Codecov / codecov/patch

grpcserver/serverport.go#L14-L15

Added lines #L14 - L15 were not covered by tests
}

func startServer(ctx context.Context, s *serverport.Server, serverOptions []grpc.ServerOption, setupOptions []func(*grpc.Server)) (func(ctx context.Context) error, error) {

if err := s.Listen(ctx); err != nil {
return nil, err
}

Check warning on line 22 in grpcserver/serverport.go

View check run for this annotation

Codecov / codecov/patch

grpcserver/serverport.go#L18-L22

Added lines #L18 - L22 were not covered by tests

grpcServer := grpc.NewServer(serverOptions...)
for _, option := range setupOptions {
option(grpcServer)
}
go func() {
if err := grpcServer.Serve(s.Listener); err != nil {
logging.FromContext(ctx).Errorf("failed to serve: %v", err)
}

Check warning on line 31 in grpcserver/serverport.go

View check run for this annotation

Codecov / codecov/patch

grpcserver/serverport.go#L24-L31

Added lines #L24 - L31 were not covered by tests
}()
Comment on lines +28 to +32
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Surface serving errors – align behaviour with the HTTP implementation

grpcServer.Serve can exit immediately (e.g. port already in use, TLS mis‑configuration).
In the HTTP starter we panic on such unrecoverable conditions, but here we only log, allowing the application to keep running in a half‑initialised state.

 go func() {
-    if err := grpcServer.Serve(s.Listener); err != nil {
-        logging.FromContext(ctx).Errorf("failed to serve: %v", err)
+    if err := grpcServer.Serve(s.Listener); err != nil && !errors.Is(err, grpc.ErrServerStopped) {
+        panic(fmt.Errorf("GRPC server failed to serve: %w", err))
     }
 }()

Doing so keeps both stacks consistent and fails fast when the port cannot be bound.
(Requires the errors and fmt imports.)

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
go func() {
if err := grpcServer.Serve(s.Listener); err != nil {
logging.FromContext(ctx).Errorf("failed to serve: %v", err)
}
}()
go func() {
if err := grpcServer.Serve(s.Listener); err != nil && !errors.Is(err, grpc.ErrServerStopped) {
panic(fmt.Errorf("GRPC server failed to serve: %w", err))
}
}()


return func(ctx context.Context) error {
grpcServer.GracefulStop()

return nil
}, nil

Check warning on line 38 in grpcserver/serverport.go

View check run for this annotation

Codecov / codecov/patch

grpcserver/serverport.go#L34-L38

Added lines #L34 - L38 were not covered by tests
}

func Address(ctx context.Context) string {
return serverport.Address(ctx, serverPortDiscr)

Check warning on line 42 in grpcserver/serverport.go

View check run for this annotation

Codecov / codecov/patch

grpcserver/serverport.go#L41-L42

Added lines #L41 - L42 were not covered by tests
}

type ServerOptions struct {
serverPortOptions []serverport.ServerOpts
grpcServerOpts []grpc.ServerOption
grpcSetupOpts []func(server *grpc.Server)
}

type ServerOptionModifier func(server *ServerOptions)

func WithServerPortOptions(opts ...serverport.ServerOpts) ServerOptionModifier {
return func(serverOptions *ServerOptions) {
serverOptions.serverPortOptions = append(serverOptions.serverPortOptions, opts...)
}

Check warning on line 56 in grpcserver/serverport.go

View check run for this annotation

Codecov / codecov/patch

grpcserver/serverport.go#L53-L56

Added lines #L53 - L56 were not covered by tests
}

func WithGRPCServerOptions(opts ...grpc.ServerOption) ServerOptionModifier {
return func(serverOptions *ServerOptions) {
serverOptions.grpcServerOpts = append(serverOptions.grpcServerOpts, opts...)
}

Check warning on line 62 in grpcserver/serverport.go

View check run for this annotation

Codecov / codecov/patch

grpcserver/serverport.go#L59-L62

Added lines #L59 - L62 were not covered by tests
}

func WithGRPCSetupOptions(opts ...func(server *grpc.Server)) ServerOptionModifier {
return func(serverOptions *ServerOptions) {
serverOptions.grpcSetupOpts = append(serverOptions.grpcSetupOpts, opts...)
}

Check warning on line 68 in grpcserver/serverport.go

View check run for this annotation

Codecov / codecov/patch

grpcserver/serverport.go#L65-L68

Added lines #L65 - L68 were not covered by tests
}

func NewHook(serverOptionsModifiers ...ServerOptionModifier) fx.Hook {
var (
close func(ctx context.Context) error
err error
)

options := &ServerOptions{}
for _, option := range serverOptionsModifiers {
option(options)
}

Check warning on line 80 in grpcserver/serverport.go

View check run for this annotation

Codecov / codecov/patch

grpcserver/serverport.go#L71-L80

Added lines #L71 - L80 were not covered by tests

server := serverport.NewServer(serverPortDiscr, options.serverPortOptions...)

return fx.Hook{
OnStart: func(ctx context.Context) error {
logging.FromContext(ctx).Infof("starting GRPC server")
close, err = startServer(
ctx,
server,
options.grpcServerOpts,
options.grpcSetupOpts,
)
return err
},
OnStop: func(ctx context.Context) error {
if close == nil {
return nil
}
logging.FromContext(ctx).Infof("Stop GRPC server")
defer func() {
logging.FromContext(ctx).Infof("GRPC server stopped")
}()
return close(ctx)

Check warning on line 103 in grpcserver/serverport.go

View check run for this annotation

Codecov / codecov/patch

grpcserver/serverport.go#L82-L103

Added lines #L82 - L103 were not covered by tests
},
}
}
112 changes: 33 additions & 79 deletions httpserver/serverport.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,86 +2,34 @@

import (
"context"
"errors"
"fmt"
"net"
"net/http"
"time"

"github.com/formancehq/go-libs/v3/serverport"

"github.com/formancehq/go-libs/v3/logging"

"go.uber.org/fx"
)

type serverInfo struct {
started chan struct{}
address string
}

type serverInfoContextKey string

var serverInfoKey serverInfoContextKey = "_serverInfo"

func GetActualServerInfo(ctx context.Context) *serverInfo {
siAsAny := ctx.Value(serverInfoKey)
if siAsAny == nil {
return nil
}
return siAsAny.(*serverInfo)
}

func ContextWithServerInfo(ctx context.Context) context.Context {
return context.WithValue(ctx, serverInfoKey, &serverInfo{
started: make(chan struct{}),
})
return serverport.ContextWithServerInfo(ctx, serverPortDiscr)

Check warning on line 18 in httpserver/serverport.go

View check run for this annotation

Codecov / codecov/patch

httpserver/serverport.go#L18

Added line #L18 was not covered by tests
}

func Started(ctx context.Context) chan struct{} {
si := GetActualServerInfo(ctx)
if si == nil {
return nil
}
return si.started
}

func Address(ctx context.Context) string {
si := GetActualServerInfo(ctx)
if si == nil {
return ""
}
return si.address
}
const serverPortDiscr = "http"

func URL(ctx context.Context) string {
return fmt.Sprintf("http://%s", Address(ctx))
return fmt.Sprintf("http://%s", serverport.Address(ctx, serverPortDiscr))

Check warning on line 24 in httpserver/serverport.go

View check run for this annotation

Codecov / codecov/patch

httpserver/serverport.go#L24

Added line #L24 was not covered by tests
}

func StartedServer(ctx context.Context, listener net.Listener) {
si := GetActualServerInfo(ctx)
if si == nil {
return
}
func startServer(ctx context.Context, s *serverport.Server, handler http.Handler, options ...func(server *http.Server)) (func(ctx context.Context) error, error) {

Check warning on line 27 in httpserver/serverport.go

View check run for this annotation

Codecov / codecov/patch

httpserver/serverport.go#L27

Added line #L27 was not covered by tests

si.address = listener.Addr().String()

close(si.started)
}

func (s *server) StartServer(ctx context.Context, handler http.Handler, options ...func(server *http.Server)) (func(ctx context.Context) error, error) {

if s.listener == nil {
if s.address == "" {
return nil, errors.New("either address or listener must be provided")
}
listener, err := net.Listen("tcp", s.address)
if err != nil {
return nil, err
}
s.listener = listener
if err := s.Listen(ctx); err != nil {
return nil, err

Check warning on line 30 in httpserver/serverport.go

View check run for this annotation

Codecov / codecov/patch

httpserver/serverport.go#L29-L30

Added lines #L29 - L30 were not covered by tests
}

StartedServer(ctx, s.listener)

srv := &http.Server{
Handler: handler,
ReadHeaderTimeout: 10 * time.Second,
Expand All @@ -91,7 +39,7 @@
}

go func() {
err := srv.Serve(s.listener)
err := srv.Serve(s.Listener)

Check warning on line 42 in httpserver/serverport.go

View check run for this annotation

Codecov / codecov/patch

httpserver/serverport.go#L42

Added line #L42 was not covered by tests
if err != nil && err != http.ErrServerClosed {
panic(err)
}
Expand All @@ -102,41 +50,47 @@
}, nil
}

type server struct {
listener net.Listener
address string
type ServerOptions struct {
serverOptions []serverport.ServerOpts
httpServerOpts []func(server *http.Server)
}

type serverOpts func(server *server)
type ServerOptionModifier func(server *ServerOptions)

func WithListener(listener net.Listener) serverOpts {
return func(server *server) {
server.listener = listener
func WithServerPortOptions(opts ...serverport.ServerOpts) ServerOptionModifier {
return func(serverOptions *ServerOptions) {
serverOptions.serverOptions = append(serverOptions.serverOptions, opts...)

Check warning on line 62 in httpserver/serverport.go

View check run for this annotation

Codecov / codecov/patch

httpserver/serverport.go#L60-L62

Added lines #L60 - L62 were not covered by tests
}
}

func WithAddress(addr string) serverOpts {
return func(server *server) {
server.address = addr
}
func WithListener(listener net.Listener) ServerOptionModifier {
return WithServerPortOptions(serverport.WithListener(listener))

Check warning on line 67 in httpserver/serverport.go

View check run for this annotation

Codecov / codecov/patch

httpserver/serverport.go#L66-L67

Added lines #L66 - L67 were not covered by tests
}

func WithAddress(addr string) ServerOptionModifier {
return WithServerPortOptions(serverport.WithAddress(addr))

Check warning on line 71 in httpserver/serverport.go

View check run for this annotation

Codecov / codecov/patch

httpserver/serverport.go#L70-L71

Added lines #L70 - L71 were not covered by tests
}

func WithHttpServerOpts(opts ...func(server *http.Server)) serverOpts {
return func(server *server) {
func WithHttpServerOpts(opts ...func(server *http.Server)) ServerOptionModifier {
return func(server *ServerOptions) {

Check warning on line 75 in httpserver/serverport.go

View check run for this annotation

Codecov / codecov/patch

httpserver/serverport.go#L74-L75

Added lines #L74 - L75 were not covered by tests
server.httpServerOpts = opts
}
Comment on lines +74 to 77
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Functional‑option modifier overrides previous values instead of appending

WithHttpServerOpts replaces the slice each time it’s used, unlike the other modifiers that append.
Multiple call sites will silently lose options.

-return func(server *ServerOptions) {
-    server.httpServerOpts = opts
+return func(server *ServerOptions) {
+    server.httpServerOpts = append(server.httpServerOpts, opts...)
 }

Unit tests configuring several timeouts/handlers would currently apply only the last one.

🧰 Tools
🪛 GitHub Check: codecov/patch

[warning] 74-75: httpserver/serverport.go#L74-L75
Added lines #L74 - L75 were not covered by tests

}

func NewHook(handler http.Handler, options ...serverOpts) fx.Hook {
func NewHook(handler http.Handler, serverOptionModifiers ...ServerOptionModifier) fx.Hook {

Check warning on line 80 in httpserver/serverport.go

View check run for this annotation

Codecov / codecov/patch

httpserver/serverport.go#L80

Added line #L80 was not covered by tests
var (
close func(ctx context.Context) error
err error
)

s := &server{}
for _, option := range options {
option(s)
serverOptions := &ServerOptions{}
for _, serverOptionModifier := range serverOptionModifiers {
serverOptionModifier(serverOptions)
}

Check warning on line 89 in httpserver/serverport.go

View check run for this annotation

Codecov / codecov/patch

httpserver/serverport.go#L86-L89

Added lines #L86 - L89 were not covered by tests

server := serverport.NewServer(serverPortDiscr)
for _, option := range serverOptions.serverOptions {
option(server)

Check warning on line 93 in httpserver/serverport.go

View check run for this annotation

Codecov / codecov/patch

httpserver/serverport.go#L91-L93

Added lines #L91 - L93 were not covered by tests
}
Comment on lines +91 to 94
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Pass accumulated server‑port options directly when constructing the server

You already gather serverOptions.serverOptions; pass them to the constructor like the gRPC implementation and drop the follow‑up mutation loop for simplicity & immutability.

-server := serverport.NewServer(serverPortDiscr)
-for _, option := range serverOptions.serverOptions {
-    option(server)
-}
+server := serverport.NewServer(serverPortDiscr, serverOptions.serverOptions...)

Reduces cognitive load and the risk of future divergence between HTTP and gRPC helpers.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
server := serverport.NewServer(serverPortDiscr)
for _, option := range serverOptions.serverOptions {
option(server)
}
server := serverport.NewServer(serverPortDiscr, serverOptions.serverOptions...)
🧰 Tools
🪛 GitHub Check: codecov/patch

[warning] 91-93: httpserver/serverport.go#L91-L93
Added lines #L91 - L93 were not covered by tests


return fx.Hook{
Expand All @@ -145,7 +99,7 @@
defer func() {
logging.FromContext(ctx).Infof("HTTP server started")
}()
close, err = s.StartServer(ctx, handler, s.httpServerOpts...)
close, err = startServer(ctx, server, handler, serverOptions.httpServerOpts...)

Check warning on line 102 in httpserver/serverport.go

View check run for this annotation

Codecov / codecov/patch

httpserver/serverport.go#L102

Added line #L102 was not covered by tests
return err
},
OnStop: func(ctx context.Context) error {
Expand Down
Loading