Skip to content

Commit cd6f385

Browse files
authored
Merge pull request #910 from gobitfly/BEDS-473/Notifications_set_settings
Beds 473/notifications set settings
2 parents 926f583 + 0f0c891 commit cd6f385

File tree

1 file changed

+255
-5
lines changed

1 file changed

+255
-5
lines changed

backend/pkg/api/data_access/notifications.go

Lines changed: 255 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,13 @@ import (
77
"strings"
88
"time"
99

10+
"github.com/doug-martin/goqu/v9"
1011
"github.com/ethereum/go-ethereum/params"
1112
"github.com/gobitfly/beaconchain/pkg/api/enums"
1213
t "github.com/gobitfly/beaconchain/pkg/api/types"
1314
"github.com/gobitfly/beaconchain/pkg/commons/db"
1415
"github.com/gobitfly/beaconchain/pkg/commons/types"
16+
"github.com/gobitfly/beaconchain/pkg/commons/utils"
1517
"github.com/shopspring/decimal"
1618
"golang.org/x/sync/errgroup"
1719
)
@@ -296,19 +298,258 @@ func (d *DataAccessService) GetNotificationSettings(ctx context.Context, userId
296298
return result, nil
297299
}
298300
func (d *DataAccessService) UpdateNotificationSettingsGeneral(ctx context.Context, userId uint64, settings t.NotificationSettingsGeneral) error {
299-
return d.dummy.UpdateNotificationSettingsGeneral(ctx, userId, settings)
301+
epoch := utils.TimeToEpoch(time.Now())
302+
303+
var eventsToInsert []goqu.Record
304+
var eventsToDelete []goqu.Expression
305+
306+
tx, err := d.userWriter.BeginTxx(ctx, nil)
307+
if err != nil {
308+
return fmt.Errorf("error starting db transactions to update general notification settings: %w", err)
309+
}
310+
defer utils.Rollback(tx)
311+
312+
// -------------------------------------
313+
// Set the "do not disturb" setting
314+
_, err = tx.ExecContext(ctx, `
315+
UPDATE users
316+
SET notifications_do_not_disturb_ts =
317+
CASE
318+
WHEN $1 = 0 THEN NULL
319+
ELSE TO_TIMESTAMP($1)
320+
END
321+
WHERE id = $2`, settings.DoNotDisturbTimestamp, userId)
322+
if err != nil {
323+
return err
324+
}
325+
326+
// -------------------------------------
327+
// Set the notification channels
328+
_, err = tx.ExecContext(ctx, `
329+
INSERT INTO users_notification_channels (user_id, channel, active)
330+
VALUES ($1, $2, $3), ($1, $4, $5)
331+
ON CONFLICT (user_id, channel)
332+
DO UPDATE SET active = EXCLUDED.active`,
333+
userId, types.EmailNotificationChannel, settings.IsEmailNotificationsEnabled, types.PushNotificationChannel, settings.IsPushNotificationsEnabled)
334+
if err != nil {
335+
return err
336+
}
337+
338+
// -------------------------------------
339+
// Collect the machine and rocketpool events to set and delete
340+
341+
//Machine events
342+
d.AddOrRemoveEvent(&eventsToInsert, &eventsToDelete, settings.IsMachineOfflineSubscribed, userId, string(types.MonitoringMachineOfflineEventName), "", epoch, 0)
343+
d.AddOrRemoveEvent(&eventsToInsert, &eventsToDelete, settings.IsMachineStorageUsageSubscribed, userId, string(types.MonitoringMachineDiskAlmostFullEventName), "", epoch, settings.MachineStorageUsageThreshold)
344+
d.AddOrRemoveEvent(&eventsToInsert, &eventsToDelete, settings.IsMachineCpuUsageSubscribed, userId, string(types.MonitoringMachineCpuLoadEventName), "", epoch, settings.MachineCpuUsageThreshold)
345+
d.AddOrRemoveEvent(&eventsToInsert, &eventsToDelete, settings.IsMachineMemoryUsageSubscribed, userId, string(types.MonitoringMachineMemoryUsageEventName), "", epoch, settings.MachineMemoryUsageThreshold)
346+
347+
// Insert all the events or update the threshold if they already exist
348+
if len(eventsToInsert) > 0 {
349+
insertDs := goqu.Dialect("postgres").
350+
Insert("users_subscriptions").
351+
Cols("user_id", "event_name", "event_filter", "created_ts", "created_epoch", "event_threshold").
352+
Rows(eventsToInsert).
353+
OnConflict(goqu.DoUpdate(
354+
"user_id, event_name, event_filter",
355+
goqu.Record{"event_threshold": goqu.L("EXCLUDED.event_threshold")},
356+
))
357+
358+
query, args, err := insertDs.Prepared(true).ToSQL()
359+
if err != nil {
360+
return fmt.Errorf("error preparing query: %v", err)
361+
}
362+
363+
_, err = tx.ExecContext(ctx, query, args...)
364+
if err != nil {
365+
return err
366+
}
367+
}
368+
369+
// Delete all the events
370+
if len(eventsToDelete) > 0 {
371+
deleteDs := goqu.Dialect("postgres").
372+
Delete("users_subscriptions").
373+
Where(goqu.Or(eventsToDelete...))
374+
375+
query, args, err := deleteDs.Prepared(true).ToSQL()
376+
if err != nil {
377+
return fmt.Errorf("error preparing query: %v", err)
378+
}
379+
380+
_, err = tx.ExecContext(ctx, query, args...)
381+
if err != nil {
382+
return err
383+
}
384+
}
385+
386+
err = tx.Commit()
387+
if err != nil {
388+
return fmt.Errorf("error committing tx to update general notification settings: %w", err)
389+
}
390+
return nil
300391
}
301392
func (d *DataAccessService) UpdateNotificationSettingsNetworks(ctx context.Context, userId uint64, chainId uint64, settings t.NotificationSettingsNetwork) error {
302-
return d.dummy.UpdateNotificationSettingsNetworks(ctx, userId, chainId, settings)
393+
epoch := utils.TimeToEpoch(time.Now())
394+
395+
networks, err := d.GetAllNetworks()
396+
if err != nil {
397+
return err
398+
}
399+
400+
networkName := ""
401+
for _, network := range networks {
402+
if network.ChainId == chainId {
403+
networkName = network.Name
404+
break
405+
}
406+
}
407+
if networkName == "" {
408+
return fmt.Errorf("network with chain id %d to update general notification settings not found", chainId)
409+
}
410+
411+
var eventsToInsert []goqu.Record
412+
var eventsToDelete []goqu.Expression
413+
414+
tx, err := d.userWriter.BeginTxx(ctx, nil)
415+
if err != nil {
416+
return fmt.Errorf("error starting db transactions to update general notification settings: %w", err)
417+
}
418+
defer utils.Rollback(tx)
419+
420+
eventName := fmt.Sprintf("%s:%s", networkName, types.NetworkGasAboveThresholdEventName)
421+
d.AddOrRemoveEvent(&eventsToInsert, &eventsToDelete, settings.IsGasAboveSubscribed, userId, eventName, "", epoch, settings.GasAboveThreshold.Div(decimal.NewFromInt(params.GWei)).InexactFloat64())
422+
eventName = fmt.Sprintf("%s:%s", networkName, types.NetworkGasBelowThresholdEventName)
423+
d.AddOrRemoveEvent(&eventsToInsert, &eventsToDelete, settings.IsGasBelowSubscribed, userId, eventName, "", epoch, settings.GasBelowThreshold.Div(decimal.NewFromInt(params.GWei)).InexactFloat64())
424+
eventName = fmt.Sprintf("%s:%s", networkName, types.NetworkParticipationRateThresholdEventName)
425+
d.AddOrRemoveEvent(&eventsToInsert, &eventsToDelete, settings.IsParticipationRateSubscribed, userId, eventName, "", epoch, settings.ParticipationRateThreshold)
426+
eventName = fmt.Sprintf("%s:%s", networkName, types.RocketpoolNewClaimRoundStartedEventName)
427+
d.AddOrRemoveEvent(&eventsToInsert, &eventsToDelete, settings.IsNewRewardRoundSubscribed, userId, eventName, "", epoch, 0)
428+
429+
// Insert all the events or update the threshold if they already exist
430+
if len(eventsToInsert) > 0 {
431+
insertDs := goqu.Dialect("postgres").
432+
Insert("users_subscriptions").
433+
Cols("user_id", "event_name", "event_filter", "created_ts", "created_epoch", "event_threshold").
434+
Rows(eventsToInsert).
435+
OnConflict(goqu.DoUpdate(
436+
"user_id, event_name, event_filter",
437+
goqu.Record{"event_threshold": goqu.L("EXCLUDED.event_threshold")},
438+
))
439+
440+
query, args, err := insertDs.Prepared(true).ToSQL()
441+
if err != nil {
442+
return fmt.Errorf("error preparing query: %v", err)
443+
}
444+
445+
_, err = tx.ExecContext(ctx, query, args...)
446+
if err != nil {
447+
return err
448+
}
449+
}
450+
451+
// Delete all the events
452+
if len(eventsToDelete) > 0 {
453+
deleteDs := goqu.Dialect("postgres").
454+
Delete("users_subscriptions").
455+
Where(goqu.Or(eventsToDelete...))
456+
457+
query, args, err := deleteDs.Prepared(true).ToSQL()
458+
if err != nil {
459+
return fmt.Errorf("error preparing query: %v", err)
460+
}
461+
462+
_, err = tx.ExecContext(ctx, query, args...)
463+
if err != nil {
464+
return err
465+
}
466+
}
467+
468+
err = tx.Commit()
469+
if err != nil {
470+
return fmt.Errorf("error committing tx to update general notification settings: %w", err)
471+
}
472+
return nil
303473
}
304474
func (d *DataAccessService) UpdateNotificationSettingsPairedDevice(ctx context.Context, userId uint64, pairedDeviceId string, name string, IsNotificationsEnabled bool) error {
305-
return d.dummy.UpdateNotificationSettingsPairedDevice(ctx, userId, pairedDeviceId, name, IsNotificationsEnabled)
475+
result, err := d.userWriter.ExecContext(ctx, `
476+
UPDATE users_devices
477+
SET
478+
device_name = $1,
479+
notify_enabled = $2
480+
WHERE user_id = $3 AND device_identifier = $4`,
481+
name, IsNotificationsEnabled, userId, pairedDeviceId)
482+
if err != nil {
483+
return err
484+
}
485+
486+
// TODO: This can be deleted when the API layer has an improved check for the device id
487+
rowsAffected, err := result.RowsAffected()
488+
if err != nil {
489+
return err
490+
}
491+
if rowsAffected == 0 {
492+
return fmt.Errorf("device with id %s to update notification settings not found", pairedDeviceId)
493+
}
494+
return nil
306495
}
307496
func (d *DataAccessService) DeleteNotificationSettingsPairedDevice(ctx context.Context, userId uint64, pairedDeviceId string) error {
308-
return d.dummy.DeleteNotificationSettingsPairedDevice(ctx, userId, pairedDeviceId)
497+
result, err := d.userWriter.ExecContext(ctx, `
498+
DELETE FROM users_devices
499+
WHERE user_id = $1 AND device_identifier = $2`,
500+
userId, pairedDeviceId)
501+
if err != nil {
502+
return err
503+
}
504+
505+
// TODO: This can be deleted when the API layer has an improved check for the device id
506+
rowsAffected, err := result.RowsAffected()
507+
if err != nil {
508+
return err
509+
}
510+
if rowsAffected == 0 {
511+
return fmt.Errorf("device with id %s to delete not found", pairedDeviceId)
512+
}
513+
return nil
309514
}
310515
func (d *DataAccessService) UpdateNotificationSettingsClients(ctx context.Context, userId uint64, clientId uint64, IsSubscribed bool) (*t.NotificationSettingsClient, error) {
311-
return d.dummy.UpdateNotificationSettingsClients(ctx, userId, clientId, IsSubscribed)
516+
result := &t.NotificationSettingsClient{Id: clientId, IsSubscribed: IsSubscribed}
517+
518+
var clientInfo *t.ClientInfo
519+
520+
clients, err := d.GetAllClients()
521+
if err != nil {
522+
return nil, err
523+
}
524+
for _, client := range clients {
525+
if client.Id == clientId {
526+
clientInfo = &client
527+
break
528+
}
529+
}
530+
if clientInfo == nil {
531+
return nil, fmt.Errorf("client with id %d to update client notification settings not found", clientId)
532+
}
533+
534+
if IsSubscribed {
535+
_, err = d.userWriter.ExecContext(ctx, `
536+
INSERT INTO users_subscriptions (user_id, event_name, event_filter, created_ts, created_epoch)
537+
VALUES ($1, $2, $3, NOW(), $4)
538+
ON CONFLICT (user_id, event_name, event_filter)
539+
DO NOTHING`,
540+
userId, types.EthClientUpdateEventName, clientInfo.Name, utils.TimeToEpoch(time.Now()))
541+
} else {
542+
_, err = d.userWriter.ExecContext(ctx, `DELETE FROM users_subscriptions WHERE user_id = $1 AND event_name = $2 AND event_filter = $3`,
543+
userId, types.EthClientUpdateEventName, clientInfo.Name)
544+
}
545+
if err != nil {
546+
return nil, err
547+
}
548+
549+
result.Name = clientInfo.Name
550+
result.Category = clientInfo.Category
551+
552+
return result, nil
312553
}
313554
func (d *DataAccessService) GetNotificationSettingsDashboards(ctx context.Context, userId uint64, cursor string, colSort t.Sort[enums.NotificationSettingsDashboardColumn], search string, limit uint64) ([]t.NotificationSettingsDashboardsTableRow, *t.Paging, error) {
314555
return d.dummy.GetNotificationSettingsDashboards(ctx, userId, cursor, colSort, search, limit)
@@ -319,3 +560,12 @@ func (d *DataAccessService) UpdateNotificationSettingsValidatorDashboard(ctx con
319560
func (d *DataAccessService) UpdateNotificationSettingsAccountDashboard(ctx context.Context, dashboardId t.VDBIdPrimary, groupId uint64, settings t.NotificationSettingsAccountDashboard) error {
320561
return d.dummy.UpdateNotificationSettingsAccountDashboard(ctx, dashboardId, groupId, settings)
321562
}
563+
564+
func (d *DataAccessService) AddOrRemoveEvent(eventsToInsert *[]goqu.Record, eventsToDelete *[]goqu.Expression, isSubscribed bool, userId uint64, eventName string, eventFilter string, epoch int64, threshold float64) {
565+
if isSubscribed {
566+
event := goqu.Record{"user_id": userId, "event_name": eventName, "event_filter": eventFilter, "created_ts": goqu.L("NOW()"), "created_epoch": epoch, "event_threshold": threshold}
567+
*eventsToInsert = append(*eventsToInsert, event)
568+
} else {
569+
*eventsToDelete = append(*eventsToDelete, goqu.Ex{"user_id": userId, "event_name": eventName, "event_filter": eventFilter})
570+
}
571+
}

0 commit comments

Comments
 (0)