From b90035a29d40b9500b81caf78c0e30ddb4008b0f Mon Sep 17 00:00:00 2001 From: Felix Gateru Date: Thu, 6 Mar 2025 15:50:26 +0300 Subject: [PATCH] feat: add audit logs Signed-off-by: Felix Gateru --- auditlogs/api/decode.go | 75 +++++++ auditlogs/api/endpoints.go | 50 +++++ auditlogs/api/requests.go | 14 ++ auditlogs/api/responses.go | 42 ++++ auditlogs/api/transport.go | 49 +++++ auditlogs/auditlogs.go | 71 +++++++ auditlogs/events/consumer.go | 143 +++++++++++++ auditlogs/middleware/authorization.go | 59 ++++++ auditlogs/middleware/logging.go | 77 +++++++ auditlogs/middleware/metrics.go | 55 +++++ auditlogs/mocks/repository.go | 106 ++++++++++ auditlogs/mocks/service.go | 107 ++++++++++ auditlogs/postgres/auditlogs.go | 277 ++++++++++++++++++++++++++ auditlogs/postgres/doc.go | 5 + auditlogs/postgres/init.go | 39 ++++ auditlogs/service.go | 57 ++++++ auditlogs/state.go | 112 +++++++++++ cmd/auditlogs/main.go | 210 +++++++++++++++++++ docker/.env | 17 ++ docker/docker-compose.yml | 56 ++++++ 20 files changed, 1621 insertions(+) create mode 100644 auditlogs/api/decode.go create mode 100644 auditlogs/api/endpoints.go create mode 100644 auditlogs/api/requests.go create mode 100644 auditlogs/api/responses.go create mode 100644 auditlogs/api/transport.go create mode 100644 auditlogs/auditlogs.go create mode 100644 auditlogs/events/consumer.go create mode 100644 auditlogs/middleware/authorization.go create mode 100644 auditlogs/middleware/logging.go create mode 100644 auditlogs/middleware/metrics.go create mode 100644 auditlogs/mocks/repository.go create mode 100644 auditlogs/mocks/service.go create mode 100644 auditlogs/postgres/auditlogs.go create mode 100644 auditlogs/postgres/doc.go create mode 100644 auditlogs/postgres/init.go create mode 100644 auditlogs/service.go create mode 100644 auditlogs/state.go create mode 100644 cmd/auditlogs/main.go diff --git a/auditlogs/api/decode.go b/auditlogs/api/decode.go new file mode 100644 index 0000000000..2c20cee705 --- /dev/null +++ b/auditlogs/api/decode.go @@ -0,0 +1,75 @@ +// Copyright (c) Abstract Machines +// SPDX-License-Identifier: Apache-2.0 + +package api + +import ( + "context" + "net/http" + + api "github.com/absmach/supermq/api/http" + apiutil "github.com/absmach/supermq/api/http/util" + "github.com/absmach/supermq/auditlogs" + "github.com/go-chi/chi/v5" +) + +const ( + requestIDKey = "request_id" + actorIDKey = "actor_id" + entityTypeKey = "entity_type" + entityIDKey = "entity_id" +) + +func decodeRetrieveAuditLogReq(_ context.Context, r *http.Request) (interface{}, error) { + req := retriveAuditLogReq{ + id: chi.URLParam(r, "auditLogID"), + } + + return req, nil +} + +func decodeRetrieveAllAuditLogsReq(_ context.Context, r *http.Request) (interface{}, error) { + order, err := apiutil.ReadStringQuery(r, api.OrderKey, "") + if err != nil { + return retrieveAllAuditLogsReq{}, err + } + + dir, err := apiutil.ReadStringQuery(r, api.DirKey, "") + if err != nil { + return retrieveAllAuditLogsReq{}, err + } + + requestID, err := apiutil.ReadStringQuery(r, requestIDKey, "") + if err != nil { + return retrieveAllAuditLogsReq{}, err + } + + actorID, err := apiutil.ReadStringQuery(r, actorIDKey, "") + if err != nil { + return retrieveAllAuditLogsReq{}, err + } + + entityType, err := apiutil.ReadStringQuery(r, entityTypeKey, "") + if err != nil { + return retrieveAllAuditLogsReq{}, err + } + + entityID, err := apiutil.ReadStringQuery(r, entityIDKey, "") + if err != nil { + return retrieveAllAuditLogsReq{}, err + } + + req := retrieveAllAuditLogsReq{ + Page: auditlogs.Page{ + Order: order, + Dir: dir, + RequestID: requestID, + ActorID: actorID, + EntityType: entityType, + EntityID: entityID, + }, + } + + return req, nil + +} diff --git a/auditlogs/api/endpoints.go b/auditlogs/api/endpoints.go new file mode 100644 index 0000000000..9170a4bac7 --- /dev/null +++ b/auditlogs/api/endpoints.go @@ -0,0 +1,50 @@ +// Copyright (c) Abstract Machines +// SPDX-License-Identifier: Apache-2.0 + +package api + +import ( + "context" + + "github.com/absmach/supermq/auditlogs" + api "github.com/absmach/supermq/api/http" + "github.com/absmach/supermq/pkg/authn" + "github.com/go-kit/kit/endpoint" + svcerr "github.com/absmach/supermq/pkg/errors/service" +) + +func retrieveAuditLogEndpoint(svc auditlogs.Service) endpoint.Endpoint { + return func(ctx context.Context, request interface{}) (interface{}, error) { + req := request.(retriveAuditLogReq) + + session, ok := ctx.Value(api.SessionKey).(authn.Session) + if !ok { + return nil, svcerr.ErrAuthentication + } + + log, err := svc.RetrieveByID(ctx, session, req.id) + if err != nil { + return nil, err + } + + return retrieveAuditLogRes{log}, nil + } +} + +func retrieveAllAuditLogsEndpoint(svc auditlogs.Service) endpoint.Endpoint { + return func(ctx context.Context, request interface{}) (interface{}, error) { + req := request.(retrieveAllAuditLogsReq) + + session, ok := ctx.Value(api.SessionKey).(authn.Session) + if !ok { + return nil, svcerr.ErrAuthentication + } + + page, err := svc.RetrieveAll(ctx, session, req.Page) + if err != nil { + return nil, err + } + + return retrieveAllAuditLogsRes{page}, nil + } +} diff --git a/auditlogs/api/requests.go b/auditlogs/api/requests.go new file mode 100644 index 0000000000..ab52bd43dd --- /dev/null +++ b/auditlogs/api/requests.go @@ -0,0 +1,14 @@ +// Copyright (c) Abstract Machines +// SPDX-License-Identifier: Apache-2.0 + +package api + +import "github.com/absmach/supermq/auditlogs" + +type retriveAuditLogReq struct { + id string +} + +type retrieveAllAuditLogsReq struct { + auditlogs.Page +} diff --git a/auditlogs/api/responses.go b/auditlogs/api/responses.go new file mode 100644 index 0000000000..3c4c1a559b --- /dev/null +++ b/auditlogs/api/responses.go @@ -0,0 +1,42 @@ +// Copyright (c) Abstract Machines +// SPDX-License-Identifier: Apache-2.0 + +package api + +import ( + "net/http" + + "github.com/absmach/supermq/auditlogs" +) + +type retrieveAuditLogRes struct { + auditlogs.AuditLog +} + +func (res retrieveAuditLogRes) Code() int { + return http.StatusOK +} + +func (res retrieveAuditLogRes) Headers() map[string]string { + return map[string]string{} +} + +func (res retrieveAuditLogRes) Empty() bool { + return false +} + +type retrieveAllAuditLogsRes struct { + auditlogs.AuditLogPage +} + +func (res retrieveAllAuditLogsRes) Code() int { + return http.StatusOK +} + +func (res retrieveAllAuditLogsRes) Headers() map[string]string { + return map[string]string{} +} + +func (res retrieveAllAuditLogsRes) Empty() bool { + return false +} diff --git a/auditlogs/api/transport.go b/auditlogs/api/transport.go new file mode 100644 index 0000000000..0880884648 --- /dev/null +++ b/auditlogs/api/transport.go @@ -0,0 +1,49 @@ +// Copyright (c) Abstract Machines +// SPDX-License-Identifier: Apache-2.0 + +package api + +import ( + "log/slog" + + "github.com/absmach/supermq" + "github.com/absmach/supermq/auditlogs" + api "github.com/absmach/supermq/api/http" + apiutil "github.com/absmach/supermq/api/http/util" + "github.com/go-chi/chi/v5" + "github.com/prometheus/client_golang/prometheus/promhttp" + "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" + smqauthn "github.com/absmach/supermq/pkg/authn" + kithttp "github.com/go-kit/kit/transport/http" +) + +// MakeHandler returns a HTTP handler for Channels API endpoints. +func MakeHandler(svc auditlogs.Service, authn smqauthn.Authentication, mux *chi.Mux, logger *slog.Logger, instanceID string, idp supermq.IDProvider) *chi.Mux { + opts := []kithttp.ServerOption{ + kithttp.ServerErrorEncoder(apiutil.LoggingErrorEncoder(logger, api.EncodeError)), + } + + mux.Route("/auditlogs", func(r chi.Router) { + r.Use(api.AuthenticateMiddleware(authn, false)) + r.Use(api.RequestIDMiddleware(idp)) + + r.Get("/{id}", otelhttp.NewHandler(kithttp.NewServer( + retrieveAuditLogEndpoint(svc), + decodeRetrieveAuditLogReq, + api.EncodeResponse, + opts..., + ), "retrieve_audit_log").ServeHTTP) + + r.Get("/", otelhttp.NewHandler(kithttp.NewServer( + retrieveAllAuditLogsEndpoint(svc), + decodeRetrieveAllAuditLogsReq, + api.EncodeResponse, + opts..., + ), "retrieve_all_audit_logs").ServeHTTP) + }) + + mux.Get("health", supermq.Health("auditlogs", instanceID)) + mux.Handle("/metrics", promhttp.Handler()) + + return mux +} diff --git a/auditlogs/auditlogs.go b/auditlogs/auditlogs.go new file mode 100644 index 0000000000..d921885d69 --- /dev/null +++ b/auditlogs/auditlogs.go @@ -0,0 +1,71 @@ +// Copyright (c) Abstract Machines +// SPDX-License-Identifier: Apache-2.0 + +package auditlogs + +import ( + "context" + "time" + + "github.com/absmach/supermq/pkg/authn" +) + +type AuditLog struct { + ID string `json:"id"` + RequestID string `json:"request_id"` + DomainID string `json:"domain_id"` + OccurredAt time.Time `json:"occured_at"` + ActorID string `json:"actor_id"` + CurrentState EntityState `json:"current_state"` + PreviousState EntityState `json:"previous_state"` + StateAttributes Metadata `json:"state_attributes"` + EntityID string `json:"entity_id"` + EntityType EntityType `json:"entity_type"` + Metadata Metadata `json:"metadata"` +} + +type AuditLogPage struct { + Page + Logs []AuditLog `json:"logs"` +} + +type Page struct { + Total uint64 `json:"total"` + Offset uint64 `json:"offset"` + Limit uint64 `json:"limit"` + Order string `json:"order,omitempty"` + Dir string `json:"dir,omitempty"` + ID string `json:"id,omitempty"` + RequestID string `json:"request_id,omitempty"` + OccuredAt string `json:"occured_at,omitempty"` + ActorID string `json:"actor_id,omitempty"` + EntityType string `json:"entity_type,omitempty"` + EntityID string `json:"entity_id,omitempty"` +} + +type Metadata map[string]any + + +//go:generate mockery --name Repository --output=./mocks --filename repository.go --quiet --note "Copyright (c) Abstract Machines" +type Repository interface { + // Save saves audit log. + Save(ctx context.Context, log AuditLog) error + + // RetrieveByID retrieves audit log by its unique ID. + RetrieveByID(ctx context.Context, id string) (AuditLog, error) + + // RetrieveAll retrieves all audit logs. + RetrieveAll(ctx context.Context, pm Page) (AuditLogPage, error) +} + +//go:generate mockery --name Service --output=./mocks --filename service.go --quiet --note "Copyright (c) Abstract Machines" +type Service interface { + // Save saves audit log. + Save(ctx context.Context, log AuditLog) error + + // RetrieveByID retrieves audit log by its unique ID. + RetrieveByID(ctx context.Context, session authn.Session, id string) (AuditLog, error) + + // RetrieveAll retrieves all audit logs. + RetrieveAll(ctx context.Context, session authn.Session, pm Page) (AuditLogPage, error) +} diff --git a/auditlogs/events/consumer.go b/auditlogs/events/consumer.go new file mode 100644 index 0000000000..863673bff8 --- /dev/null +++ b/auditlogs/events/consumer.go @@ -0,0 +1,143 @@ +// Copyright (c) Abstract Machines +// SPDX-License-Identifier: Apache-2.0 + +package events + +import ( + "context" + "errors" + "strings" + "time" + + "github.com/absmach/supermq/auditlogs" + "github.com/absmach/supermq/pkg/events" + "github.com/absmach/supermq/pkg/events/store" +) + +var ErrMissingOccurredAt = errors.New("missing occurred_at") + +type handleFunc func(ctx context.Context, event events.Event) error + +func (h handleFunc) Handle(ctx context.Context, event events.Event) error { + return h(ctx, event) +} + +func (h handleFunc) Cancel() error { + return nil +} + +// Start method starts consuming messages received from Event store. +func Start(ctx context.Context, consumer string, sub events.Subscriber, service auditlogs.Service) error { + subCfg := events.SubscriberConfig{ + Consumer: consumer, + Stream: store.StreamAllEvents, + Handler: Handle(service), + } + + return sub.Subscribe(ctx, subCfg) +} + +func Handle(service auditlogs.Service) handleFunc { + return func(ctx context.Context, event events.Event) error { + data, err := event.Encode() + if err != nil { + return err + } + + requestID, ok := data["request_id"].(string) + if !ok { + return errors.New("missing request_id") + } + delete(data, "request_id") + + domainID, ok := data["domain"].(string) + if !ok { + return errors.New("missing domain") + } + delete(data, "domain") + + operation, ok := data["operation"].(string) + if !ok { + return errors.New("missing operation") + } + delete(data, "operation") + + if operation == "" { + return errors.New("missing operation") + } + + entityType, state := toState(operation) + if entityType == auditlogs.MessageEntity { + return nil + } + + entityID, ok := data["id"].(string) + if !ok { + return errors.New("missing entity_id") + } + + occurredAt, ok := data["occurred_at"].(float64) + if !ok { + return ErrMissingOccurredAt + } + delete(data, "occurred_at") + + if occurredAt == 0 { + return ErrMissingOccurredAt + } + + metadata, ok := data["metadata"].(map[string]interface{}) + if !ok { + metadata = make(map[string]interface{}) + } + delete(data, "metadata") + + if len(data) == 0 { + return errors.New("missing attributes") + } + + al := auditlogs.AuditLog{ + RequestID: requestID, + DomainID: domainID, + OccurredAt: time.Unix(0, int64(occurredAt)), + StateAttributes: data, + CurrentState: state, + EntityID: entityID, + EntityType: entityType, + Metadata: metadata, + } + + return service.Save(ctx, al) + } +} + +func toState(operation string) (auditlogs.EntityType, auditlogs.EntityState) { + var entityType auditlogs.EntityType + var entityState auditlogs.EntityState + + op := strings.Split(operation, ".") + + switch op[0] { + case "user": + entityType = auditlogs.UserEntity + case "group": + entityType = auditlogs.GroupEntity + case "client": + entityType = auditlogs.ClientEntity + case "channel": + entityType = auditlogs.ChannelEntity + default: + entityType = auditlogs.MessageEntity + } + + switch op[1] { + case "create": + entityState = auditlogs.CreatedState + case "remove", "delete": + entityState = auditlogs.DeletedState + default: + entityState = auditlogs.UpdatedState + } + + return entityType, entityState +} diff --git a/auditlogs/middleware/authorization.go b/auditlogs/middleware/authorization.go new file mode 100644 index 0000000000..a211d39f8c --- /dev/null +++ b/auditlogs/middleware/authorization.go @@ -0,0 +1,59 @@ +// Copyright (c) Abstract Machines +// SPDX-License-Identifier: Apache-2.0 + +package middleware + +import ( + "context" + + "github.com/absmach/supermq/auditlogs" + "github.com/absmach/supermq/pkg/authn" + smqauthz "github.com/absmach/supermq/pkg/authz" + "github.com/absmach/supermq/pkg/policies" +) + +type authorizationMiddleware struct { + svc auditlogs.Service + authz smqauthz.Authorization +} + +func AuthorizationMiddleware(svc auditlogs.Service, authz smqauthz.Authorization) auditlogs.Service { + return &authorizationMiddleware{ + svc: svc, + authz: authz, + } +} + +func (m *authorizationMiddleware) Save(ctx context.Context, log auditlogs.AuditLog) error { + return m.svc.Save(ctx, log) +} + +func (m *authorizationMiddleware) RetrieveByID(ctx context.Context, session authn.Session,id string) (auditlogs.AuditLog, error) { + if err := m.checkSuperAdmin(ctx, session.UserID); err != nil { + return auditlogs.AuditLog{}, err + } + + return m.svc.RetrieveByID(ctx,session, id) +} + +func (m *authorizationMiddleware) RetrieveAll(ctx context.Context, session authn.Session, pm auditlogs.Page) (auditlogs.AuditLogPage, error) { + if err := m.checkSuperAdmin(ctx, session.UserID); err != nil { + return auditlogs.AuditLogPage{}, err + } + + return m.svc.RetrieveAll(ctx, session, pm) +} + + +func (am *authorizationMiddleware) checkSuperAdmin(ctx context.Context, userID string) error { + if err := am.authz.Authorize(ctx, smqauthz.PolicyReq{ + SubjectType: policies.UserType, + Subject: userID, + Permission: policies.AdminPermission, + ObjectType: policies.PlatformType, + Object: policies.SuperMQObject, + }); err != nil { + return err + } + return nil +} diff --git a/auditlogs/middleware/logging.go b/auditlogs/middleware/logging.go new file mode 100644 index 0000000000..dfd48d1ec4 --- /dev/null +++ b/auditlogs/middleware/logging.go @@ -0,0 +1,77 @@ +// Copyright (c) Abstract Machines +// SPDX-License-Identifier: Apache-2.0 + +package middleware + +import ( + "context" + "log/slog" + "time" + + "github.com/absmach/supermq/auditlogs" + "github.com/absmach/supermq/pkg/authn" + "github.com/go-chi/chi/v5/middleware" +) + +var _ auditlogs.Service = (*loggingMiddleware)(nil) + +type loggingMiddleware struct { + logger *slog.Logger + svc auditlogs.Service +} + +func LoggingMiddleware(svc auditlogs.Service, logger *slog.Logger) auditlogs.Service { + return &loggingMiddleware{ + logger, + svc, + } +} + +func (lm *loggingMiddleware) Save(ctx context.Context, log auditlogs.AuditLog) (err error) { + defer func(begin time.Time) { + args := []any{ + slog.String("duration", time.Since(begin).String()), + } + if err != nil { + args = append(args, slog.String("error", err.Error())) + lm.logger.Warn("Save audit log failed", args...) + return + } + lm.logger.Info("Save audit log completed successfully", args...) + }(time.Now()) + return lm.svc.Save(ctx, log) +} + +func (lm *loggingMiddleware) RetrieveByID(ctx context.Context, session authn.Session, id string) (log auditlogs.AuditLog, err error) { + defer func(begin time.Time) { + args := []any{ + slog.String("duration", time.Since(begin).String()), + slog.String("domain_id", session.DomainID), + slog.String("request_id", middleware.GetReqID(ctx)), + } + if err != nil { + args = append(args, slog.String("error", err.Error())) + lm.logger.Warn("Retrieve audit log by ID failed", args...) + return + } + lm.logger.Info("Retrieve audit log by ID completed successfully", args...) + }(time.Now()) + return lm.svc.RetrieveByID(ctx, session, id) +} + +func (lm *loggingMiddleware) RetrieveAll(ctx context.Context, session authn.Session, pm auditlogs.Page) (page auditlogs.AuditLogPage, err error) { + defer func(begin time.Time) { + args := []any{ + slog.String("duration", time.Since(begin).String()), + slog.String("domain_id", session.DomainID), + slog.String("request_id", middleware.GetReqID(ctx)), + } + if err != nil { + args = append(args, slog.String("error", err.Error())) + lm.logger.Warn("Retrieve all audit logs failed", args...) + return + } + lm.logger.Info("Retrieve all audit logs completed successfully", args...) + }(time.Now()) + return lm.svc.RetrieveAll(ctx, session, pm) +} diff --git a/auditlogs/middleware/metrics.go b/auditlogs/middleware/metrics.go new file mode 100644 index 0000000000..8c87ac5582 --- /dev/null +++ b/auditlogs/middleware/metrics.go @@ -0,0 +1,55 @@ +// Copyright (c) Abstract Machines +// SPDX-License-Identifier: Apache-2.0 + +package middleware + +import ( + "context" + "time" + + "github.com/absmach/supermq/auditlogs" + "github.com/absmach/supermq/pkg/authn" + "github.com/go-kit/kit/metrics" +) + + +var _ auditlogs.Service = (*metricsMiddleware)(nil) + +type metricsMiddleware struct { + counter metrics.Counter + latency metrics.Histogram + svc auditlogs.Service +} + +// MetricsMiddleware returns a new metrics middleware wrapper. +func MetricsMiddleware(svc auditlogs.Service, counter metrics.Counter, latency metrics.Histogram) auditlogs.Service { + return &metricsMiddleware{ + counter: counter, + latency: latency, + svc: svc, + } +} + +func (ms *metricsMiddleware) Save(ctx context.Context, log auditlogs.AuditLog) error { + defer func(begin time.Time) { + ms.counter.With("method", "Save").Add(1) + ms.latency.With("method", "Save").Observe(time.Since(begin).Seconds()) + }(time.Now()) + return ms.svc.Save(ctx, log) +} + +func (ms *metricsMiddleware) RetrieveByID(ctx context.Context, session authn.Session, id string) (auditlogs.AuditLog, error) { + defer func(begin time.Time) { + ms.counter.With("method", "RetrieveByID").Add(1) + ms.latency.With("method", "RetrieveByID").Observe(time.Since(begin).Seconds()) + }(time.Now()) + return ms.svc.RetrieveByID(ctx, session, id) +} + +func (ms *metricsMiddleware) RetrieveAll(ctx context.Context, session authn.Session, pm auditlogs.Page) (auditlogs.AuditLogPage, error) { + defer func(begin time.Time) { + ms.counter.With("method", "RetrieveAll").Add(1) + ms.latency.With("method", "RetrieveAll").Observe(time.Since(begin).Seconds()) + }(time.Now()) + return ms.svc.RetrieveAll(ctx, session, pm) +} diff --git a/auditlogs/mocks/repository.go b/auditlogs/mocks/repository.go new file mode 100644 index 0000000000..da5cab3599 --- /dev/null +++ b/auditlogs/mocks/repository.go @@ -0,0 +1,106 @@ +// Code generated by mockery v2.43.2. DO NOT EDIT. + +// Copyright (c) Abstract Machines + +package mocks + +import ( + context "context" + + auditlogs "github.com/absmach/supermq/auditlogs" + + mock "github.com/stretchr/testify/mock" +) + +// Repository is an autogenerated mock type for the Repository type +type Repository struct { + mock.Mock +} + +// RetrieveAll provides a mock function with given fields: ctx, pm +func (_m *Repository) RetrieveAll(ctx context.Context, pm auditlogs.Page) (auditlogs.AuditLogPage, error) { + ret := _m.Called(ctx, pm) + + if len(ret) == 0 { + panic("no return value specified for RetrieveAll") + } + + var r0 auditlogs.AuditLogPage + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, auditlogs.Page) (auditlogs.AuditLogPage, error)); ok { + return rf(ctx, pm) + } + if rf, ok := ret.Get(0).(func(context.Context, auditlogs.Page) auditlogs.AuditLogPage); ok { + r0 = rf(ctx, pm) + } else { + r0 = ret.Get(0).(auditlogs.AuditLogPage) + } + + if rf, ok := ret.Get(1).(func(context.Context, auditlogs.Page) error); ok { + r1 = rf(ctx, pm) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// RetrieveByID provides a mock function with given fields: ctx, id +func (_m *Repository) RetrieveByID(ctx context.Context, id string) (auditlogs.AuditLog, error) { + ret := _m.Called(ctx, id) + + if len(ret) == 0 { + panic("no return value specified for RetrieveByID") + } + + var r0 auditlogs.AuditLog + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, string) (auditlogs.AuditLog, error)); ok { + return rf(ctx, id) + } + if rf, ok := ret.Get(0).(func(context.Context, string) auditlogs.AuditLog); ok { + r0 = rf(ctx, id) + } else { + r0 = ret.Get(0).(auditlogs.AuditLog) + } + + if rf, ok := ret.Get(1).(func(context.Context, string) error); ok { + r1 = rf(ctx, id) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// Save provides a mock function with given fields: ctx, log +func (_m *Repository) Save(ctx context.Context, log auditlogs.AuditLog) error { + ret := _m.Called(ctx, log) + + if len(ret) == 0 { + panic("no return value specified for Save") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, auditlogs.AuditLog) error); ok { + r0 = rf(ctx, log) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// NewRepository creates a new instance of Repository. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewRepository(t interface { + mock.TestingT + Cleanup(func()) +}) *Repository { + mock := &Repository{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/auditlogs/mocks/service.go b/auditlogs/mocks/service.go new file mode 100644 index 0000000000..0f3f214d25 --- /dev/null +++ b/auditlogs/mocks/service.go @@ -0,0 +1,107 @@ +// Code generated by mockery v2.43.2. DO NOT EDIT. + +// Copyright (c) Abstract Machines + +package mocks + +import ( + auditlogs "github.com/absmach/supermq/auditlogs" + authn "github.com/absmach/supermq/pkg/authn" + + context "context" + + mock "github.com/stretchr/testify/mock" +) + +// Service is an autogenerated mock type for the Service type +type Service struct { + mock.Mock +} + +// RetrieveAll provides a mock function with given fields: ctx, session, pm +func (_m *Service) RetrieveAll(ctx context.Context, session authn.Session, pm auditlogs.Page) (auditlogs.AuditLogPage, error) { + ret := _m.Called(ctx, session, pm) + + if len(ret) == 0 { + panic("no return value specified for RetrieveAll") + } + + var r0 auditlogs.AuditLogPage + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, authn.Session, auditlogs.Page) (auditlogs.AuditLogPage, error)); ok { + return rf(ctx, session, pm) + } + if rf, ok := ret.Get(0).(func(context.Context, authn.Session, auditlogs.Page) auditlogs.AuditLogPage); ok { + r0 = rf(ctx, session, pm) + } else { + r0 = ret.Get(0).(auditlogs.AuditLogPage) + } + + if rf, ok := ret.Get(1).(func(context.Context, authn.Session, auditlogs.Page) error); ok { + r1 = rf(ctx, session, pm) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// RetrieveByID provides a mock function with given fields: ctx, session, id +func (_m *Service) RetrieveByID(ctx context.Context, session authn.Session, id string) (auditlogs.AuditLog, error) { + ret := _m.Called(ctx, session, id) + + if len(ret) == 0 { + panic("no return value specified for RetrieveByID") + } + + var r0 auditlogs.AuditLog + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, authn.Session, string) (auditlogs.AuditLog, error)); ok { + return rf(ctx, session, id) + } + if rf, ok := ret.Get(0).(func(context.Context, authn.Session, string) auditlogs.AuditLog); ok { + r0 = rf(ctx, session, id) + } else { + r0 = ret.Get(0).(auditlogs.AuditLog) + } + + if rf, ok := ret.Get(1).(func(context.Context, authn.Session, string) error); ok { + r1 = rf(ctx, session, id) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// Save provides a mock function with given fields: ctx, log +func (_m *Service) Save(ctx context.Context, log auditlogs.AuditLog) error { + ret := _m.Called(ctx, log) + + if len(ret) == 0 { + panic("no return value specified for Save") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, auditlogs.AuditLog) error); ok { + r0 = rf(ctx, log) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// NewService creates a new instance of Service. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewService(t interface { + mock.TestingT + Cleanup(func()) +}) *Service { + mock := &Service{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/auditlogs/postgres/auditlogs.go b/auditlogs/postgres/auditlogs.go new file mode 100644 index 0000000000..5fb702143f --- /dev/null +++ b/auditlogs/postgres/auditlogs.go @@ -0,0 +1,277 @@ +// Copyright (c) Abstract Machines +// SPDX-License-Identifier: Apache-2.0 + +package postgres + +import ( + "context" + "encoding/json" + "fmt" + "strings" + "time" + + "github.com/absmach/supermq/auditlogs" + "github.com/absmach/supermq/pkg/errors" + repoerr "github.com/absmach/supermq/pkg/errors/repository" + "github.com/absmach/supermq/pkg/postgres" + "github.com/jmoiron/sqlx" +) + +type auditLogsRepo struct { + db postgres.Database +} + +func NewRepository(db postgres.Database) auditlogs.Repository { + return &auditLogsRepo{ + db: db, + } +} + +func (ar *auditLogsRepo) Save(ctx context.Context, l auditlogs.AuditLog) error { + q := `INSERT INTO audit_logs (id, request_id, domain_id, occurred_at, actor_id, current_state, previous_state, state_attributes, entity_id, entity_type, metadata) + VALUES (:id, :request_id, :domain_id, :occurred_at, :actor_id, :current_state, :previous_state, :state_attributes, :entity_id, :entity_type, :metadata)` + + dbal, err := toDBAuditLog(l) + if err != nil { + return errors.Wrap(repoerr.ErrCreateEntity, err) + } + if _, err := ar.db.NamedQueryContext(ctx, q, dbal); err != nil { + return errors.Wrap(repoerr.ErrCreateEntity, err) + } + + return nil +} + +func (ar *auditLogsRepo) RetrieveByID(ctx context.Context, id string) (auditlogs.AuditLog, error) { + q := `SELECT id, request_id, domain_id, occurred_at, actor_id, current_state, previous_state, state_attributes, entity_id, entity_type, metadata + FROM audit logs WHERE id = :id` + + dbal := dbAuditLog{ + ID: id, + } + + rows, err := ar.db.NamedQueryContext(ctx, q, dbal) + if err != nil { + return auditlogs.AuditLog{}, errors.Wrap(repoerr.ErrViewEntity, err) + } + defer rows.Close() + + dbal = dbAuditLog{} + if rows.Next() { + if err := rows.StructScan(&dbal); err != nil { + return auditlogs.AuditLog{}, errors.Wrap(repoerr.ErrViewEntity, err) + } + + log, err := toAuditLog(dbal) + if err != nil { + return auditlogs.AuditLog{}, errors.Wrap(repoerr.ErrViewEntity, err) + } + + return log, nil + } + + return auditlogs.AuditLog{}, repoerr.ErrNotFound +} + +func (ar *auditLogsRepo) RetrieveAll(ctx context.Context, pm auditlogs.Page) (auditlogs.AuditLogPage, error) { + query, err := buildPageQuery(pm) + if err != nil { + return auditlogs.AuditLogPage{}, errors.Wrap(repoerr.ErrViewEntity, err) + } + + q := fmt.Sprintf(`SELECT l.id as id, l.request_id as request_id, l.domain_id as domain_id, l.occurred_at as occurred_at, l.actor_id as actor_id, l.current_state as current_state, l.previous_state as previous_state, l.state_attributes as state_attributes, l.entity_id as entity_id, l.entity_type as entity_type, l.metadata as metadata + FROM audit_logs l %s ORDER BY l.occurred_at %s OFFSET :offset LIMIT :limit`, query, pm.Order) + + dbpm, err := toDBAuditLogsPage(pm) + if err != nil { + return auditlogs.AuditLogPage{}, errors.Wrap(repoerr.ErrViewEntity, err) + } + rows, err := ar.db.NamedQueryContext(ctx, q, dbpm) + if err != nil { + return auditlogs.AuditLogPage{}, errors.Wrap(repoerr.ErrViewEntity, err) + } + defer rows.Close() + + items, err := ar.processRows(rows) + if err != nil { + return auditlogs.AuditLogPage{}, errors.Wrap(repoerr.ErrViewEntity, err) + } + + cq := fmt.Sprintf(`SELECT COUNT(*) as total_count + FROM ( + SELECT DISTINCT l.id, l.request_id, l.domain_id, l.occurred_at, l.actor_id, l.current_state, l.previous_state, l.state_attributes, l.entity_id, l.entity_type, l.metadata + FROM audit_logs l %s) as subquery`, query) + + total, err := postgres.Total(ctx, ar.db, cq, dbpm) + if err != nil { + return auditlogs.AuditLogPage{}, errors.Wrap(repoerr.ErrViewEntity, err) + } + + page := auditlogs.AuditLogPage{ + Page: pm, + } + page.Total = total + page.Logs = items + + return page, nil +} + +type dbAuditLog struct { + ID string `db:"id"` + RequestID string `db:"request_id"` + DomainID string `db:"domain_id"` + OccurredAt time.Time `db:"occurred_at"` + ActorID string `db:"actor_id"` + CurrentState string `db:"current_state"` + PreviousState string `db:"previous_state"` + StateAttributes []byte `db:"state_attributes"` + EntityID string `db:"entity_id"` + EntityType string `db:"entity_type"` + Metadata []byte `db:"metadata"` +} + +func toDBAuditLog(log auditlogs.AuditLog) (dbAuditLog, error) { + stateAttr := []byte("{}") + if len(log.StateAttributes) > 0 { + b, err := json.Marshal(log.StateAttributes) + if err != nil { + return dbAuditLog{}, errors.Wrap(errors.ErrMalformedEntity, err) + } + stateAttr = b + } + metadata := []byte("{}") + if len(log.Metadata) > 0 { + b, err := json.Marshal(log.Metadata) + if err != nil { + return dbAuditLog{}, errors.Wrap(errors.ErrMalformedEntity, err) + } + metadata = b + } + return dbAuditLog{ + ID: log.ID, + RequestID: log.RequestID, + DomainID: log.DomainID, + OccurredAt: log.OccurredAt, + ActorID: log.ActorID, + CurrentState: log.CurrentState.String(), + PreviousState: log.PreviousState.String(), + StateAttributes: stateAttr, + EntityID: log.EntityID, + EntityType: log.EntityType.String(), + Metadata: metadata, + }, nil +} + +func toAuditLog(log dbAuditLog) (auditlogs.AuditLog, error) { + var stateAttr auditlogs.Metadata + if log.StateAttributes != nil { + if err := json.Unmarshal(log.StateAttributes, &stateAttr); err != nil { + return auditlogs.AuditLog{}, errors.Wrap(errors.ErrMalformedEntity, err) + } + } + var metadata auditlogs.Metadata + if log.Metadata != nil { + if err := json.Unmarshal(log.Metadata, &metadata); err != nil { + return auditlogs.AuditLog{}, errors.Wrap(errors.ErrMalformedEntity, err) + } + + } + cs, err := auditlogs.ToEntityState(log.CurrentState) + if err != nil { + return auditlogs.AuditLog{}, errors.Wrap(errors.ErrMalformedEntity, err) + } + ps, err := auditlogs.ToEntityState(log.PreviousState) + if err != nil { + return auditlogs.AuditLog{}, errors.Wrap(errors.ErrMalformedEntity, err) + } + et, err := auditlogs.ToEntityType(log.EntityType) + if err != nil { + return auditlogs.AuditLog{}, errors.Wrap(errors.ErrMalformedEntity, err) + } + return auditlogs.AuditLog{ + ID: log.ID, + RequestID: log.RequestID, + DomainID: log.DomainID, + OccurredAt: log.OccurredAt, + ActorID: log.ActorID, + CurrentState: cs, + PreviousState: ps, + StateAttributes: stateAttr, + EntityID: log.EntityID, + EntityType: et, + Metadata: metadata, + }, nil +} + +func buildPageQuery(pm auditlogs.Page) (string, error) { + var query []string + var emq string + + if pm.ID != "" { + query = append(query, "l.id = :id") + } + if pm.RequestID != "" { + query = append(query, "l.request_id = :request_id") + } + if pm.ActorID != "" { + query = append(query, "l.actor_id = :actor_id") + } + if pm.EntityType != "" { + query = append(query, "l.entity_type = :entity_type") + } + if pm.EntityID != "" { + query = append(query, "l.entity_id = :entity_id") + } + + if len(query) > 0 { + emq = fmt.Sprintf("WHERE %s", strings.Join(query, " AND ")) + } + + return emq, nil +} + +type dbAuditLogsPage struct { + Total uint64 `db:"total"` + Limit uint64 `db:"limit"` + Offset uint64 `db:"offset"` + Order string `db:"order"` + Dir string `db:"dir"` + ID string `db:"id"` + RequestID string `db:"request_id"` + OccuredAt string `db:"occurred_at"` + ActorID string `db:"actor_id"` + EntityType string `db:"entity_type"` + EntityID string `db:"entity_id"` +} + +func toDBAuditLogsPage(pm auditlogs.Page) (dbAuditLogsPage, error) { + return dbAuditLogsPage{ + Total: pm.Total, + Limit: pm.Limit, + Offset: pm.Offset, + Order: pm.Order, + Dir: pm.Dir, + ID: pm.ID, + RequestID: pm.RequestID, + OccuredAt: pm.OccuredAt, + ActorID: pm.ActorID, + EntityType: pm.EntityType, + EntityID: pm.EntityID, + }, nil +} + +func (ar *auditLogsRepo) processRows(rows *sqlx.Rows) ([]auditlogs.AuditLog, error) { + var items []auditlogs.AuditLog + for rows.Next() { + dbal := dbAuditLog{} + if err := rows.StructScan(&dbal); err != nil { + return items, err + } + al, err := toAuditLog(dbal) + if err != nil { + return items, err + } + items = append(items, al) + } + return items, nil +} diff --git a/auditlogs/postgres/doc.go b/auditlogs/postgres/doc.go new file mode 100644 index 0000000000..6c82bc2872 --- /dev/null +++ b/auditlogs/postgres/doc.go @@ -0,0 +1,5 @@ +// Copyright (c) Abstract Machines +// SPDX-License-Identifier: Apache-2.0 + +// Package postgres contains the database implementation of audit logs repository layer. +package postgres \ No newline at end of file diff --git a/auditlogs/postgres/init.go b/auditlogs/postgres/init.go new file mode 100644 index 0000000000..60b3e21e36 --- /dev/null +++ b/auditlogs/postgres/init.go @@ -0,0 +1,39 @@ +// Copyright (c) Abstract Machines +// SPDX-License-Identifier: Apache-2.0 + +package postgres + +import ( + _ "github.com/jackc/pgx/v5/stdlib" // required for SQL access + migrate "github.com/rubenv/sql-migrate" +) + +// Migration of Audit Logs service. +func Migration() *migrate.MemoryMigrationSource { + return &migrate.MemoryMigrationSource{ + Migrations: []*migrate.Migration{ + { + Id: "auditlogs_01", + + Up: []string{ + `CREATE TABLE IF NOT EXISTS audit_logs ( + id VARCHAR(36) PRIMARY KEY, + request_id VARCHAR(36), + domain_id VARCHAR(36), + occured_at TIMESTAMP, + actor_id VARCHAR(36), + current_state JSONB, + previous_state JSONB, + state_attributes JSONB, + entity_id VARCHAR(36), + entity_type VARCHAR(36), + metadata JSONB + )`, + }, + Down: []string{ + `DROP TABLE IF EXISTS audit_logs`, + }, + }, + }, + } +} diff --git a/auditlogs/service.go b/auditlogs/service.go new file mode 100644 index 0000000000..2dde6a0282 --- /dev/null +++ b/auditlogs/service.go @@ -0,0 +1,57 @@ +// Copyright (c) Abstract Machines +// SPDX-License-Identifier: Apache-2.0 + +package auditlogs + +import ( + "context" + + "github.com/absmach/supermq" + "github.com/absmach/supermq/pkg/authn" + "github.com/absmach/supermq/pkg/errors" + svcerr "github.com/absmach/supermq/pkg/errors/service" +) + +type service struct { + idProvider supermq.IDProvider + repository Repository +} + +func NewService(idp supermq.IDProvider, repository Repository) Service { + return &service{ + idProvider: idp, + repository: repository, + } +} + +func (svc *service) Save(ctx context.Context, log AuditLog) error { + id, err := svc.idProvider.ID() + if err != nil { + return err + } + log.ID = id + + if err := svc.repository.Save(ctx, log); err != nil { + return errors.Wrap(svcerr.ErrCreateEntity, err) + } + + return nil +} + +func (svc *service) RetrieveByID(ctx context.Context, session authn.Session, id string) (AuditLog, error) { + log, err := svc.repository.RetrieveByID(ctx, id) + if err != nil { + return AuditLog{}, errors.Wrap(svcerr.ErrViewEntity, err) + } + + return log, nil +} + +func (svc *service) RetrieveAll(ctx context.Context, session authn.Session, pm Page) (AuditLogPage, error) { + page, err := svc.repository.RetrieveAll(ctx, pm) + if err != nil { + return AuditLogPage{}, errors.Wrap(svcerr.ErrViewEntity, err) + } + + return page, nil +} diff --git a/auditlogs/state.go b/auditlogs/state.go new file mode 100644 index 0000000000..eeedf41fab --- /dev/null +++ b/auditlogs/state.go @@ -0,0 +1,112 @@ +// Copyright (c) Abstract Machines +// SPDX-License-Identifier: Apache-2.0 + +package auditlogs + +import ( + apiutil "github.com/absmach/supermq/api/http/util" + "github.com/absmach/supermq/pkg/errors" +) + +type EntityType uint8 + +const ( + UserEntity EntityType = iota + GroupEntity + ClientEntity + ChannelEntity + MessageEntity +) + +// String representation of the possible entity type values. +const ( + userEntityType = "user" + groupEntityType = "group" + clientEntityType = "client" + channelEntityType = "channel" + messageEntityType = "message" +) + +// String converts entity type to string literal. +func (e EntityType) String() string { + switch e { + case UserEntity: + return userEntityType + case GroupEntity: + return groupEntityType + case ClientEntity: + return clientEntityType + case ChannelEntity: + return channelEntityType + case MessageEntity: + return messageEntityType + default: + return "" + } +} + +// ToEntityType converts string value to a valid entity type. +func ToEntityType(entityType string) (EntityType, error) { + switch entityType { + case userEntityType: + return UserEntity, nil + case groupEntityType: + return GroupEntity, nil + case clientEntityType: + return ClientEntity, nil + case channelEntityType: + return ChannelEntity, nil + case messageEntityType: + return MessageEntity, nil + default: + return EntityType(0), apiutil.ErrInvalidEntityType + } +} + +type EntityState uint8 + +const ( + CreatedState EntityState = iota + UpdatedState + EnabledState + DisabledState + DeletedState +) + +const ( + createdState = "created" + updatedState = "updated" + enabledState = "enabled" + disabledState = "disabled" + deletedState = "deleted" +) + +func (s EntityState) String() string { + switch s { + case CreatedState: + return createdState + case UpdatedState: + return updatedState + case EnabledState: + return enabledState + case DisabledState: + return disabledState + case DeletedState: + return deletedState + default: + return "" + } +} + +func ToEntityState(state string) (EntityState, error) { + switch state { + case createdState: + return CreatedState, nil + case updatedState: + return UpdatedState, nil + case deletedState: + return DeletedState, nil + default: + return EntityState(0), errors.ErrMalformedEntity + } +} diff --git a/cmd/auditlogs/main.go b/cmd/auditlogs/main.go new file mode 100644 index 0000000000..f674dc766e --- /dev/null +++ b/cmd/auditlogs/main.go @@ -0,0 +1,210 @@ +// Copyright (c) Abstract Machines +// SPDX-License-Identifier: Apache-2.0 + +package main + +import ( + "context" + "fmt" + "log" + "log/slog" + "net/url" + "os" + + chclient "github.com/absmach/callhome/pkg/client" + "github.com/absmach/supermq" + "github.com/absmach/supermq/auditlogs" + httpapi "github.com/absmach/supermq/auditlogs/api" + "github.com/absmach/supermq/auditlogs/events" + "github.com/absmach/supermq/auditlogs/middleware" + auditlogspg "github.com/absmach/supermq/auditlogs/postgres" + smqlog "github.com/absmach/supermq/logger" + authsvcAuthn "github.com/absmach/supermq/pkg/authn/authsvc" + smqauthz "github.com/absmach/supermq/pkg/authz" + authsvcAuthz "github.com/absmach/supermq/pkg/authz/authsvc" + domainsAuthz "github.com/absmach/supermq/pkg/domains/grpcclient" + "github.com/absmach/supermq/pkg/events/store" + "github.com/absmach/supermq/pkg/grpcclient" + jaegerclient "github.com/absmach/supermq/pkg/jaeger" + "github.com/absmach/supermq/pkg/postgres" + pgclient "github.com/absmach/supermq/pkg/postgres" + "github.com/absmach/supermq/pkg/prometheus" + "github.com/absmach/supermq/pkg/server" + "github.com/absmach/supermq/pkg/server/http" + "github.com/absmach/supermq/pkg/uuid" + "github.com/caarlos0/env/v11" + "github.com/jmoiron/sqlx" + "go.opentelemetry.io/otel/trace" + "golang.org/x/sync/errgroup" + "github.com/go-chi/chi/v5" +) + +const ( + svcName = "auditlogs" + envPrefixDB = "SMQ_AUDIT_LOGS_DB_" + envPrefixHTTP = "SMQ_AUDIT_LOGS_HTTP_" + envPrefixAuth = "SMQ_AUTH_GRPC_" + envPrefixDomains = "SMQ_DOMAINS_GRPC_" + defDB = "auditlogs" + defSvcHTTPPort = "9022" +) + +type config struct { + LogLevel string `env:"SMQ_AUDIT_LOGS_LOG_LEVEL" envDefault:"info"` + ESURL string `env:"SMQ_ES_URL" envDefault:"nats://localhost:4222"` + JaegerURL url.URL `env:"SMQ_JAEGER_URL" envDefault:"http://localhost:4318/v1/traces"` + SendTelemetry bool `env:"SMQ_SEND_TELEMETRY" envDefault:"true"` + InstanceID string `env:"SMQ_AUDIT_LOGS_INSTANCE_ID" envDefault:""` + TraceRatio float64 `env:"SMQ_JAEGER_TRACE_RATIO" envDefault:"1.0"` +} + +func main() { + ctx, cancel := context.WithCancel(context.Background()) + g, ctx := errgroup.WithContext(ctx) + + cfg := config{} + if err := env.Parse(&cfg); err != nil { + log.Fatalf("failed to load %s configuration : %s", svcName, err) + } + + logger, err := smqlog.New(os.Stdout, cfg.LogLevel) + if err != nil { + log.Fatalf("failed to init logger: %s", err) + } + + var exitCode int + defer smqlog.ExitWithError(&exitCode) + + if cfg.InstanceID == "" { + if cfg.InstanceID, err = uuid.New().ID(); err != nil { + logger.Error(fmt.Sprintf("failed to generate instanceID: %s", err)) + exitCode = 1 + return + } + } + + dbConfig := pgclient.Config{Name: defDB} + if err := env.ParseWithOptions(&dbConfig, env.Options{Prefix: envPrefixDB}); err != nil { + logger.Error(err.Error()) + exitCode = 1 + return + } + db, err := pgclient.Setup(dbConfig, *auditlogspg.Migration()) + if err != nil { + logger.Error(err.Error()) + exitCode = 1 + return + } + defer db.Close() + + authClientCfg := grpcclient.Config{} + if err := env.ParseWithOptions(&authClientCfg, env.Options{Prefix: envPrefixAuth}); err != nil { + logger.Error(fmt.Sprintf("failed to load auth gRPC client configuration : %s", err)) + exitCode = 1 + return + } + + authn, authnHandler, err := authsvcAuthn.NewAuthentication(ctx, authClientCfg) + if err != nil { + logger.Error(err.Error()) + exitCode = 1 + return + } + defer authnHandler.Close() + logger.Info("AuthN successfully connected to auth gRPC server " + authnHandler.Secure()) + + domsGrpcCfg := grpcclient.Config{} + if err := env.ParseWithOptions(&domsGrpcCfg, env.Options{Prefix: envPrefixDomains}); err != nil { + logger.Error(fmt.Sprintf("failed to load domains gRPC client configuration : %s", err)) + exitCode = 1 + return + } + domAuthz, _, domainsHandler, err := domainsAuthz.NewAuthorization(ctx, domsGrpcCfg) + if err != nil { + logger.Error(err.Error()) + exitCode = 1 + return + } + defer domainsHandler.Close() + + authz, authzHandler, err := authsvcAuthz.NewAuthorization(ctx, authClientCfg, domAuthz) + if err != nil { + logger.Error(err.Error()) + exitCode = 1 + return + } + defer authzHandler.Close() + logger.Info("AuthZ successfully connected to auth gRPC server " + authzHandler.Secure()) + + tp, err := jaegerclient.NewProvider(ctx, svcName, cfg.JaegerURL, cfg.InstanceID, cfg.TraceRatio) + if err != nil { + logger.Error(fmt.Sprintf("failed to init Jaeger: %s", err)) + exitCode = 1 + return + } + defer func() { + if err := tp.Shutdown(ctx); err != nil { + logger.Error(fmt.Sprintf("error shutting down tracer provider: %s", err)) + } + }() + tracer := tp.Tracer(svcName) + + svc := newService(db, dbConfig, authz, logger, tracer) + + subscriber, err := store.NewSubscriber(ctx, cfg.ESURL, logger) + if err != nil { + logger.Error(fmt.Sprintf("failed to create subscriber: %s", err)) + exitCode = 1 + return + } + + logger.Info("Subscribed to Event Store") + + if err := events.Start(ctx, svcName, subscriber, svc); err != nil { + logger.Error("failed to start %s service: %s", svcName, err) + exitCode = 1 + return + } + + httpServerConfig := server.Config{Port: defSvcHTTPPort} + if err := env.ParseWithOptions(&httpServerConfig, env.Options{Prefix: envPrefixHTTP}); err != nil { + logger.Error(fmt.Sprintf("failed to load %s HTTP server configuration : %s", svcName, err.Error())) + exitCode = 1 + return + } + + mux := chi.NewRouter() + idp := uuid.New() + hs := http.NewServer(ctx, cancel, svcName, httpServerConfig, httpapi.MakeHandler(svc, authn,mux, logger, cfg.InstanceID, idp), logger) + + if cfg.SendTelemetry { + chc := chclient.New(svcName, supermq.Version, logger, cancel) + go chc.CallHome(ctx) + } + + g.Go(func() error { + return hs.Start() + }) + + g.Go(func() error { + return server.StopSignalHandler(ctx, cancel, logger, svcName, hs) + }) + + if err := g.Wait(); err != nil { + logger.Error(fmt.Sprintf("%s service terminated: %s", svcName, err)) + } +} + +func newService(db *sqlx.DB, dbConfig pgclient.Config, authz smqauthz.Authorization, logger *slog.Logger, tracer trace.Tracer) auditlogs.Service { + database := postgres.NewDatabase(db, dbConfig, tracer) + repo := auditlogspg.NewRepository(database) + idp := uuid.New() + + svc := auditlogs.NewService(idp, repo) + svc = middleware.AuthorizationMiddleware(svc, authz) + svc = middleware.LoggingMiddleware(svc, logger) + counter, latency := prometheus.MakeMetrics("journal", "journal_writer") + svc = middleware.MetricsMiddleware(svc, counter, latency) + + return svc +} diff --git a/docker/.env b/docker/.env index f88856e391..aca243c4da 100644 --- a/docker/.env +++ b/docker/.env @@ -370,6 +370,23 @@ SMQ_WS_ADAPTER_HTTP_SERVER_CERT= SMQ_WS_ADAPTER_HTTP_SERVER_KEY= SMQ_WS_ADAPTER_INSTANCE_ID= +### Audit logs +SMQ_AUDIT_LOGS_LOG_LEVEL=info +SMQ_AUDIT_LOGS_HTTP_HOST=journal +SMQ_AUDIT_LOGS_HTTP_PORT=9021 +SMQ_AUDIT_LOGS_HTTP_SERVER_CERT= +SMQ_AUDIT_LOGS_HTTP_SERVER_KEY= +SMQ_AUDIT_LOGS_DB_HOST=journal-db +SMQ_AUDIT_LOGS_DB_PORT=5432 +SMQ_AUDIT_LOGS_DB_USER=supermq +SMQ_AUDIT_LOGS_DB_PASS=supermq +SMQ_AUDIT_LOGS_DB_NAME=journal +SMQ_AUDIT_LOGS_DB_SSL_MODE=disable +SMQ_AUDIT_LOGS_DB_SSL_CERT= +SMQ_AUDIT_LOGS_DB_SSL_KEY= +SMQ_AUDIT_LOGS_DB_SSL_ROOT_CERT= +SMQ_AUDIT_LOGS_INSTANCE_ID= + ## Addons Services ### Vault SMQ_VAULT_HOST=vault diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index c56bdb27be..af4ecf94ca 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -1243,6 +1243,62 @@ services: target: /auth-grpc-server-ca${SMQ_AUTH_GRPC_SERVER_CA_CERTS:+.crt} bind: create_host_path: true + + audit-logs-db: + image: postgres:16.2-alpine + container_name: supermq-audit-logs-db + restart: on-failure + command: postgres -c "max_connections=${SMQ_POSTGRES_MAX_CONNECTIONS}" + environment: + POSTGRES_USER: ${SMQ_AUDIT_LOGS_DB_USER} + POSTGRES_PASSWORD: ${SMQ_AUDIT_LOGS_DB_PASS} + POSTGRES_DB: ${SMQ_AUDIT_LOGS_DB_NAME} + SMQ_POSTGRES_MAX_CONNECTIONS: ${SMQ_POSTGRES_MAX_CONNECTIONS} + networks: + - supermq-base-net + volumes: + - supermq-audit-logs-volume:/var/lib/postgresql/data + + audit-logs: + image: supermq/audit-logs:${SMQ_RELEASE_TAG} + container_name: supermq-audit-logs + depends_on: + - audit-logs-db + restart: on-failure + environment: + SMQ_AUDIT_LOGS_LOG_LEVEL: ${SMQ_AUDIT_LOGS_LOG_LEVEL} + SMQ_AUDIT_LOGS_HTTP_HOST: ${SMQ_AUDIT_LOGS_HTTP_HOST} + SMQ_AUDIT_LOGS_HTTP_PORT: ${SMQ_AUDIT_LOGS_HTTP_PORT} + SMQ_AUDIT_LOGS_HTTP_SERVER_CERT: ${SMQ_AUDIT_LOGS_HTTP_SERVER_CERT} + SMQ_AUDIT_LOGS_HTTP_SERVER_KEY: ${SMQ_AUDIT_LOGS_HTTP_SERVER_KEY} + SMQ_AUDIT_LOGS_DB_HOST: ${SMQ_AUDIT_LOGS_DB_HOST} + SMQ_AUDIT_LOGS_DB_PORT: ${SMQ_AUDIT_LOGS_DB_PORT} + SMQ_AUDIT_LOGS_DB_USER: ${SMQ_AUDIT_LOGS_DB_USER} + SMQ_AUDIT_LOGS_DB_PASS: ${SMQ_AUDIT_LOGS_DB_PASS} + SMQ_AUDIT_LOGS_DB_NAME: ${SMQ_AUDIT_LOGS_DB_NAME} + SMQ_AUDIT_LOGS_DB_SSL_MODE: ${SMQ_AUDIT_LOGS_DB_SSL_MODE} + SMQ_AUDIT_LOGS_DB_SSL_CERT: ${SMQ_AUDIT_LOGS_DB_SSL_CERT} + SMQ_AUDIT_LOGS_DB_SSL_KEY: ${SMQ_AUDIT_LOGS_DB_SSL_KEY} + SMQ_AUDIT_LOGS_DB_SSL_ROOT_CERT: ${SMQ_AUDIT_LOGS_DB_SSL_ROOT_CERT} + SMQ_AUTH_GRPC_URL: ${SMQ_AUTH_GRPC_URL} + SMQ_AUTH_GRPC_TIMEOUT: ${SMQ_AUTH_GRPC_TIMEOUT} + SMQ_AUTH_GRPC_CLIENT_CERT: ${SMQ_AUTH_GRPC_CLIENT_CERT:+/auth-grpc-client.crt} + SMQ_AUTH_GRPC_CLIENT_KEY: ${SMQ_AUTH_GRPC_CLIENT_KEY:+/auth-grpc-client.key} + SMQ_AUTH_GRPC_SERVER_CA_CERTS: ${SMQ_AUTH_GRPC_SERVER_CA_CERTS:+/auth-grpc-server-ca.crt} + SMQ_ES_URL: ${SMQ_ES_URL} + SMQ_JAEGER_URL: ${SMQ_JAEGER_URL} + SMQ_JAEGER_TRACE_RATIO: ${SMQ_JAEGER_TRACE_RATIO} + SMQ_SEND_TELEMETRY: ${SMQ_SEND_TELEMETRY} + SMQ_AUDIT_LOGS_INSTANCE_ID: ${SMQ_AUDIT_LOGS_INSTANCE_ID} + SMQ_DOMAINS_GRPC_URL: ${SMQ_DOMAINS_GRPC_URL} + SMQ_DOMAINS_GRPC_TIMEOUT: ${SMQ_DOMAINS_GRPC_TIMEOUT} + SMQ_DOMAINS_GRPC_CLIENT_CERT: ${SMQ_DOMAINS_GRPC_CLIENT_CERT:+/domains-grpc-client.crt} + SMQ_DOMAINS_GRPC_CLIENT_KEY: ${SMQ_DOMAINS_GRPC_CLIENT_KEY:+/domains-grpc-client.key} + SMQ_DOMAINS_GRPC_SERVER_CA_CERTS: ${SMQ_DOMAINS_GRPC_SERVER_CA_CERTS:+/domains-grpc-server-ca.crt} + ports: + - ${SMQ_AUDIT_LOGS_HTTP_PORT}:${SMQ_AUDIT_LOGS_HTTP_PORT} + networks: + - supermq-base-net rabbitmq: image: rabbitmq:4.0.5-management-alpine