-
Notifications
You must be signed in to change notification settings - Fork 3
feat: add some grpc utils #403
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,106 @@ | ||
package grpcserver | ||
|
||
import ( | ||
"context" | ||
|
||
"github.com/formancehq/go-libs/v3/logging" | ||
"github.com/formancehq/go-libs/v3/serverport" | ||
"go.uber.org/fx" | ||
"google.golang.org/grpc" | ||
) | ||
|
||
const serverPortDiscr = "grpc" | ||
|
||
func ContextWithServerInfo(ctx context.Context) context.Context { | ||
return serverport.ContextWithServerInfo(ctx, serverPortDiscr) | ||
} | ||
|
||
func startServer(ctx context.Context, s *serverport.Server, serverOptions []grpc.ServerOption, setupOptions []func(*grpc.Server)) (func(ctx context.Context) error, error) { | ||
|
||
if err := s.Listen(ctx); err != nil { | ||
return nil, err | ||
} | ||
|
||
grpcServer := grpc.NewServer(serverOptions...) | ||
for _, option := range setupOptions { | ||
option(grpcServer) | ||
} | ||
go func() { | ||
if err := grpcServer.Serve(s.Listener); err != nil { | ||
logging.FromContext(ctx).Errorf("failed to serve: %v", err) | ||
} | ||
}() | ||
|
||
return func(ctx context.Context) error { | ||
grpcServer.GracefulStop() | ||
|
||
return nil | ||
}, nil | ||
} | ||
|
||
func Address(ctx context.Context) string { | ||
return serverport.Address(ctx, serverPortDiscr) | ||
} | ||
|
||
type ServerOptions struct { | ||
serverPortOptions []serverport.ServerOpts | ||
grpcServerOpts []grpc.ServerOption | ||
grpcSetupOpts []func(server *grpc.Server) | ||
} | ||
|
||
type ServerOptionModifier func(server *ServerOptions) | ||
|
||
func WithServerPortOptions(opts ...serverport.ServerOpts) ServerOptionModifier { | ||
return func(serverOptions *ServerOptions) { | ||
serverOptions.serverPortOptions = append(serverOptions.serverPortOptions, opts...) | ||
} | ||
} | ||
|
||
func WithGRPCServerOptions(opts ...grpc.ServerOption) ServerOptionModifier { | ||
return func(serverOptions *ServerOptions) { | ||
serverOptions.grpcServerOpts = append(serverOptions.grpcServerOpts, opts...) | ||
} | ||
} | ||
|
||
func WithGRPCSetupOptions(opts ...func(server *grpc.Server)) ServerOptionModifier { | ||
return func(serverOptions *ServerOptions) { | ||
serverOptions.grpcSetupOpts = append(serverOptions.grpcSetupOpts, opts...) | ||
} | ||
} | ||
|
||
func NewHook(serverOptionsModifiers ...ServerOptionModifier) fx.Hook { | ||
var ( | ||
close func(ctx context.Context) error | ||
err error | ||
) | ||
|
||
options := &ServerOptions{} | ||
for _, option := range serverOptionsModifiers { | ||
option(options) | ||
} | ||
|
||
server := serverport.NewServer(serverPortDiscr, options.serverPortOptions...) | ||
|
||
return fx.Hook{ | ||
OnStart: func(ctx context.Context) error { | ||
logging.FromContext(ctx).Infof("starting GRPC server") | ||
close, err = startServer( | ||
ctx, | ||
server, | ||
options.grpcServerOpts, | ||
options.grpcSetupOpts, | ||
) | ||
return err | ||
}, | ||
OnStop: func(ctx context.Context) error { | ||
if close == nil { | ||
return nil | ||
} | ||
logging.FromContext(ctx).Infof("Stop GRPC server") | ||
defer func() { | ||
logging.FromContext(ctx).Infof("GRPC server stopped") | ||
}() | ||
return close(ctx) | ||
}, | ||
} | ||
} |
Original file line number | Diff line number | Diff line change | ||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -2,86 +2,34 @@ | |||||||||||
|
||||||||||||
import ( | ||||||||||||
"context" | ||||||||||||
"errors" | ||||||||||||
"fmt" | ||||||||||||
"net" | ||||||||||||
"net/http" | ||||||||||||
"time" | ||||||||||||
|
||||||||||||
"github.com/formancehq/go-libs/v3/serverport" | ||||||||||||
|
||||||||||||
"github.com/formancehq/go-libs/v3/logging" | ||||||||||||
|
||||||||||||
"go.uber.org/fx" | ||||||||||||
) | ||||||||||||
|
||||||||||||
type serverInfo struct { | ||||||||||||
started chan struct{} | ||||||||||||
address string | ||||||||||||
} | ||||||||||||
|
||||||||||||
type serverInfoContextKey string | ||||||||||||
|
||||||||||||
var serverInfoKey serverInfoContextKey = "_serverInfo" | ||||||||||||
|
||||||||||||
func GetActualServerInfo(ctx context.Context) *serverInfo { | ||||||||||||
siAsAny := ctx.Value(serverInfoKey) | ||||||||||||
if siAsAny == nil { | ||||||||||||
return nil | ||||||||||||
} | ||||||||||||
return siAsAny.(*serverInfo) | ||||||||||||
} | ||||||||||||
|
||||||||||||
func ContextWithServerInfo(ctx context.Context) context.Context { | ||||||||||||
return context.WithValue(ctx, serverInfoKey, &serverInfo{ | ||||||||||||
started: make(chan struct{}), | ||||||||||||
}) | ||||||||||||
return serverport.ContextWithServerInfo(ctx, serverPortDiscr) | ||||||||||||
} | ||||||||||||
|
||||||||||||
func Started(ctx context.Context) chan struct{} { | ||||||||||||
si := GetActualServerInfo(ctx) | ||||||||||||
if si == nil { | ||||||||||||
return nil | ||||||||||||
} | ||||||||||||
return si.started | ||||||||||||
} | ||||||||||||
|
||||||||||||
func Address(ctx context.Context) string { | ||||||||||||
si := GetActualServerInfo(ctx) | ||||||||||||
if si == nil { | ||||||||||||
return "" | ||||||||||||
} | ||||||||||||
return si.address | ||||||||||||
} | ||||||||||||
const serverPortDiscr = "http" | ||||||||||||
|
||||||||||||
func URL(ctx context.Context) string { | ||||||||||||
return fmt.Sprintf("http://%s", Address(ctx)) | ||||||||||||
return fmt.Sprintf("http://%s", serverport.Address(ctx, serverPortDiscr)) | ||||||||||||
} | ||||||||||||
|
||||||||||||
func StartedServer(ctx context.Context, listener net.Listener) { | ||||||||||||
si := GetActualServerInfo(ctx) | ||||||||||||
if si == nil { | ||||||||||||
return | ||||||||||||
} | ||||||||||||
func startServer(ctx context.Context, s *serverport.Server, handler http.Handler, options ...func(server *http.Server)) (func(ctx context.Context) error, error) { | ||||||||||||
|
||||||||||||
si.address = listener.Addr().String() | ||||||||||||
|
||||||||||||
close(si.started) | ||||||||||||
} | ||||||||||||
|
||||||||||||
func (s *server) StartServer(ctx context.Context, handler http.Handler, options ...func(server *http.Server)) (func(ctx context.Context) error, error) { | ||||||||||||
|
||||||||||||
if s.listener == nil { | ||||||||||||
if s.address == "" { | ||||||||||||
return nil, errors.New("either address or listener must be provided") | ||||||||||||
} | ||||||||||||
listener, err := net.Listen("tcp", s.address) | ||||||||||||
if err != nil { | ||||||||||||
return nil, err | ||||||||||||
} | ||||||||||||
s.listener = listener | ||||||||||||
if err := s.Listen(ctx); err != nil { | ||||||||||||
return nil, err | ||||||||||||
} | ||||||||||||
|
||||||||||||
StartedServer(ctx, s.listener) | ||||||||||||
|
||||||||||||
srv := &http.Server{ | ||||||||||||
Handler: handler, | ||||||||||||
ReadHeaderTimeout: 10 * time.Second, | ||||||||||||
|
@@ -91,7 +39,7 @@ | |||||||||||
} | ||||||||||||
|
||||||||||||
go func() { | ||||||||||||
err := srv.Serve(s.listener) | ||||||||||||
err := srv.Serve(s.Listener) | ||||||||||||
if err != nil && err != http.ErrServerClosed { | ||||||||||||
panic(err) | ||||||||||||
} | ||||||||||||
|
@@ -102,41 +50,47 @@ | |||||||||||
}, nil | ||||||||||||
} | ||||||||||||
|
||||||||||||
type server struct { | ||||||||||||
listener net.Listener | ||||||||||||
address string | ||||||||||||
type ServerOptions struct { | ||||||||||||
serverOptions []serverport.ServerOpts | ||||||||||||
httpServerOpts []func(server *http.Server) | ||||||||||||
} | ||||||||||||
|
||||||||||||
type serverOpts func(server *server) | ||||||||||||
type ServerOptionModifier func(server *ServerOptions) | ||||||||||||
|
||||||||||||
func WithListener(listener net.Listener) serverOpts { | ||||||||||||
return func(server *server) { | ||||||||||||
server.listener = listener | ||||||||||||
func WithServerPortOptions(opts ...serverport.ServerOpts) ServerOptionModifier { | ||||||||||||
return func(serverOptions *ServerOptions) { | ||||||||||||
serverOptions.serverOptions = append(serverOptions.serverOptions, opts...) | ||||||||||||
} | ||||||||||||
} | ||||||||||||
|
||||||||||||
func WithAddress(addr string) serverOpts { | ||||||||||||
return func(server *server) { | ||||||||||||
server.address = addr | ||||||||||||
} | ||||||||||||
func WithListener(listener net.Listener) ServerOptionModifier { | ||||||||||||
return WithServerPortOptions(serverport.WithListener(listener)) | ||||||||||||
} | ||||||||||||
|
||||||||||||
func WithAddress(addr string) ServerOptionModifier { | ||||||||||||
return WithServerPortOptions(serverport.WithAddress(addr)) | ||||||||||||
} | ||||||||||||
|
||||||||||||
func WithHttpServerOpts(opts ...func(server *http.Server)) serverOpts { | ||||||||||||
return func(server *server) { | ||||||||||||
func WithHttpServerOpts(opts ...func(server *http.Server)) ServerOptionModifier { | ||||||||||||
return func(server *ServerOptions) { | ||||||||||||
server.httpServerOpts = opts | ||||||||||||
} | ||||||||||||
Comment on lines
+74
to
77
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Functional‑option modifier overrides previous values instead of appending
-return func(server *ServerOptions) {
- server.httpServerOpts = opts
+return func(server *ServerOptions) {
+ server.httpServerOpts = append(server.httpServerOpts, opts...)
} Unit tests configuring several timeouts/handlers would currently apply only the last one. 🧰 Tools🪛 GitHub Check: codecov/patch[warning] 74-75: httpserver/serverport.go#L74-L75 |
||||||||||||
} | ||||||||||||
|
||||||||||||
func NewHook(handler http.Handler, options ...serverOpts) fx.Hook { | ||||||||||||
func NewHook(handler http.Handler, serverOptionModifiers ...ServerOptionModifier) fx.Hook { | ||||||||||||
var ( | ||||||||||||
close func(ctx context.Context) error | ||||||||||||
err error | ||||||||||||
) | ||||||||||||
|
||||||||||||
s := &server{} | ||||||||||||
for _, option := range options { | ||||||||||||
option(s) | ||||||||||||
serverOptions := &ServerOptions{} | ||||||||||||
for _, serverOptionModifier := range serverOptionModifiers { | ||||||||||||
serverOptionModifier(serverOptions) | ||||||||||||
} | ||||||||||||
|
||||||||||||
server := serverport.NewServer(serverPortDiscr) | ||||||||||||
for _, option := range serverOptions.serverOptions { | ||||||||||||
option(server) | ||||||||||||
} | ||||||||||||
Comment on lines
+91
to
94
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion Pass accumulated server‑port options directly when constructing the server You already gather -server := serverport.NewServer(serverPortDiscr)
-for _, option := range serverOptions.serverOptions {
- option(server)
-}
+server := serverport.NewServer(serverPortDiscr, serverOptions.serverOptions...) Reduces cognitive load and the risk of future divergence between HTTP and gRPC helpers. 📝 Committable suggestion
Suggested change
🧰 Tools🪛 GitHub Check: codecov/patch[warning] 91-93: httpserver/serverport.go#L91-L93 |
||||||||||||
|
||||||||||||
return fx.Hook{ | ||||||||||||
|
@@ -145,7 +99,7 @@ | |||||||||||
defer func() { | ||||||||||||
logging.FromContext(ctx).Infof("HTTP server started") | ||||||||||||
}() | ||||||||||||
close, err = s.StartServer(ctx, handler, s.httpServerOpts...) | ||||||||||||
close, err = startServer(ctx, server, handler, serverOptions.httpServerOpts...) | ||||||||||||
return err | ||||||||||||
}, | ||||||||||||
OnStop: func(ctx context.Context) error { | ||||||||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Surface serving errors – align behaviour with the HTTP implementation
grpcServer.Serve
can exit immediately (e.g. port already in use, TLS mis‑configuration).In the HTTP starter we
panic
on such unrecoverable conditions, but here we only log, allowing the application to keep running in a half‑initialised state.Doing so keeps both stacks consistent and fails fast when the port cannot be bound.
(Requires the
errors
andfmt
imports.)📝 Committable suggestion