Skip to content

Commit e931d04

Browse files
authored
Merge pull request #1452 from 0chain/feat/ref-index
Single transaction is get regular refs and remove unused ref index
2 parents 88e40a4 + f8e8ae2 commit e931d04

28 files changed

+117
-199
lines changed

code/go/0chain.net/blobbercore/allocation/allocationchange.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ func (change *AllocationChange) Save(ctx context.Context) error {
100100

101101
func (change *AllocationChange) Update(ctx context.Context) error {
102102
db := datastore.GetStore().GetTransaction(ctx)
103-
return db.Table(change.TableName()).Where("lookup_hash = ?", change.LookupHash).Updates(map[string]interface{}{
103+
return db.Table(change.TableName()).Where("connection_id = ? AND lookup_hash = ?", change.ConnectionID, change.LookupHash).Updates(map[string]interface{}{
104104
"size": change.Size,
105105
"updated_at": time.Now(),
106106
"input": change.Input,
@@ -174,10 +174,11 @@ func GetAllocationChanges(ctx context.Context, connectionID, allocationID, clien
174174
func GetConnectionObj(ctx context.Context, connectionID, allocationID, clientID string) (*AllocationChangeCollector, error) {
175175
cc := &AllocationChangeCollector{}
176176
db := datastore.GetStore().GetTransaction(ctx)
177-
err := db.Where("id = ? and allocation_id = ? and client_id = ?",
177+
err := db.Where("id = ? and allocation_id = ? and client_id = ? AND status <> ?",
178178
connectionID,
179179
allocationID,
180180
clientID,
181+
DeletedConnection,
181182
).Take(cc).Error
182183

183184
if err == nil {
@@ -189,7 +190,7 @@ func GetConnectionObj(ctx context.Context, connectionID, allocationID, clientID
189190
cc.AllocationID = allocationID
190191
cc.ClientID = clientID
191192
cc.Status = NewConnection
192-
err = cc.Create(ctx)
193+
err = cc.Save(ctx)
193194
if err != nil {
194195
return nil, err
195196
}

code/go/0chain.net/blobbercore/allocation/connection.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ import (
1515

1616
var (
1717
// ConnectionObjCleanInterval start to clean the connectionObjMap
18-
ConnectionObjCleanInterval = 10 * time.Minute
18+
ConnectionObjCleanInterval = 45 * time.Minute
1919
// ConnectionObjTimout after which connectionObj entry should be invalid
2020
ConnectionObjTimeout = 30 * time.Minute
2121
)
@@ -182,7 +182,11 @@ func SaveFileChange(ctx context.Context, connectionID, pathHash, fileName string
182182
change.lock.Lock()
183183
defer change.lock.Unlock()
184184
connectionObj.lock.Unlock()
185-
err := cmd.AddChange(ctx)
185+
_, err := GetConnectionObj(ctx, connectionID, connectionObj.AllocationID, connectionObj.ClientID)
186+
if err != nil {
187+
return saveChange, err
188+
}
189+
err = cmd.AddChange(ctx)
186190
if err != nil {
187191
return saveChange, err
188192
}

code/go/0chain.net/blobbercore/allocation/deletefilechange.go

Lines changed: 2 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -3,18 +3,11 @@ package allocation
33
import (
44
"context"
55
"encoding/json"
6-
"fmt"
76
"path/filepath"
87
"sync"
98

10-
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/datastore"
11-
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/filestore"
129
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/reference"
1310
"github.com/0chain/blobber/code/go/0chain.net/core/common"
14-
"github.com/0chain/blobber/code/go/0chain.net/core/logging"
15-
"gorm.io/gorm"
16-
17-
"go.uber.org/zap"
1811
)
1912

2013
var (
@@ -58,73 +51,8 @@ func (nf *DeleteFileChange) DeleteTempFile() error {
5851
return nil
5952
}
6053

61-
func (nf *DeleteFileChange) CommitToFileStore(ctx context.Context, mut *sync.Mutex) error {
62-
db := datastore.GetStore().GetTransaction(ctx)
63-
type Result struct {
64-
Id string
65-
ValidationRoot string
66-
ThumbnailHash string
67-
FilestoreVersion int
68-
}
69-
70-
limitCh := make(chan struct{}, 10)
71-
wg := &sync.WaitGroup{}
72-
var results []Result
73-
mut.Lock()
74-
err := db.Model(&reference.Ref{}).Unscoped().
75-
Select("id", "validation_root", "thumbnail_hash", "filestore_version").
76-
Where("allocation_id=? AND path LIKE ? AND type=? AND deleted_at is not NULL",
77-
nf.AllocationID, nf.Path+"%", reference.FILE).
78-
FindInBatches(&results, 100, func(tx *gorm.DB, batch int) error {
79-
80-
for _, res := range results {
81-
var count int64
82-
tx.Model(&reference.Ref{}).
83-
Where("allocation_id=? AND validation_root=?", nf.AllocationID, res.ValidationRoot).
84-
Count(&count)
85-
86-
if count != 0 && res.ThumbnailHash == "" {
87-
continue
88-
}
89-
90-
limitCh <- struct{}{}
91-
wg.Add(1)
92-
93-
go func(res Result, count int64) {
94-
defer func() {
95-
<-limitCh
96-
wg.Done()
97-
}()
98-
99-
if count == 0 {
100-
err := filestore.GetFileStore().DeleteFile(nf.AllocationID, res.ValidationRoot, res.FilestoreVersion)
101-
if err != nil {
102-
logging.Logger.Error(fmt.Sprintf("Error while deleting file: %s", err.Error()),
103-
zap.String("validation_root", res.ValidationRoot))
104-
}
105-
}
106-
// We don't increase alloc size for thumbnail so we don't need to decrease it
107-
// if res.ThumbnailHash != "" {
108-
// err := filestore.GetFileStore().DeleteFile(nf.AllocationID, res.ThumbnailHash)
109-
// if err != nil {
110-
// logging.Logger.Error(fmt.Sprintf("Error while deleting thumbnail: %s", err.Error()),
111-
// zap.String("thumbnail", res.ThumbnailHash))
112-
// }
113-
// }
114-
115-
}(res, count)
116-
117-
}
118-
return nil
119-
}).Error
120-
mut.Unlock()
121-
wg.Wait()
122-
123-
return err
124-
// return db.Model(&reference.Ref{}).Unscoped().
125-
// Delete(&reference.Ref{},
126-
// "allocation_id = ? AND path LIKE ? AND deleted_at IS NOT NULL",
127-
// nf.AllocationID, nf.Path+"%").Error
54+
func (nf *DeleteFileChange) CommitToFileStore(_ context.Context, _ *sync.Mutex) error {
55+
return nil
12856
}
12957

13058
func (nf *DeleteFileChange) GetPath() []string {

code/go/0chain.net/blobbercore/allocation/entity.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -208,7 +208,7 @@ func AddToPending(ctx context.Context, clientID, allocationID string, pendingWri
208208
defer lock.Unlock()
209209

210210
pending := new(Pending)
211-
err = db.Model(&Pending{}).Where("id=?", key).First(pending).Error
211+
err = db.Model(&Pending{}).Where("id=?", key).Take(pending).Error
212212
switch {
213213
case err == nil:
214214
pending.PendingWrite += pendingWrite

code/go/0chain.net/blobbercore/allocation/file_changer_update.go

Lines changed: 0 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,11 @@ import (
66
"path/filepath"
77
"sync"
88

9-
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/datastore"
109
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/filestore"
1110
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/reference"
1211
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/util"
13-
"go.uber.org/zap"
1412

1513
"github.com/0chain/blobber/code/go/0chain.net/core/common"
16-
"github.com/0chain/blobber/code/go/0chain.net/core/logging"
1714
)
1815

1916
type UpdateFileChanger struct {
@@ -105,23 +102,6 @@ func (nf *UpdateFileChanger) ApplyChange(ctx context.Context, rootRef *reference
105102
}
106103

107104
func (nf *UpdateFileChanger) CommitToFileStore(ctx context.Context, mut *sync.Mutex) error {
108-
db := datastore.GetStore().GetTransaction(ctx)
109-
for hash, version := range nf.deleteHash {
110-
var count int64
111-
mut.Lock()
112-
err := db.Table((&reference.Ref{}).TableName()).
113-
Where(&reference.Ref{ValidationRoot: hash}).
114-
Where(&reference.Ref{AllocationID: nf.AllocationID}).
115-
Count(&count).Error
116-
mut.Unlock()
117-
if err == nil && count == 0 {
118-
logging.Logger.Info("Deleting content file", zap.String("validation_root", hash))
119-
if err := filestore.GetFileStore().DeleteFile(nf.AllocationID, hash, version); err != nil {
120-
logging.Logger.Error("FileStore_DeleteFile", zap.String("allocation_id", nf.AllocationID), zap.Error(err))
121-
}
122-
}
123-
}
124-
125105
return nf.BaseFileChanger.CommitToFileStore(ctx, mut)
126106
}
127107

code/go/0chain.net/blobbercore/allocation/protocol.go

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -52,16 +52,18 @@ func FetchAllocationFromEventsDB(ctx context.Context, allocationID string, alloc
5252
}
5353

5454
if err == nil {
55-
// load related terms
56-
var terms []*Terms
57-
err = tx.Model(terms).
58-
Where("allocation_id = ?", a.ID).
59-
Find(&terms).Error
60-
if err != nil {
61-
return nil, common.NewError("bad_db_operation", err.Error()) // unexpected DB error
55+
if len(a.Terms) == 0 {
56+
// load related terms
57+
var terms []*Terms
58+
err = tx.Model(terms).
59+
Where("allocation_id = ?", a.ID).
60+
Find(&terms).Error
61+
if err != nil {
62+
return nil, common.NewError("bad_db_operation", err.Error()) // unexpected DB error
63+
}
64+
a.Terms = terms // set field
6265
}
63-
a.Terms = terms // set field
64-
return // found in DB
66+
return // found in DB
6567
}
6668

6769
sa, err := requestAllocation(allocationID)

code/go/0chain.net/blobbercore/allocation/repository.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ import (
1616
const (
1717
SQLWhereGetById = "allocations.id = ?"
1818
SQLWhereGetByTx = "allocations.tx = ?"
19-
lruSize = 100
19+
lruSize = 500
2020
)
2121

2222
var (
@@ -141,6 +141,11 @@ func (r *Repository) GetByTx(ctx context.Context, allocationID, txHash string) (
141141
cache[allocationID] = AllocationCache{
142142
Allocation: alloc,
143143
}
144+
//get allocation terms
145+
err = alloc.LoadTerms(ctx)
146+
if err != nil {
147+
return alloc, err
148+
}
144149
r.setAllocToGlobalCache(alloc)
145150
return alloc, err
146151
}

code/go/0chain.net/blobbercore/challenge/timing.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,7 @@ func GetChallengeTiming(challengeID string) (*ChallengeTiming, error) {
184184

185185
err := datastore.GetStore().WithNewTransaction(func(ctx context.Context) error {
186186
tx := datastore.GetStore().GetTransaction(ctx)
187-
return tx.Model(&ChallengeTiming{}).Where("challenge_id = ?", challengeID).First(&ch).Error
187+
return tx.Model(&ChallengeTiming{}).Where("challenge_id = ?", challengeID).Take(&ch).Error
188188
})
189189
return ch, err
190190
}

code/go/0chain.net/blobbercore/datastore/mocket.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package datastore
22

33
import (
44
"context"
5+
"database/sql"
56

67
. "github.com/0chain/blobber/code/go/0chain.net/core/logging"
78
mocket "github.com/selvatico/go-mocket"
@@ -71,7 +72,7 @@ func (store *Mocket) Close() {
7172
}
7273
}
7374

74-
func (store *Mocket) CreateTransaction(ctx context.Context) context.Context {
75+
func (store *Mocket) CreateTransaction(ctx context.Context,opts ...*sql.TxOptions) context.Context {
7576
db := store.db.Begin()
7677
return context.WithValue(ctx, ContextKeyTransaction, EnhanceDB(db))
7778
}

code/go/0chain.net/blobbercore/datastore/postgres.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package datastore
22

33
import (
44
"context"
5+
"database/sql"
56
"fmt"
67
"time"
78

@@ -84,7 +85,7 @@ func (store *postgresStore) Close() {
8485
}
8586
}
8687

87-
func (store *postgresStore) CreateTransaction(ctx context.Context) context.Context {
88+
func (store *postgresStore) CreateTransaction(ctx context.Context, opts ...*sql.TxOptions) context.Context {
8889
db := store.db.WithContext(ctx).Begin()
8990
return context.WithValue(ctx, ContextKeyTransaction, EnhanceDB(db))
9091
}

0 commit comments

Comments
 (0)