Skip to content
This repository was archived by the owner on May 24, 2024. It is now read-only.

Commit e34611a

Browse files
use caching v2 (#23)
Signed-off-by: Ayman <enkhalifapro@gmail.com> Signed-off-by: Ayman <enkhalifapro@gmail.com> Co-authored-by: Ayman <enkhalifapro@gmail.com>
1 parent f10be4d commit e34611a

File tree

3 files changed

+228
-60
lines changed

3 files changed

+228
-60
lines changed

cmd/confluence/confluence.go

Lines changed: 127 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
package main
22

33
import (
4+
"bytes"
45
"context"
56
"crypto/sha256"
67
"encoding/base64"
8+
"encoding/csv"
79
"encoding/json"
810
"flag"
911
"fmt"
@@ -57,14 +59,16 @@ const (
5759
var (
5860
gMaxUpdatedAt time.Time
5961
gMaxUpdatedAtMtx = &sync.Mutex{}
62+
cachedSpaces = make(map[string]EntityCache)
63+
spacesCacheFile = "spaces-cache.csv"
6064
// ConfluenceDataSource - constant
6165
//ConfluenceDataSource = &models.DataSource{Name: "Confluence", Slug: "confluence", Model: "documentation"}
6266
//gConfluenceMetaData = &models.MetaData{BackendName: "confluence", BackendVersion: ConfluenceBackendVersion}
6367
)
6468

6569
// Publisher - publish data to S3
6670
type Publisher interface {
67-
PushEvents(action, source, eventType, subEventType, env string, data []interface{}) error
71+
PushEvents(action, source, eventType, subEventType, env string, data []interface{}) (string, error)
6872
}
6973

7074
// DSConfluence - DS implementation for confluence - does nothing at all, just presents a skeleton code
@@ -344,8 +348,8 @@ func (j *DSConfluence) GetHistoricalContents(ctx *shared.Ctx, content map[string
344348
headers,
345349
nil,
346350
nil,
347-
map[[2]int]struct{}{{200, 200}: {}}, // JSON statuses: 200
348-
nil, // Error statuses
351+
map[[2]int]struct{}{{200, 200}: {}}, // JSON statuses: 200
352+
nil, // Error statuses
349353
map[[2]int]struct{}{{200, 200}: {}, {500, 500}: {}, {404, 404}: {}}, // OK statuses: 200
350354
map[[2]int]struct{}{{200, 200}: {}}, // Cache statuses: 200
351355
false, // retry
@@ -574,6 +578,7 @@ func (j *DSConfluence) Sync(ctx *shared.Ctx) (err error) {
574578
if ctx.DateTo != nil {
575579
j.log.WithFields(logrus.Fields{"operation": "Sync"}).Infof("%s fetching till %v", j.URL, ctx.DateTo)
576580
}
581+
j.getCachedContent()
577582
// NOTE: Non-generic starts here
578583
var (
579584
sDateFrom string
@@ -1291,8 +1296,7 @@ func (j *DSConfluence) GetModelData(ctx *shared.Ctx, docs []interface{}) (data m
12911296
SourceTimestamp: updatedOn,
12921297
Children: kids,
12931298
}
1294-
cacheID := fmt.Sprintf("content-%s", confluenceContentID)
1295-
isCreated, err := j.cacheProvider.IsKeyCreated(j.endpoint, cacheID)
1299+
isCreated := isKeyCreated(confluenceContentID)
12961300
if err != nil {
12971301
j.log.WithFields(logrus.Fields{"operation": "GetModelData"}).Errorf("error getting cache for endpoint %s. error: %+v", j.endpoint, err)
12981302
return data, err
@@ -1338,30 +1342,26 @@ func (j *DSConfluence) ConfluenceEnrichItems(ctx *shared.Ctx, thrN int, items []
13381342
contentsStr := "contents"
13391343
envStr := os.Getenv("STAGE")
13401344
// Push the event
1341-
d := make([]map[string]interface{}, 0)
13421345
for k, v := range data {
13431346
switch k {
13441347
case "created":
13451348
ev, _ := v[0].(insightsConf.ContentCreatedEvent)
1346-
err = j.Publisher.PushEvents(ev.Event(), insightsStr, ConfluenceDataSource, contentsStr, envStr, v)
1347-
cacheData, err := j.cachedCreatedContent(v)
1349+
path, err := j.Publisher.PushEvents(ev.Event(), insightsStr, ConfluenceDataSource, contentsStr, envStr, v)
1350+
err = j.cachedCreatedContent(v, path)
13481351
if err != nil {
13491352
j.log.WithFields(logrus.Fields{"operation": "ConfluenceEnrichItems"}).Errorf("cachedCreatedContent error: %+v", err)
13501353
return
13511354
}
1352-
d = append(d, cacheData...)
13531355
case "updated":
1354-
updates, cacheData, err := j.preventUpdateDuplication(v)
1356+
updates, err := j.preventUpdateDuplication(v)
13551357
if err != nil {
13561358
j.log.WithFields(logrus.Fields{"operation": "ConfluenceEnrichItems"}).Errorf("preventUpdateDuplication error: %+v", err)
13571359
return
13581360
}
1359-
if len(cacheData) > 0 {
1360-
d = append(d, cacheData...)
1361-
}
1361+
13621362
if len(updates) > 0 {
13631363
ev, _ := updates[0].(insightsConf.ContentUpdatedEvent)
1364-
err = j.Publisher.PushEvents(ev.Event(), insightsStr, ConfluenceDataSource, contentsStr, envStr, updates)
1364+
_, err = j.Publisher.PushEvents(ev.Event(), insightsStr, ConfluenceDataSource, contentsStr, envStr, updates)
13651365
}
13661366
default:
13671367
err = fmt.Errorf("unknown confluence event type '%s'", k)
@@ -1370,11 +1370,8 @@ func (j *DSConfluence) ConfluenceEnrichItems(ctx *shared.Ctx, thrN int, items []
13701370
break
13711371
}
13721372
}
1373-
if len(d) > 0 {
1374-
err = j.cacheProvider.Create(j.endpoint, d)
1375-
if err != nil {
1376-
j.log.WithFields(logrus.Fields{"operation": "ConfluenceEnrichItems"}).Errorf("error creating cache for endpoint %s. Error: %+v", j.endpoint, err)
1377-
}
1373+
if err = j.createCacheFile([]EntityCache{}, ""); err != nil {
1374+
j.log.WithFields(logrus.Fields{"operation": "ConfluenceEnrichItems"}).Errorf("error creating cache for endpoint %s. Error: %+v", j.endpoint, err)
13781375
}
13791376
} else {
13801377
jsonBytes, err = jsoniter.Marshal(data)
@@ -1567,11 +1564,9 @@ func (j *DSConfluence) AddCacheProvider() {
15671564
j.endpoint = strings.ReplaceAll(strings.TrimPrefix(strings.TrimPrefix(j.URL, "https://"), "http://"), "/", "-")
15681565
}
15691566

1570-
func (j *DSConfluence) cachedCreatedContent(v []interface{}) ([]map[string]interface{}, error) {
1571-
cacheData := make([]map[string]interface{}, 0)
1567+
func (j *DSConfluence) cachedCreatedContent(v []interface{}, path string) error {
15721568
for _, val := range v {
15731569
content := val.(insightsConf.ContentCreatedEvent).Payload
1574-
id := fmt.Sprintf("%s-%s", "content", val.(insightsConf.ContentCreatedEvent).Payload.ID)
15751570
c := insightsConf.Content{
15761571
ID: content.ID,
15771572
EndpointID: content.EndpointID,
@@ -1587,22 +1582,24 @@ func (j *DSConfluence) cachedCreatedContent(v []interface{}) ([]map[string]inter
15871582
}
15881583
b, err := json.Marshal(c)
15891584
if err != nil {
1590-
return cacheData, err
1585+
return err
15911586
}
15921587
contentHash := fmt.Sprintf("%x", sha256.Sum256(b))
1593-
cacheData = append(cacheData, map[string]interface{}{
1594-
"id": id,
1595-
"data": map[string]interface{}{
1596-
contentHashField: contentHash,
1597-
},
1598-
})
1588+
tStamp := content.SyncTimestamp.Unix()
1589+
cachedSpaces[content.ID] = EntityCache{
1590+
Timestamp: fmt.Sprintf("%v", tStamp),
1591+
EntityID: content.ID,
1592+
SourceEntityID: content.ContentID,
1593+
FileLocation: path,
1594+
Hash: contentHash,
1595+
Orphaned: false,
1596+
}
15991597
}
1600-
return cacheData, nil
1598+
return nil
16011599
}
16021600

1603-
func (j *DSConfluence) preventUpdateDuplication(v []interface{}) ([]interface{}, []map[string]interface{}, error) {
1601+
func (j *DSConfluence) preventUpdateDuplication(v []interface{}) ([]interface{}, error) {
16041602
updatedVals := make([]interface{}, 0, len(v))
1605-
cacheData := make([]map[string]interface{}, 0)
16061603
for _, val := range v {
16071604
content := val.(insightsConf.ContentUpdatedEvent).Payload
16081605
c := insightsConf.Content{
@@ -1620,25 +1617,108 @@ func (j *DSConfluence) preventUpdateDuplication(v []interface{}) ([]interface{},
16201617
}
16211618
b, err := json.Marshal(c)
16221619
if err != nil {
1623-
return updatedVals, cacheData, nil
1620+
return updatedVals, nil
16241621
}
16251622
contentHash := fmt.Sprintf("%x", sha256.Sum256(b))
1626-
cacheID := fmt.Sprintf("content-%s", content.ID)
1627-
byt, err := j.cacheProvider.GetFileByKey(j.endpoint, cacheID)
1628-
if err != nil {
1629-
return updatedVals, cacheData, nil
1623+
cacheCon, ok := cachedSpaces[content.ID]
1624+
if !ok {
1625+
continue
16301626
}
1631-
cachedHash := make(map[string]interface{})
1632-
err = json.Unmarshal(byt, &cachedHash)
1633-
if contentHash != cachedHash["contentHash"] {
1627+
if contentHash != cacheCon.Hash {
16341628
updatedVals = append(updatedVals, val)
1635-
cacheData = append(cacheData, map[string]interface{}{
1636-
"id": cacheID,
1637-
"data": map[string]interface{}{
1638-
contentHashField: contentHash,
1639-
},
1640-
})
1629+
cacheCon.Hash = contentHash
1630+
cachedSpaces[content.ID] = cacheCon
16411631
}
16421632
}
1643-
return updatedVals, cacheData, nil
1633+
return updatedVals, nil
1634+
}
1635+
1636+
func (j *DSConfluence) getCachedContent() {
1637+
comB, err := j.cacheProvider.GetFileByKey(j.endpoint, spacesCacheFile)
1638+
if err != nil {
1639+
return
1640+
}
1641+
reader := csv.NewReader(bytes.NewBuffer(comB))
1642+
records, err := reader.ReadAll()
1643+
if err != nil {
1644+
return
1645+
}
1646+
for i, record := range records {
1647+
if i == 0 {
1648+
continue
1649+
}
1650+
orphaned, err := strconv.ParseBool(record[5])
1651+
if err != nil {
1652+
orphaned = false
1653+
}
1654+
1655+
cachedSpaces[record[1]] = EntityCache{
1656+
Timestamp: record[0],
1657+
EntityID: record[1],
1658+
SourceEntityID: record[2],
1659+
FileLocation: record[3],
1660+
Hash: record[4],
1661+
Orphaned: orphaned,
1662+
}
1663+
}
1664+
}
1665+
1666+
func (j *DSConfluence) createCacheFile(cache []EntityCache, path string) error {
1667+
for _, comm := range cache {
1668+
comm.FileLocation = path
1669+
cachedSpaces[comm.EntityID] = comm
1670+
}
1671+
records := [][]string{
1672+
{"timestamp", "entity_id", "source_entity_id", "file_location", "hash", "orphaned"},
1673+
}
1674+
for _, c := range cachedSpaces {
1675+
records = append(records, []string{c.Timestamp, c.EntityID, c.SourceEntityID, c.FileLocation, c.Hash, strconv.FormatBool(c.Orphaned)})
1676+
}
1677+
1678+
csvFile, err := os.Create(spacesCacheFile)
1679+
if err != nil {
1680+
return err
1681+
}
1682+
1683+
w := csv.NewWriter(csvFile)
1684+
err = w.WriteAll(records)
1685+
if err != nil {
1686+
return err
1687+
}
1688+
err = csvFile.Close()
1689+
if err != nil {
1690+
return err
1691+
}
1692+
file, err := os.ReadFile(spacesCacheFile)
1693+
if err != nil {
1694+
return err
1695+
}
1696+
err = os.Remove(spacesCacheFile)
1697+
if err != nil {
1698+
return err
1699+
}
1700+
err = j.cacheProvider.UpdateFileByKey(j.endpoint, spacesCacheFile, file)
1701+
if err != nil {
1702+
return err
1703+
}
1704+
1705+
return nil
1706+
}
1707+
1708+
func isKeyCreated(id string) bool {
1709+
_, ok := cachedSpaces[id]
1710+
if ok {
1711+
return true
1712+
}
1713+
return false
1714+
}
1715+
1716+
// EntityCache single commit cache schema
1717+
type EntityCache struct {
1718+
Timestamp string `json:"timestamp"`
1719+
EntityID string `json:"entity_id"`
1720+
SourceEntityID string `json:"source_entity_id"`
1721+
FileLocation string `json:"file_location"`
1722+
Hash string `json:"hash"`
1723+
Orphaned bool `json:"orphaned"`
16441724
}

go.mod

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,19 @@ go 1.17
44

55
require (
66
github.com/LF-Engineering/insights-datasource-shared v1.5.20
7-
github.com/LF-Engineering/lfx-event-schema v0.1.29
7+
github.com/LF-Engineering/lfx-event-schema v0.1.35
88
github.com/aws/aws-sdk-go v1.43.3
99
github.com/json-iterator/go v1.1.12
1010
github.com/sirupsen/logrus v1.8.1
1111
gopkg.in/DataDog/dd-trace-go.v1 v1.43.1
1212
)
1313

1414
require (
15+
github.com/DataDog/datadog-agent/pkg/obfuscate v0.0.0-20211129110424-6491aa3bf583 // indirect
16+
github.com/DataDog/datadog-go v4.8.2+incompatible // indirect
17+
github.com/DataDog/datadog-go/v5 v5.0.2 // indirect
18+
github.com/DataDog/sketches-go v1.2.1 // indirect
19+
github.com/Microsoft/go-winio v0.5.1 // indirect
1520
github.com/avast/retry-go v3.0.0+incompatible // indirect
1621
github.com/aws/aws-sdk-go-v2 v1.11.2 // indirect
1722
github.com/aws/aws-sdk-go-v2/config v1.6.0 // indirect
@@ -25,18 +30,27 @@ require (
2530
github.com/aws/aws-sdk-go-v2/service/sso v1.3.2 // indirect
2631
github.com/aws/aws-sdk-go-v2/service/sts v1.6.1 // indirect
2732
github.com/aws/smithy-go v1.11.0 // indirect
33+
github.com/cespare/xxhash/v2 v2.1.2 // indirect
34+
github.com/dgraph-io/ristretto v0.1.0 // indirect
35+
github.com/dustin/go-humanize v1.0.0 // indirect
2836
github.com/elastic/go-elasticsearch/v8 v8.0.0-20201229214741-2366c2514674 // indirect
37+
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b // indirect
2938
github.com/google/uuid v1.3.0 // indirect
3039
github.com/jmespath/go-jmespath v0.4.0 // indirect
40+
github.com/josharian/intern v1.0.0 // indirect
41+
github.com/mailru/easyjson v0.7.7 // indirect
3142
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
3243
github.com/modern-go/reflect2 v1.0.2 // indirect
44+
github.com/philhofer/fwd v1.1.1 // indirect
3345
github.com/pkg/errors v0.9.1 // indirect
34-
github.com/stretchr/testify v1.7.1 // indirect
46+
github.com/tinylib/msgp v1.1.2 // indirect
47+
go4.org/intern v0.0.0-20211027215823-ae77deb06f29 // indirect
48+
go4.org/unsafe/assume-no-moving-gc v0.0.0-20220617031537-928513b29760 // indirect
3549
golang.org/x/crypto v0.0.0-20220214200702-86341886e292 // indirect
36-
golang.org/x/net v0.0.0-20220722155237-a158d28d115b // indirect
3750
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f // indirect
3851
golang.org/x/text v0.3.7 // indirect
52+
golang.org/x/time v0.0.0-20211116232009-f0f3c7e86c11 // indirect
3953
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
40-
gopkg.in/yaml.v2 v2.4.0 // indirect
41-
gopkg.in/yaml.v3 v3.0.1 // indirect
54+
google.golang.org/protobuf v1.28.0 // indirect
55+
inet.af/netaddr v0.0.0-20220617031823-097006376321 // indirect
4256
)

0 commit comments

Comments
 (0)