Regarding Update balances function in transaction.go #125
    func (l *Blnk) updateBalances(ctx context.Context, sourceBalance, destinationBalance *model.Balance) error {
        ctx, span := tracer.Start(ctx, "Updating Balances")
        defer span.End()

        var wg sync.WaitGroup

        // Update the balances in the datasource
        if err := l.datasource.UpdateBalances(ctx, sourceBalance, destinationBalance); err != nil {
            span.RecordError(err)
            return err
        }

        // Add two tasks to the wait group
        wg.Add(2)

        // Goroutine to check monitors and queue index data for the source balance
        go func() {
            defer wg.Done()
            l.checkBalanceMonitors(ctx, sourceBalance)
            err := l.queue.queueIndexData(sourceBalance.BalanceID, "balances", sourceBalance)
            if err != nil {
                span.RecordError(err)
                notification.NotifyError(err)
            }
        }()

        // Goroutine to check monitors and queue index data for the destination balance
        go func() {
            defer wg.Done()
            l.checkBalanceMonitors(ctx, destinationBalance)
            err := l.queue.queueIndexData(destinationBalance.BalanceID, "balances", destinationBalance)
            if err != nil {
                span.RecordError(err)
                notification.NotifyError(err)
            }
        }()

        // Wait for both goroutines to complete
        wg.Wait()

        span.AddEvent("Balances updated")
        return nil
    }

I was going through the codebase and had a question about this function: why do we queue balances? Also, I haven’t gone through the entire codebase yet, but I noticed Redis is used for distributed locks, transaction queues, hooks, etc. Does the line above mean we’re also using Redis to store balances? If yes, where are we retrieving them? I’ve seen this code used in transactions, ledgers, and several other places. Could you briefly explain how Redis is being used, in case I missed something?

P.S. It’s a really well-written codebase; I genuinely enjoyed reading through it! Thanks!
Hi @darahask, thanks for the kind words!

So this is how Redis is used. It currently does a couple of things, like you mentioned:
For the job queues, there are a couple of jobs it manages (using the workers):
The method queueIndexData is for indexing to Typesense (maybe I should rename it 🤔):

    // queueIndexData enqueues a task to index data in a specified collection.
    //
    // Parameters:
    // - id string: The ID of the data to index.
    // - collection string: The name of the collection to index the data in.
    // - data interface{}: The data to be indexed.
    //
    // Returns:
    // - error: An error if the task could not be enqueued.
    func (q *Queue) queueIndexData(id string, collection string, data interface{}) error {
        cfg, err := config.Fetch()
        if err != nil {
            return err
        }

        payload := map[string]interface{}{
            "collection": collection,
            "payload":    data,
        }
        IPayload, err := json.Marshal(payload)
        if err != nil {
            return err
        }

        taskOptions := []asynq.Option{asynq.Queue(cfg.Queue.IndexQueue)}
        task := asynq.NewTask(cfg.Queue.IndexQueue, IPayload, taskOptions...)
        info, err := q.Client.Enqueue(task)
        if err != nil {
            log.Println("here", err, info)
            return err
        }

        log.Printf(" [*] Successfully enqueued index data: %+v", id)
        return nil
    }

Then in the worker, it indexes to Typesense for retrieval and search. Check here
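To make the worker side a bit more concrete, here is a minimal sketch of what consuming one of these index tasks could look like. It is not the actual Blnk worker: it uses only the standard library, and `Indexer`, `memoryIndexer`, `encodeIndexTask`, `handleIndexTask`, and the `balance_id` field are hypothetical names made up for illustration. The only thing taken from the code above is the payload shape, a JSON object with `collection` and `payload` keys. The in-memory indexer stands in for Typesense so the example runs without any services.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// indexPayload mirrors the map that queueIndexData marshals:
// {"collection": "...", "payload": {...}}.
type indexPayload struct {
	Collection string          `json:"collection"`
	Payload    json.RawMessage `json:"payload"`
}

// Indexer is a hypothetical stand-in for the search backend
// (Typesense in this thread); it is not a real Blnk or Typesense type.
type Indexer interface {
	Upsert(collection string, doc map[string]interface{}) error
}

// memoryIndexer stores documents in a map so the sketch needs no services.
type memoryIndexer struct {
	docs map[string][]map[string]interface{}
}

func (m *memoryIndexer) Upsert(collection string, doc map[string]interface{}) error {
	m.docs[collection] = append(m.docs[collection], doc)
	return nil
}

// encodeIndexTask builds the same JSON body shape that queueIndexData enqueues.
func encodeIndexTask(collection string, data interface{}) ([]byte, error) {
	return json.Marshal(map[string]interface{}{
		"collection": collection,
		"payload":    data,
	})
}

// handleIndexTask shows what a worker might do with a dequeued task body
// (an assumption): decode it and hand the document to the indexer.
func handleIndexTask(raw []byte, idx Indexer) error {
	var p indexPayload
	if err := json.Unmarshal(raw, &p); err != nil {
		return fmt.Errorf("decode index task: %w", err)
	}
	if p.Collection == "" {
		return fmt.Errorf("index task has no collection")
	}
	var doc map[string]interface{}
	if err := json.Unmarshal(p.Payload, &doc); err != nil {
		return fmt.Errorf("decode document: %w", err)
	}
	return idx.Upsert(p.Collection, doc)
}

func main() {
	idx := &memoryIndexer{docs: map[string][]map[string]interface{}{}}

	// "balance_id" is an illustrative field name, not the real model.
	raw, err := encodeIndexTask("balances", map[string]interface{}{
		"balance_id": "bln_123",
		"balance":    500,
	})
	if err != nil {
		panic(err)
	}
	if err := handleIndexTask(raw, idx); err != nil {
		panic(err)
	}
	fmt.Println(len(idx.docs["balances"]), idx.docs["balances"][0]["balance_id"])
}
```

The point of the sketch is that Redis only carries the task from the enqueue call to the worker; the balance itself ends up in the search index for lookups, not in Redis.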