Skip to content

Commit 2f906ed

Browse files
committed
updated txn
fixed numgo stuff; added batching for mutations
1 parent 15d50fc commit 2f906ed

File tree

5 files changed: 105 additions & 19 deletions

go.mod

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ require (
88
github.com/IBM/sarama v1.45.2
99
github.com/Masterminds/semver/v3 v3.4.0
1010
github.com/blevesearch/bleve/v2 v2.5.2
11-
github.com/dgraph-io/badger/v4 v4.7.0
11+
github.com/dgraph-io/badger/v4 v4.7.1-0.20250715123309-49be996f27be
1212
github.com/dgraph-io/dgo/v250 v250.0.0-preview4.0.20250619041351-4a519e53fb9d
1313
github.com/dgraph-io/gqlgen v0.13.2
1414
github.com/dgraph-io/gqlparser/v2 v2.2.2
@@ -63,7 +63,7 @@ require (
6363
golang.org/x/mod v0.25.0
6464
golang.org/x/net v0.41.0
6565
golang.org/x/sync v0.15.0
66-
golang.org/x/sys v0.33.0
66+
golang.org/x/sys v0.34.0
6767
golang.org/x/term v0.32.0
6868
golang.org/x/text v0.26.0
6969
golang.org/x/tools v0.34.0

go.sum

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -126,8 +126,22 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs
126126
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
127127
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
128128
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
129-
github.com/dgraph-io/badger/v4 v4.7.0 h1:Q+J8HApYAY7UMpL8d9owqiB+odzEc0zn/aqOD9jhc6Y=
130-
github.com/dgraph-io/badger/v4 v4.7.0/go.mod h1:He7TzG3YBy3j4f5baj5B7Zl2XyfNe5bl4Udl0aPemVA=
129+
github.com/dgraph-io/badger/v4 v4.7.1-0.20250715061448-bd5f80755a2c h1:eMV7XdUjTRyGgxQRJtF3DbeCpHgRsMPzGNDbAzJ0zqc=
130+
github.com/dgraph-io/badger/v4 v4.7.1-0.20250715061448-bd5f80755a2c/go.mod h1:U6on6e8k/RTbUWxqKR0MvugJuVmkxSNc79ap4917h4w=
131+
github.com/dgraph-io/badger/v4 v4.7.1-0.20250715063358-ef5eb875b90f h1:XPGysDdNJfWJ4cvyJNXdJ9wbOhOpMAjpH8Fvkz8TCoE=
132+
github.com/dgraph-io/badger/v4 v4.7.1-0.20250715063358-ef5eb875b90f/go.mod h1:U6on6e8k/RTbUWxqKR0MvugJuVmkxSNc79ap4917h4w=
133+
github.com/dgraph-io/badger/v4 v4.7.1-0.20250715063643-bc1f7f54fba3 h1:55YlS1UdgRFG2dWoAiW02Mpm/YkYNqwaK3oJ4mp3yCk=
134+
github.com/dgraph-io/badger/v4 v4.7.1-0.20250715063643-bc1f7f54fba3/go.mod h1:U6on6e8k/RTbUWxqKR0MvugJuVmkxSNc79ap4917h4w=
135+
github.com/dgraph-io/badger/v4 v4.7.1-0.20250715071501-03ae2cab91f2 h1:3But8hbRf8lBZV1Lmlu2fPQwN+tK55nY6QXr6imaheQ=
136+
github.com/dgraph-io/badger/v4 v4.7.1-0.20250715071501-03ae2cab91f2/go.mod h1:U6on6e8k/RTbUWxqKR0MvugJuVmkxSNc79ap4917h4w=
137+
github.com/dgraph-io/badger/v4 v4.7.1-0.20250715084356-1ba0b8dcb5a5 h1:rPjxqSrbuRTaVYJ1qMiQTdD+3GD8k6AUgZrqevMnFbw=
138+
github.com/dgraph-io/badger/v4 v4.7.1-0.20250715084356-1ba0b8dcb5a5/go.mod h1:U6on6e8k/RTbUWxqKR0MvugJuVmkxSNc79ap4917h4w=
139+
github.com/dgraph-io/badger/v4 v4.7.1-0.20250715093118-9d43f422ab9a h1:I7ub/hfKh0g/B97TVebAzZiZGQeMOlMZb0mfZpHC2jo=
140+
github.com/dgraph-io/badger/v4 v4.7.1-0.20250715093118-9d43f422ab9a/go.mod h1:U6on6e8k/RTbUWxqKR0MvugJuVmkxSNc79ap4917h4w=
141+
github.com/dgraph-io/badger/v4 v4.7.1-0.20250715094626-498b54d20d5f h1:b6+b/g3lPvwAtkLyTeacm60FgYj3ODNZDFnbbTVB3A0=
142+
github.com/dgraph-io/badger/v4 v4.7.1-0.20250715094626-498b54d20d5f/go.mod h1:U6on6e8k/RTbUWxqKR0MvugJuVmkxSNc79ap4917h4w=
143+
github.com/dgraph-io/badger/v4 v4.7.1-0.20250715123309-49be996f27be h1:aDL5vpafcV5RuDfnyE+D1CdfQ24WuO6HrOBPJHZ6Ls4=
144+
github.com/dgraph-io/badger/v4 v4.7.1-0.20250715123309-49be996f27be/go.mod h1:U6on6e8k/RTbUWxqKR0MvugJuVmkxSNc79ap4917h4w=
131145
github.com/dgraph-io/dgo/v250 v250.0.0-preview4.0.20250619041351-4a519e53fb9d h1:9PLyvZY1Nih05g+2womk+kNnX3Gb20kx5BsK3foA5a8=
132146
github.com/dgraph-io/dgo/v250 v250.0.0-preview4.0.20250619041351-4a519e53fb9d/go.mod h1:gLr7uM+x/8PjSQJ4Ca9kfQF15uBzruDzRK3bnELt3vE=
133147
github.com/dgraph-io/gqlgen v0.13.2 h1:TNhndk+eHKj5qE7BenKKSYdSIdOGhLqxR1rCiMso9KM=
@@ -786,8 +800,8 @@ golang.org/x/sys v0.0.0-20220708085239-5a0f0661e09d/go.mod h1:oPkhp1MJrh7nUepCBc
786800
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
787801
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
788802
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
789-
golang.org/x/sys v0.33.0 h1:q3i8TbbEz+JRD9ywIRlyRAQbM0qF7hu24q3teo2hbuw=
790-
golang.org/x/sys v0.33.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
803+
golang.org/x/sys v0.34.0 h1:H5Y5sJ2L2JRdyv7ROF1he/lPdvFsd0mJHFw2ThKHxLA=
804+
golang.org/x/sys v0.34.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
791805
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
792806
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
793807
golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k=

posting/index.go

Lines changed: 56 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -899,7 +899,12 @@ func (r *rebuilder) Run(ctx context.Context) error {
899899
WithCompression(options.None).
900900
WithLoggingLevel(badger.WARNING).
901901
WithMetricsEnabled(false)
902-
902+
dbOpts.DetectConflicts = false
903+
dbOpts.NumLevelZeroTables = 200
904+
dbOpts.NumLevelZeroTablesStall = 400
905+
dbOpts.BaseLevelSize = 5 * 128 << 20
906+
dbOpts.BaseTableSize = 128 << 20
907+
dbOpts.MemTableSize = 128 << 20
903908
// Set cache if we have encryption.
904909
if len(x.WorkerConfig.EncryptionKey) > 0 {
905910
dbOpts.EncryptionKey = x.WorkerConfig.EncryptionKey
@@ -924,11 +929,43 @@ func (r *rebuilder) Run(ctx context.Context) error {
924929
stream := pstore.NewStreamAt(r.startTs)
925930
stream.LogPrefix = fmt.Sprintf("Rebuilding index for predicate %s (1/2):", r.attr)
926931
stream.Prefix = r.prefix
932+
stream.MaxSize = (uint64(dbOpts.MemTableSize) * 9) / 10
927933
//TODO We need to create a single transaction irrespective of the type of the predicate
928934
if pred.ValueType == pb.Posting_VFLOAT {
929935
x.AssertTrue(false)
930936
}
931-
stream.KeyToList = func(key []byte, itr *badger.Iterator) (*bpb.KVList, error) {
937+
938+
stream.UseKeyToListWithThreadId = true
939+
940+
const maxThreadIds = 10000
941+
942+
txns := make([]*Txn, maxThreadIds)
943+
for i := range txns {
944+
txns[i] = NewTxn(r.startTs)
945+
}
946+
947+
stream.FinishThread = func(threadId int) (*bpb.KVList, error) {
948+
// Convert data into deltas.
949+
streamTxn := txns[threadId]
950+
streamTxn.Update()
951+
// txn.cache.Lock() is not required because we are the only one making changes to txn.
952+
kvs := make([]*bpb.KV, 0)
953+
954+
for key, data := range streamTxn.cache.deltas {
955+
version := atomic.AddUint64(&counter, 1)
956+
kv := bpb.KV{
957+
Key: []byte(key),
958+
Value: data,
959+
UserMeta: []byte{BitDeltaPosting},
960+
Version: version,
961+
}
962+
kvs = append(kvs, &kv)
963+
}
964+
txns[threadId] = NewTxn(r.startTs)
965+
return &bpb.KVList{Kv: kvs}, nil
966+
}
967+
968+
stream.KeyToListWithThreadId = func(key []byte, itr *badger.Iterator, threadId int) (*bpb.KVList, error) {
932969
// We should return quickly if the context is no longer valid.
933970
select {
934971
case <-ctx.Done():
@@ -946,22 +983,29 @@ func (r *rebuilder) Run(ctx context.Context) error {
946983
return nil, errors.Wrapf(err, "error reading posting list from disk")
947984
}
948985

949-
kvs, err := l.Rollup(nil, r.startTs)
950-
if err != nil {
951-
return nil, err
986+
kvs := make([]*bpb.KV, 0)
987+
if pred.Count {
988+
kvs, err = l.Rollup(nil, r.startTs)
989+
if err != nil {
990+
return nil, err
991+
}
952992
}
953993

954994
for _, kv := range kvs {
955995
version := atomic.AddUint64(&counter, 1)
956996
kv.Version = version
957997
}
958998

959-
streamTxn := NewTxn(r.startTs)
999+
streamTxn := txns[threadId]
9601000
_, err = r.fn(pk.Uid, l, streamTxn)
9611001
if err != nil {
9621002
return nil, err
9631003
}
9641004

1005+
if len(streamTxn.cache.plists) < 10000 {
1006+
return &bpb.KVList{Kv: kvs}, nil
1007+
}
1008+
9651009
// Convert data into deltas.
9661010
streamTxn.Update()
9671011
// txn.cache.Lock() is not required because we are the only one making changes to txn.
@@ -976,9 +1020,15 @@ func (r *rebuilder) Run(ctx context.Context) error {
9761020
kvs = append(kvs, &kv)
9771021
}
9781022

1023+
txns[threadId] = NewTxn(r.startTs)
9791024
return &bpb.KVList{Kv: kvs}, nil
9801025
}
1026+
9811027
stream.Send = func(buf *z.Buffer) error {
1028+
t1 := time.Now()
1029+
defer func() {
1030+
glog.V(1).Infof("Rebuilding index for predicate %s: writing index to badger %d bytes took %v", r.attr, len(buf.Bytes()), time.Since(t1))
1031+
}()
9821032
if err := tmpWriter.Write(buf); err != nil {
9831033
return errors.Wrap(err, "error setting entries in temp badger")
9841034
}

tok/tok.go

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -449,29 +449,44 @@ func (t ShinglesTokenizer) Tokens(v interface{}) ([]string, error) {
449449
tokens = filterStemmers(lang, tokens)
450450

451451
// Step 4: Generate shingles (bigrams and trigrams)
452-
shingled := make([]string, 0, len(tokens))
452+
453+
shingled := make(map[string]struct{}, len(tokens))
453454
n := len(tokens)
454455

456+
addToRes := func(token string) {
457+
if len(token) < 30 {
458+
shingled[token] = struct{}{}
459+
return
460+
}
461+
462+
hash := blake2b.Sum256([]byte(token))
463+
shingled[string(hash[:])] = struct{}{}
464+
}
465+
455466
for i := 0; i < n; i++ {
456467
// unigram
457-
shingled = append(shingled, string(tokens[i].Term))
468+
addToRes(string(tokens[i].Term))
458469

459470
// bigram
460471
if i+1 < n {
461-
shingled = append(shingled, string(tokens[i].Term)+" "+string(tokens[i+1].Term))
472+
addToRes(string(tokens[i].Term) + " " + string(tokens[i+1].Term))
462473
}
463474
// trigram
464475
if i+2 < n {
465-
shingled = append(shingled, string(tokens[i].Term)+" "+string(tokens[i+1].Term)+" "+string(tokens[i+2].Term))
476+
addToRes(string(tokens[i].Term) + " " + string(tokens[i+1].Term) + " " + string(tokens[i+2].Term))
466477
}
467478

468479
if i+3 < n {
469-
shingled = append(shingled, string(tokens[i].Term)+" "+string(tokens[i+1].Term)+" "+string(tokens[i+2].Term)+" "+string(tokens[i+3].Term))
480+
addToRes(string(tokens[i].Term) + " " + string(tokens[i+1].Term) + " " + string(tokens[i+2].Term) + " " + string(tokens[i+3].Term))
470481
}
471482
}
472483

473-
// Step 5: Deduplicate
474-
return x.RemoveDuplicates(shingled), nil
484+
res := make([]string, 0, len(shingled))
485+
for k := range shingled {
486+
res = append(res, k)
487+
}
488+
489+
return res, nil
475490
}
476491
func (t ShinglesTokenizer) Identifier() byte { return IdentShingles }
477492
func (t ShinglesTokenizer) IsSortable() bool { return false }

tok/tok_test.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,13 @@ func (o byEnc) Swap(i, j int) {
3030
o.tokens[i], o.tokens[j] = o.tokens[j], o.tokens[i]
3131
}
3232

33+
func BenchmarkShinglesTokenizer(b *testing.B) {
34+
tokenizer := ShinglesTokenizer{}
35+
for i := 0; i < b.N; i++ {
36+
BuildTokens("economy series like brother evening tough guess attorney student ago article own identify where care allow pay access decade period during pass fail term real report identify security manage try past account care off rock subject chance seek over effect address article full most believe feel court six involve miss country try if threat care same available tell such process language agreement access poor strong reach cold section past way my seem order live consumer home imagine think return couple office majority answer discover figure college girl Republican enter form recently year experience economic read show necessary general lot bag buy along their painting player information certainly ground soon learn without write determine sure choice sell love subject decide ball death leave happy worry know early main address condition west room he plan weight step loss actually ready provide far fine best nice fund great your scene woman service check feel song ahead material hot leg contain knowledge article lawyer cold", tokenizer)
37+
}
38+
}
39+
3340
func TestIntEncoding(t *testing.T) {
3441
a := int64(1<<24 + 10)
3542
b := int64(-1<<24 - 1)

0 commit comments

Comments
 (0)