Skip to content

Commit ad3ad49

Browse files
committed
Add initial kubernetes and dgraph relationship model
1 parent 002a2d1 commit ad3ad49

10 files changed

+1107
-8
lines changed

client/pkg/dgraph/dgraph.go

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
package dgraph
2+
3+
import (
4+
"context"
5+
6+
"github.com/dgraph-io/dgo/v240"
7+
"github.com/dgraph-io/dgo/v240/protos/api"
8+
)
9+
10+
// Dgraph schema for Kubernetes resources
11+
const dgraphSchema = `
12+
group: string @index(exact) .
13+
apiVersion: string @index(exact) .
14+
kind: string @index(exact) .
15+
metadata_name: string @index(exact) .
16+
metadata_namespace: string @index(exact) .
17+
metadata_uid: string @index(exact) .
18+
metadata_labels: [uid] .
19+
status_phase: string @index(exact) .
20+
spec_nodeName: string @index(exact) .
21+
isCurrent: bool @index(bool) .
22+
lastUpdated: datetime .
23+
24+
key: string @index(exact) .
25+
value: string .
26+
27+
relationships: [uid] @reverse .
28+
targetResource: string @index(exact) .
29+
relationshipType: string @index(exact) .
30+
targetUID: string @index(exact) .
31+
target: uid .
32+
33+
type KubernetesResource {
34+
dgraph.type: string
35+
group: string
36+
apiVersion: string
37+
kind: string
38+
39+
metadata_name: string
40+
metadata_namespace: string
41+
metadata_uid: string!
42+
metadata_labels: [Label]
43+
44+
isCurrent: bool
45+
lastUpdated: DateTime
46+
47+
status_phase: string
48+
spec_nodeName: string
49+
relationships: [Relationship]
50+
}
51+
52+
type Label {
53+
key: string
54+
value: string
55+
}
56+
57+
type Relationship {
58+
dgraph.type: string
59+
targetResource: string
60+
relationshipType: string
61+
targetUID: string
62+
target: KubernetesResource
63+
}
64+
`
65+
66+
// SetupDgraphSchema sets up the Dgraph schema for Kubernetes resources
67+
func SetupDgraphSchema(ctx context.Context, dgraphClient *dgo.Dgraph) error {
68+
op := &api.Operation{
69+
Schema: dgraphSchema,
70+
}
71+
return dgraphClient.Alter(ctx, op)
72+
}
Lines changed: 250 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,250 @@
1+
package dgraph
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"errors"
7+
"fmt"
8+
"strings"
9+
10+
dgraphModel "github.com/intelops/kubviz/model/dgraph"
11+
12+
"github.com/dgraph-io/dgo/v240"
13+
"github.com/dgraph-io/dgo/v240/protos/api"
14+
)
15+
16+
var ErrResourceNotFound = errors.New("resource not found")
17+
18+
// DgraphKubernetesResourceRepository handles Dgraph operations for KubernetesResource
19+
type DgraphKubernetesResourceRepository struct {
20+
client *dgo.Dgraph
21+
}
22+
23+
// NewDgraphKubernetesResourceRepository creates a new DgraphKubernetesResourceRepository instance
24+
func NewDgraphKubernetesResourceRepository(client *dgo.Dgraph) *DgraphKubernetesResourceRepository {
25+
return &DgraphKubernetesResourceRepository{client: client}
26+
}
27+
28+
func (repo *DgraphKubernetesResourceRepository) Save(ctx context.Context, resource *dgraphModel.KubernetesResource) error {
29+
txn := repo.client.NewTxn()
30+
defer txn.Discard(ctx)
31+
32+
existingResource, err := repo.FindByKubernetesUID(ctx, resource.MetadataUID)
33+
if err != nil && !errors.Is(err, ErrResourceNotFound) {
34+
return fmt.Errorf("error checking existing resource: %v", err)
35+
}
36+
37+
var mutations []*api.Mutation
38+
39+
if existingResource != nil {
40+
resource.UID = existingResource.UID
41+
42+
// Handle upsert labels
43+
44+
// Compute label diffs
45+
labelsToAdd, labelsToRemove, _ := diffLabels(existingResource.MetadataLabels, resource.MetadataLabels)
46+
47+
// Prepare label removal mutation if needed
48+
if len(labelsToRemove) > 0 {
49+
deleteLabelsNquads := generateLabelDeleteNquads(resource.UID, labelsToRemove)
50+
mutations = append(mutations, &api.Mutation{
51+
DelNquads: []byte(strings.Join(deleteLabelsNquads, "\n")),
52+
})
53+
}
54+
55+
// Prepare label addition mutations if needed
56+
if len(labelsToAdd) > 0 {
57+
addLabelsNquads := generateLabelAddNquads(resource.UID, labelsToAdd)
58+
mutations = append(mutations, &api.Mutation{
59+
SetNquads: []byte(strings.Join(addLabelsNquads, "\n")),
60+
})
61+
}
62+
}
63+
64+
resourceCopy := *resource
65+
resourceCopy.DgraphType = []string{"KubernetesResource"}
66+
67+
if existingResource != nil {
68+
// Unsetting labels here to be handled in a separate mutation
69+
resourceCopy.MetadataLabels = nil
70+
}
71+
72+
resourceJSON, err := json.Marshal(resourceCopy)
73+
if err != nil {
74+
return fmt.Errorf("error json marshaling resource: %v", err)
75+
}
76+
77+
mutations = append(mutations, &api.Mutation{
78+
SetJson: resourceJSON,
79+
})
80+
81+
// Perform all mutations in a single request
82+
request := &api.Request{
83+
Mutations: mutations,
84+
CommitNow: true,
85+
}
86+
87+
_, err = txn.Do(ctx, request)
88+
if err != nil {
89+
return fmt.Errorf("error performing mutations: %v", err)
90+
}
91+
92+
return nil
93+
}
94+
95+
func diffLabels(oldLabels, newLabels []dgraphModel.Label) (toAdd, toRemove, toUpdate []dgraphModel.Label) {
96+
oldMap := make(map[string]string)
97+
for _, label := range oldLabels {
98+
oldMap[label.Key] = label.Value
99+
}
100+
101+
newMap := make(map[string]string)
102+
for _, label := range newLabels {
103+
newMap[label.Key] = label.Value
104+
}
105+
106+
toAdd = []dgraphModel.Label{}
107+
toRemove = []dgraphModel.Label{}
108+
toUpdate = []dgraphModel.Label{}
109+
110+
for _, newLabel := range newLabels {
111+
if oldValue, exists := oldMap[newLabel.Key]; !exists {
112+
toAdd = append(toAdd, newLabel)
113+
} else if oldValue != newLabel.Value {
114+
toUpdate = append(toUpdate, newLabel)
115+
}
116+
}
117+
118+
for _, oldLabel := range oldLabels {
119+
if _, exists := newMap[oldLabel.Key]; !exists {
120+
toRemove = append(toRemove, oldLabel)
121+
}
122+
}
123+
124+
return toAdd, toRemove, toUpdate
125+
}
126+
127+
func generateLabelDeleteNquads(uid string, labels []dgraphModel.Label) []string {
128+
var nquads []string
129+
for _, label := range labels {
130+
nquad := fmt.Sprintf("<%s> <metadata_labels> * (key = %q) .", uid, label.Key)
131+
nquads = append(nquads, nquad)
132+
}
133+
return nquads
134+
}
135+
136+
func generateLabelAddNquads(uid string, labelsToAdd []dgraphModel.Label) []string {
137+
var nquads []string
138+
for _, label := range labelsToAdd {
139+
labelUID := fmt.Sprintf("_:label_%s", label.Key)
140+
nquads = append(nquads, fmt.Sprintf("<%s> <metadata_labels> %s .", uid, labelUID))
141+
nquads = append(nquads, fmt.Sprintf("%s <dgraph.type> \"Label\" .", labelUID))
142+
nquads = append(nquads, fmt.Sprintf("%s <key> %q .", labelUID, label.Key))
143+
nquads = append(nquads, fmt.Sprintf("%s <value> %q .", labelUID, label.Value))
144+
}
145+
return nquads
146+
}
147+
148+
// getResourceQueryFields returns the common query fields for KubernetesResource
149+
func (repo *DgraphKubernetesResourceRepository) getResourceQueryFields() string {
150+
return `
151+
uid
152+
dgraph.type
153+
group
154+
apiVersion
155+
kind
156+
metadata_name
157+
metadata_namespace
158+
metadata_uid
159+
metadata_labels {
160+
key
161+
value
162+
}
163+
status_phase
164+
spec_nodeName
165+
isCurrent
166+
lastUpdated
167+
additionalFields
168+
relationships {
169+
uid
170+
dgraph.type
171+
targetResource
172+
relationshipType
173+
targetUID
174+
target {
175+
uid
176+
metadata_name
177+
metadata_namespace
178+
metadata_uid
179+
}
180+
}
181+
`
182+
}
183+
184+
// FindByDgraphUID retrieves a KubernetesResource by its Dgraph UID
185+
func (repo *DgraphKubernetesResourceRepository) FindByDgraphUID(ctx context.Context, dgraphUID string) (*dgraphModel.KubernetesResource, error) {
186+
txn := repo.client.NewTxn()
187+
defer txn.Discard(ctx)
188+
189+
q := fmt.Sprintf(`
190+
query findResource($dgraphUID: string) {
191+
resources(func: uid($dgraphUID)) @filter(eq(dgraph.type, "KubernetesResource")) {
192+
%s
193+
}
194+
}
195+
`, repo.getResourceQueryFields())
196+
197+
vars := map[string]string{"$dgraphUID": dgraphUID}
198+
resp, err := txn.QueryWithVars(ctx, q, vars)
199+
if err != nil {
200+
return nil, fmt.Errorf("error querying Dgraph: %v", err)
201+
}
202+
203+
var result struct {
204+
Resources []dgraphModel.KubernetesResource `json:"resources"`
205+
}
206+
207+
if err := json.Unmarshal(resp.Json, &result); err != nil {
208+
return nil, fmt.Errorf("error unmarshaling response: %v", err)
209+
}
210+
211+
if len(result.Resources) == 0 {
212+
return nil, ErrResourceNotFound
213+
}
214+
215+
return &result.Resources[0], nil
216+
}
217+
218+
// FindByKubernetesUID retrieves a KubernetesResource by its Kubernetes UID
219+
func (repo *DgraphKubernetesResourceRepository) FindByKubernetesUID(ctx context.Context, k8sUID string) (*dgraphModel.KubernetesResource, error) {
220+
txn := repo.client.NewTxn()
221+
defer txn.Discard(ctx)
222+
223+
q := fmt.Sprintf(`
224+
query findResource($k8sUID: string) {
225+
resources(func: eq(metadata_uid, $k8sUID)) @filter(eq(dgraph.type, "KubernetesResource")) {
226+
%s
227+
}
228+
}
229+
`, repo.getResourceQueryFields())
230+
231+
vars := map[string]string{"$k8sUID": k8sUID}
232+
resp, err := txn.QueryWithVars(ctx, q, vars)
233+
if err != nil {
234+
return nil, fmt.Errorf("error querying Dgraph: %v", err)
235+
}
236+
237+
var result struct {
238+
Resources []dgraphModel.KubernetesResource `json:"resources"`
239+
}
240+
241+
if err := json.Unmarshal(resp.Json, &result); err != nil {
242+
return nil, fmt.Errorf("error unmarshaling response: %v", err)
243+
}
244+
245+
if len(result.Resources) == 0 {
246+
return nil, ErrResourceNotFound
247+
}
248+
249+
return &result.Resources[0], nil
250+
}

0 commit comments

Comments
 (0)