Skip to content

feat: Add upsert and unique checking in local mode [DRAFT] #104

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 10 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 69 additions & 30 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -198,38 +198,38 @@ type MyNode struct {

modusGraph uses struct tags to define how each field should be handled in the graph database:

| Directive | Option | Description | Example |
| ----------- | -------- | ------------------------------------------------------------------- | ------------------------------------------------------------------------------------ |
| **index** | exact | Creates an exact-match index for string fields | Name string `json:"name" dgraph:"index=exact"` |
| | hash | Creates a hash index (same as exact) | Code string `json:"code" dgraph:"index=hash"` |
| | term | Creates a term index for text search | Description string `json:"description" dgraph:"index=term"` |
| | fulltext | Creates a full-text search index | Content string `json:"content" dgraph:"index=fulltext"` |
| | int | Creates an index for integer fields | Age int `json:"age" dgraph:"index=int"` |
| | geo | Creates a geolocation index | Location `json:"location" dgraph:"index=geo"` |
| | day | Creates a day-based index for datetime fields | Created time.Time `json:"created" dgraph:"index=day"` |
| | year | Creates a year-based index for datetime fields | Birthday time.Time `json:"birthday" dgraph:"index=year"` |
| | month | Creates a month-based index for datetime fields | Hired time.Time `json:"hired" dgraph:"index=month"` |
| | hour | Creates an hour-based index for datetime fields | Login time.Time `json:"login" dgraph:"index=hour"` |
| | hnsw | Creates a vector similarity index | Vector \*dg.VectorFloat32 `json:"vector" dgraph:"index=hnsw(metric:cosine)"` |
| **type** | geo | Specifies a geolocation field | Location `json:"location" dgraph:"type=geo"` |
| | datetime | Specifies a datetime field | CreatedAt time.Time `json:"createdAt" dgraph:"type=datetime"` |
| | int | Specifies an integer field | Count int `json:"count" dgraph:"type=int"` |
| | float | Specifies a floating-point field | Price float64 `json:"price" dgraph:"type=float"` |
| | bool | Specifies a boolean field | Active bool `json:"active" dgraph:"type=bool"` |
| | password | Specifies a password field (stored securely) | Password string `json:"password" dgraph:"type=password"` |
| **count** | | Creates a count index | Visits int `json:"visits" dgraph:"count"` |
| **unique** | | Enforces uniqueness for the field (remote Dgraph only) | Email string `json:"email" dgraph:"index=hash unique"` |
| **upsert** | | Allows a field to be used in upsert operations (remote Dgraph only) | UserID string `json:"userID" dgraph:"index=hash upsert"` |
| **reverse** | | Creates a bidirectional edge | Friends []\*Person `json:"friends" dgraph:"reverse"` |
| **lang** | | Enables multi-language support for the field | Description string `json:"description" dgraph:"lang"` |
| Directive | Option | Description | Example |
| ----------- | -------- | ----------------------------------------------- | ------------------------------------------------------------------------------------ |
| **index** | exact | Creates an exact-match index for string fields | Name string `json:"name" dgraph:"index=exact"` |
| | hash | Creates a hash index (same as exact) | Code string `json:"code" dgraph:"index=hash"` |
| | term | Creates a term index for text search | Description string `json:"description" dgraph:"index=term"` |
| | fulltext | Creates a full-text search index | Content string `json:"content" dgraph:"index=fulltext"` |
| | int | Creates an index for integer fields | Age int `json:"age" dgraph:"index=int"` |
| | geo | Creates a geolocation index | Location `json:"location" dgraph:"index=geo"` |
| | day | Creates a day-based index for datetime fields | Created time.Time `json:"created" dgraph:"index=day"` |
| | year | Creates a year-based index for datetime fields | Birthday time.Time `json:"birthday" dgraph:"index=year"` |
| | month | Creates a month-based index for datetime fields | Hired time.Time `json:"hired" dgraph:"index=month"` |
| | hour | Creates an hour-based index for datetime fields | Login time.Time `json:"login" dgraph:"index=hour"` |
| | hnsw | Creates a vector similarity index | Vector \*dg.VectorFloat32 `json:"vector" dgraph:"index=hnsw(metric:cosine)"` |
| **type** | geo | Specifies a geolocation field | Location `json:"location" dgraph:"type=geo"` |
| | datetime | Specifies a datetime field | CreatedAt time.Time `json:"createdAt" dgraph:"type=datetime"` |
| | int | Specifies an integer field | Count int `json:"count" dgraph:"type=int"` |
| | float | Specifies a floating-point field | Price float64 `json:"price" dgraph:"type=float"` |
| | bool | Specifies a boolean field | Active bool `json:"active" dgraph:"type=bool"` |
| | password | Specifies a password field (stored securely) | Password string `json:"password" dgraph:"type=password"` |
| **count** | | Creates a count index | Visits int `json:"visits" dgraph:"count"` |
| **unique** | | Enforces uniqueness for the field | Email string `json:"email" dgraph:"index=hash unique"` |
| **upsert** | | Allows a field to be used in upsert operations | UserID string `json:"userID" dgraph:"index=hash upsert"` |
| **reverse** | | Creates a bidirectional edge | Friends []\*Person `json:"friends" dgraph:"reverse"` |
| **lang** | | Enables multi-language support for the field | Description string `json:"description" dgraph:"lang"` |

### Relationships

Relationships between nodes are defined using struct pointers or slices of struct pointers:

```go
type Person struct {
Name string `json:"name,omitempty" dgraph:"index=exact"`
Name string `json:"name,omitempty" dgraph:"index=exact upsert"`
Friends []*Person `json:"friends,omitempty"`
Manager *Person `json:"manager,omitempty"`

Expand Down Expand Up @@ -268,6 +268,10 @@ Advanced querying is required to properly bind reverse edges in query results. S

modusGraph provides a simple API for common database operations.

Note that in local mode, uniqueness checks are limited to the top-level object. Fields tagged
`unique` in embedded objects, or in lists of embedded objects, are not checked for uniqueness
when the top-level object is inserted.

### Inserting Data

To insert a new node into the database:
Expand All @@ -292,9 +296,40 @@ if err != nil {
fmt.Println("Created user with UID:", user.UID)
```

### Upserting Data

modusGraph provides a simple API for upserting data into the database.

Note that in local mode, upserts are only supported on the top-level object. `upsert` tags on
fields of embedded objects, or of objects in embedded lists, are ignored when the top-level
object is upserted.

```go
ctx := context.Background()

user := User{
Name: "John Doe", // this field has the `upsert` tag
Email: "john@example.com",
Role: "Admin",
}

// Upsert the user into the database
// If "John Doe" does not exist, it will be created
// If "John Doe" exists, it will be updated
err := client.Upsert(ctx, &user)
if err != nil {
log.Fatalf("Failed to upsert user: %v", err)
}

```

### Updating Data

To update an existing node, first retrieve it, modify it, then save it back:
To update an existing node, first retrieve it, modify it, then save it back.

Note that in local mode, uniqueness checks on update are limited to the top-level object. Fields
tagged `unique` in embedded objects, or in lists of embedded objects, are not checked for
uniqueness when the top-level object is updated.

```go
ctx := context.Background()
Expand Down Expand Up @@ -534,10 +569,14 @@ These operations are useful for testing or when you need to reset your database
modusGraph has a few limitations to be aware of:

- **Unique constraints in file-based mode**: Due to the intricacies of how Dgraph handles unique
fields and upserts in its core package, unique field checks and upsert operations are not
supported (yet) when using the local (file-based) mode. These operations work properly when using
a full Dgraph cluster, but the simplified file-based mode does not support the constraint
enforcement mechanisms required for uniqueness guarantees.
fields in its core package, unique field checks in the local (file-based) mode are limited to
the top-level object: fields tagged `unique` in embedded objects, or in lists of embedded
objects, are not checked. Full uniqueness guarantees are available when using a full Dgraph
cluster.

- **Upsert operations**: In local mode, upsert operations are only supported on the top-level
object. `upsert` tags on fields of embedded objects, or of objects in embedded lists, are
ignored when the top-level object is upserted.

- **Schema evolution**: While modusGraph supports schema inference through tags, evolving an
existing schema with new fields requires careful consideration to avoid data inconsistencies.
Expand Down
2 changes: 2 additions & 0 deletions buf_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ func (s *serverWrapper) Query(ctx context.Context, req *api.Request) (*api.Respo
s.engine.logger.V(2).Info("Query using namespace", "namespaceID", ns.ID())

if len(req.Mutations) > 0 {
s.engine.logger.V(3).Info("Mutating", "mutations", req.Mutations)

uids, err := ns.Mutate(ctx, req.Mutations)
if err != nil {
return nil, fmt.Errorf("engine mutation error: %w", err)
Expand Down
125 changes: 56 additions & 69 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,10 @@ type Client interface {

// Upsert inserts an object if it doesn't exist or updates it if it does.
// This operation requires a field with a unique directive in the dgraph tag.
// Note: This operation is not supported in file-based (local) mode.
Upsert(context.Context, any) error
// One or more predicates may be passed to select the field(s) used for the upsert.
// If none are passed, the first predicate with the `upsert` tag will be used.
Upsert(context.Context, any, ...string) error

// Update modifies an existing object in the database.
// The object must be a pointer to a struct and must have a UID field set.
Expand Down Expand Up @@ -86,13 +88,15 @@ var clientMap = make(map[string]Client)
//
// autoSchema: whether to automatically manage the schema.
// poolSize: the size of the dgo client connection pool.
// maxEdgeTraversal: the maximum number of edges to traverse when querying.
// namespace: the namespace for the client.
// logger: the logger for the client.
type clientOptions struct {
autoSchema bool
poolSize int
namespace string
logger logr.Logger
autoSchema bool
poolSize int
maxEdgeTraversal int
namespace string
logger logr.Logger
}

// ClientOpt is a function that configures a client
Expand Down Expand Up @@ -126,6 +130,13 @@ func WithLogger(logger logr.Logger) ClientOpt {
}
}

// WithMaxEdgeTraversal sets the maximum number of edges to traverse when fetching an object.
// The returned option stores limit in the client options' maxEdgeTraversal field.
func WithMaxEdgeTraversal(limit int) ClientOpt {
	// The parameter is deliberately not named "max": that would shadow the
	// predeclared max builtin available since Go 1.21.
	return func(o *clientOptions) {
		o.maxEdgeTraversal = limit
	}
}

// NewClient creates a new graph database client instance based on the provided URI.
//
// The function supports two URI schemes:
Expand All @@ -135,6 +146,7 @@ func WithLogger(logger logr.Logger) ClientOpt {
// Optional configuration can be provided via the opts parameter:
// - WithAutoSchema(bool) - Enable/disable automatic schema creation for inserted objects
// - WithPoolSize(int) - Set the connection pool size for better performance under load
// - WithMaxEdgeTraversal(int) - Set the maximum number of edges to traverse when fetching an object
// - WithNamespace(string) - Set the database namespace for multi-tenant installations
// - WithLogger(logr.Logger) - Configure structured logging with custom verbosity levels
//
Expand All @@ -147,10 +159,11 @@ func WithLogger(logger logr.Logger) ClientOpt {
func NewClient(uri string, opts ...ClientOpt) (Client, error) {
// Default options
options := clientOptions{
autoSchema: false,
poolSize: 10,
namespace: "",
logger: logr.Discard(), // No-op logger by default
autoSchema: false,
poolSize: 10,
namespace: "",
maxEdgeTraversal: 10,
logger: logr.Discard(), // No-op logger by default
}

// Apply provided options
Expand Down Expand Up @@ -214,6 +227,10 @@ type client struct {
logger logr.Logger
}

// isLocal reports whether the client's URI starts with FileURIPrefix.
// Callers in this file use it to pick the local code path over the pooled
// remote one.
func (c client) isLocal() bool {
return strings.HasPrefix(c.uri, FileURIPrefix)
}

func checkPointer(obj any) error {
if reflect.TypeOf(obj).Kind() != reflect.Ptr {
return errors.New("object must be a pointer")
Expand All @@ -223,79 +240,48 @@ func checkPointer(obj any) error {

// Insert stores an object or a slice of objects in the database.
// Local clients are routed to c.mutate; every other client performs a basic
// mutation through c.process.
func (c client) Insert(ctx context.Context, obj any) error {
	if !c.isLocal() {
		insertFn := func(tx *dg.TxnContext, target any) ([]string, error) {
			return tx.MutateBasic(target)
		}
		return c.process(ctx, obj, "Insert", insertFn)
	}
	return c.mutate(ctx, obj, true)
}

// Upsert implements inserting or updating an object or slice of objects in the database.
// Note for local file clients, this is not currently implemented.
func (c client) Upsert(ctx context.Context, obj any) error {
if c.engine != nil {
return errors.New("not implemented")
// Note that the struct tag `upsert` must be used. One or more predicates can be specified
// to be used for upserting. If none are specified, the first predicate with the `upsert` tag
// will be used.
// Note for local file clients, only the first struct field marked with `upsert` will be used
// if none are specified in the predicates argument.
func (c client) Upsert(ctx context.Context, obj any, predicates ...string) error {
if c.isLocal() {
var upsertPredicate string
if len(predicates) > 0 {
upsertPredicate = predicates[0]
if len(predicates) > 1 {
c.logger.V(1).Info("Multiple upsert predicates specified, local mode only supports one, using first of this list",
"predicates", predicates)
}
}
return c.upsert(ctx, obj, upsertPredicate)
}
return c.process(ctx, obj, "Upsert", func(tx *dg.TxnContext, obj any) ([]string, error) {
return tx.Upsert(obj)
return tx.Upsert(obj, predicates...)
})
}

// Update writes changes to an existing object back to the database.
// Local clients are routed to c.mutate; every other client performs a basic
// mutation through c.process.
func (c client) Update(ctx context.Context, obj any) error {
	if !c.isLocal() {
		updateFn := func(tx *dg.TxnContext, target any) ([]string, error) {
			return tx.MutateBasic(target)
		}
		return c.process(ctx, obj, "Update", updateFn)
	}
	return c.mutate(ctx, obj, false)
}

// process runs a mutation for obj through txFunc inside a commit-now
// transaction obtained from the client pool.
//
// Parameters:
//   - ctx: context passed to the schema update and the transaction.
//   - obj: a pointer, or a non-empty slice of pointers, to mutate.
//   - operation: label used in the success log message (e.g. "Insert").
//   - txFunc: callback that performs the mutation and returns the affected UIDs.
//
// It returns an error when obj is nil, when a slice is empty, when the object
// (or the first slice element) is nil or not a pointer, or when the schema
// update, the pool checkout, or txFunc fails.
func (c client) process(ctx context.Context,
	obj any, operation string,
	txFunc func(*dg.TxnContext, any) ([]string, error)) error {

	// reflect.TypeOf(nil) yields a nil Type and calling Kind on it panics,
	// so reject nil input before any reflection.
	if obj == nil {
		return errors.New("object cannot be nil")
	}

	objKind := reflect.TypeOf(obj).Kind()
	schemaObj := obj

	if objKind == reflect.Slice {
		sliceValue := reflect.ValueOf(obj)
		if sliceValue.Len() == 0 {
			return errors.New("slice cannot be empty")
		}
		// Only the first element is used for the pointer check and schema update.
		schemaObj = sliceValue.Index(0).Interface()
		// A nil interface element (e.g. []any{nil}) would make checkPointer
		// panic for the same reason as a nil obj.
		if schemaObj == nil {
			return errors.New("slice elements must be pointers")
		}
	}

	if err := checkPointer(schemaObj); err != nil {
		if objKind == reflect.Slice {
			err = errors.Join(err, errors.New("slice elements must be pointers"))
		}
		return err
	}

	if c.options.autoSchema {
		if err := c.UpdateSchema(ctx, schemaObj); err != nil {
			return err
		}
	}

	client, err := c.pool.get()
	if err != nil {
		c.logger.Error(err, "Failed to get client from pool")
		return err
	}
	defer c.pool.put(client)

	tx := dg.NewTxnContext(ctx, client).SetCommitNow()
	uids, err := txFunc(tx, obj)
	if err != nil {
		return err
	}
	c.logger.V(2).Info(operation+" successful", "uidCount", len(uids))
	return nil
}

// Delete implements removing objects with the specified UIDs.
func (c client) Delete(ctx context.Context, uids []string) error {
client, err := c.pool.get()
Expand All @@ -322,10 +308,11 @@ func (c client) Get(ctx context.Context, obj any, uid string) error {
defer c.pool.put(client)

txn := dg.NewReadOnlyTxnContext(ctx, client)
return txn.Get(obj).UID(uid).Node()
return txn.Get(obj).UID(uid).All(c.options.maxEdgeTraversal).Node()
}

// Query implements querying similar to dgman's TxnContext.Get method.
// Returns a *dg.Query that can be further refined with filters, pagination, etc.
// The returned query will be limited to the maximum number of edges specified in the options.
func (c client) Query(ctx context.Context, model any) *dg.Query {
client, err := c.pool.get()
if err != nil {
Expand All @@ -334,7 +321,7 @@ func (c client) Query(ctx context.Context, model any) *dg.Query {
defer c.pool.put(client)

txn := dg.NewReadOnlyTxnContext(ctx, client)
return txn.Get(model)
return txn.Get(model).All(c.options.maxEdgeTraversal)
}

// UpdateSchema implements updating the Dgraph schema. Pass one or more
Expand Down Expand Up @@ -389,7 +376,7 @@ func (c client) DropData(ctx context.Context) error {

// QueryRaw implements raw querying (DQL syntax) and optional variables.
func (c client) QueryRaw(ctx context.Context, q string, vars map[string]string) ([]byte, error) {
if c.engine != nil {
if c.isLocal() {
ns := c.engine.GetDefaultNamespace()
resp, err := ns.QueryWithVars(ctx, q, vars)
if err != nil {
Expand Down
Loading