Skip to content
Draft
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
486 changes: 454 additions & 32 deletions distsys/archetypeinterface.go

Large diffs are not rendered by default.

58 changes: 56 additions & 2 deletions distsys/archetyperesource.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"bytes"
"encoding/gob"
"errors"

"github.com/UBC-NSS/pgo/distsys/tla"
)

Expand Down Expand Up @@ -49,6 +48,17 @@ type ArchetypeResource interface {
// archetype is not running. Close will be called at most once by the MPCal
// Context.
Close() error
// ForkState must clone all sub-resources whose properties are NOT idempotent. ForkState returns
// a copy of this current resource along with any other properties required so that it can be run independently.
// For now this is a BLOCKING CALL (though perhaps can be converted to NON-BLOCKING by returning a channel)
// This is meant to be called before doing concurrent operations with the same resource for critical sections
ForkState() (ArchetypeResource, error)
// LinkState syncs the data from the caller resource into the forkParent resource. It is intended to be called on a
// resource who was forked by a call to ForkState.
LinkState() error
// AbortState must revert all changes make by sub-resources whose properties are NOT idempotent. It is intended to
// be called on a resource woh was forked by a call to ForkState
AbortState() error
}

type ArchetypeResourceLeafMixin struct{}
Expand Down Expand Up @@ -79,6 +89,7 @@ type LocalArchetypeResource struct {
// if this resource is already written in this critical section, oldValue contains prev value
// value always contains the "current" value
value, oldValue tla.TLAValue
forkParent *LocalArchetypeResource
}

var _ ArchetypeResource = &LocalArchetypeResource{}
Expand Down Expand Up @@ -137,6 +148,27 @@ func (res *LocalArchetypeResource) Close() error {
return nil
}

func (res *LocalArchetypeResource) ForkState() (ArchetypeResource, error) {
return &LocalArchetypeResource{
value: res.value,
oldValue: res.oldValue,
hasOldValue: res.hasOldValue,
forkParent: res,
}, nil
}

func (res *LocalArchetypeResource) LinkState() error {
res.forkParent.value = res.value
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

following on for why this method should not exist imo, notice how there would be literally no difference if you just kept one of the forked resources, vs doing a field by field full copy here.

res.forkParent.oldValue = res.oldValue
res.forkParent.hasOldValue = res.hasOldValue

return nil
}

func (res *LocalArchetypeResource) AbortState() error {
return nil
}

func (res *LocalArchetypeResource) GetState() ([]byte, error) {
var writer bytes.Buffer
encoder := gob.NewEncoder(&writer)
Expand All @@ -152,7 +184,8 @@ type localArchetypeSubResource struct {
// indices gives the total path from root value, as accumulated from calls to Index, e.g with `i[a][b] := ...` you get []{a, b}
indices []tla.TLAValue
// the parent local resource. it does everything important, which is why most methods here just return nil; they shouldn't even be called
parent *LocalArchetypeResource
parent *LocalArchetypeResource
forkParent *localArchetypeSubResource
}

var _ ArchetypeResource = &localArchetypeSubResource{}
Expand Down Expand Up @@ -207,3 +240,24 @@ func (res localArchetypeSubResource) Index(index tla.TLAValue) (ArchetypeResourc
func (res localArchetypeSubResource) Close() error {
return nil
}

func (res localArchetypeSubResource) ForkState() (ArchetypeResource, error) {
return localArchetypeSubResource{
indices: res.indices,
parent: res.parent,
forkParent: &res,
}, nil
}

func (res localArchetypeSubResource) LinkState() error {

res.forkParent.indices = res.indices
res.forkParent.parent = res.parent

return nil
}

func (res localArchetypeSubResource) AbortState() error {
//TODO implement me
panic("implement me")
}
92 changes: 65 additions & 27 deletions distsys/mpcalctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ func MakeMPCalJumpTable(criticalSections ...MPCalCriticalSection) MPCalJumpTable

// MPCalCriticalSection holds metadata for a single MPCal critical section
type MPCalCriticalSection struct {
Name string // the critical section's full name (in the form ArchetypeOrProcedureName.LabelName)
Body func(iface ArchetypeInterface) error // code for executing this critical section. should be straight-line code that runs in a bounded amount of time.
Name string // the critical section's full name (in the form ArchetypeOrProcedureName.LabelName)
Body func(iface *ArchetypeInterface) error // code for executing this critical section. should be straight-line code that runs in a bounded amount of time.
}

// MPCalProcTable is an immutable table of all procedures a given collection of archetypes and procedures might call
Expand All @@ -57,20 +57,20 @@ func MakeMPCalProcTable(procs ...MPCalProc) MPCalProcTable {

// MPCalProc holds all metadata necessary for calling an MPCal procedure
type MPCalProc struct {
Name string // the procedure's name, as given in the MPCal model
Label string // the fully qualified name of the procedure's first label
StateVars []string // the fuly-qualified names of all the procedure's local state variables, including arguments and refs
PreAmble func(iface ArchetypeInterface) error // code to initialize local state variables, writing any initial values they might have. runs as part of a call to the procedure.
Name string // the procedure's name, as given in the MPCal model
Label string // the fully qualified name of the procedure's first label
StateVars []string // the fuly-qualified names of all the procedure's local state variables, including arguments and refs
PreAmble func(iface *ArchetypeInterface) error // code to initialize local state variables, writing any initial values they might have. runs as part of a call to the procedure.
}

// MPCalArchetype holds all the metadata necessary to run an archetype, aside from user-provided configuration
type MPCalArchetype struct {
Name string // the archetype's name, as it reads in the MPCal source code
Label string // the full label name of the first critical section this archetype should execute
RequiredRefParams, RequiredValParams []string // names of ref and non-ref parameters
JumpTable MPCalJumpTable // a cross-reference to a jump table containing this archetype's critical sections
ProcTable MPCalProcTable // a cross-reference to a table of all MPCal procedures this archetype might call
PreAmble func(iface ArchetypeInterface) // called on archetype start-up, this code should initialize any local variables the archetype has
Name string // the archetype's name, as it reads in the MPCal source code
Label string // the full label name of the first critical section this archetype should execute
RequiredRefParams, RequiredValParams []string // names of ref and non-ref parameters
JumpTable MPCalJumpTable // a cross-reference to a jump table containing this archetype's critical sections
ProcTable MPCalProcTable // a cross-reference to a table of all MPCal procedures this archetype might call
PreAmble func(iface *ArchetypeInterface) // called on archetype start-up, this code should initialize any local variables the archetype has
}

// ArchetypeResourceHandle encapsulates a reference to an ArchetypeResource.
Expand Down Expand Up @@ -109,6 +109,16 @@ type ArchetypeResourceMakerStruct struct {
ConfigureFn func(res ArchetypeResource)
}

//type ForkedResourceNode struct {
// parent *ForkedResourceNode
// ForkedResources map[ArchetypeResourceHandle]ArchetypeResource
// path string
//}

//type ForkedResourceTree struct {
// root *ForkedResourceNode
//}

var _ ArchetypeResourceMaker = ArchetypeResourceMakerStruct{}

func (mkStruct ArchetypeResourceMakerStruct) Make() ArchetypeResource {
Expand All @@ -132,14 +142,17 @@ type MPCalContext struct {

// state for ArchetypeInterface.NextFairnessCounter
fairnessCounter FairnessCounter
// Forked resource tree
//forkedResourceTree ForkedResourceTree
//branchScheduler BranchScheduler

jumpTable MPCalJumpTable
procTable MPCalProcTable

dirtyResourceHandles map[ArchetypeResourceHandle]bool

// iface points right back to this *MPCalContext; used to separate external and internal APIs
iface ArchetypeInterface
iface *ArchetypeInterface

constantDefns map[string]func(args ...tla.TLAValue) tla.TLAValue

Expand Down Expand Up @@ -179,6 +192,7 @@ func NewMPCalContext(self tla.TLAValue, archetype MPCalArchetype, configFns ...M
self: self,
resources: make(map[ArchetypeResourceHandle]ArchetypeResource),
fairnessCounter: RoundRobinFairnessCounterMaker()(),
//branchScheduler: BranchSchedulerMaker(),

jumpTable: archetype.JumpTable,
procTable: archetype.ProcTable,
Expand All @@ -193,7 +207,15 @@ func NewMPCalContext(self tla.TLAValue, archetype MPCalArchetype, configFns ...M

awaitExit: make(chan struct{}),
}
ctx.iface = ArchetypeInterface{ctx: ctx}
ctx.iface = &ArchetypeInterface{
ctx: ctx,
ForkedResources: ctx.resources,
parent: nil,
path: "0",
}

//root := ForkedResourceNode{ForkedResources: ctx.resources, path: "0"}
//ctx.forkedResourceTree = ForkedResourceTree{root: &root}

ctx.ensureArchetypeResource(".pc", LocalArchetypeResourceMaker(tla.MakeTLAString(archetype.Label)))
ctx.ensureArchetypeResource(".stack", LocalArchetypeResourceMaker(tla.MakeTLATuple()))
Expand Down Expand Up @@ -256,7 +278,7 @@ func EnsureArchetypeDerivedRefParam(name string, parentName string, dMaker Deriv
if err != nil {
panic(fmt.Errorf("error in finding archetype derived ref param parent: %s", err))
}
parentRes := ctx.getResourceByHandle(parentHandle)
parentRes := ctx.iface.getResourceByHandle(parentHandle)
maker := dMaker(parentRes)
EnsureArchetypeRefParam(name, maker)(ctx)
}
Expand Down Expand Up @@ -390,7 +412,7 @@ func NewMPCalContextWithoutArchetype(configFns ...MPCalContextConfigFn) *MPCalCo
ctx := &MPCalContext{
constantDefns: make(map[string]func(args ...tla.TLAValue) tla.TLAValue),
}
ctx.iface = ArchetypeInterface{ctx}
ctx.iface = &ArchetypeInterface{ctx: ctx}

for _, configFn := range configFns {
configFn(ctx)
Expand All @@ -402,7 +424,7 @@ func NewMPCalContextWithoutArchetype(configFns ...MPCalContextConfigFn) *MPCalCo
// IFace provides an ArchetypeInterface, giving access to methods considered MPCal-internal.
// This is useful when directly calling pure TLA+ operators using a context constructed via NewMPCalContextWithoutArchetype,
// and is one of very few operations that will work on such a context.
func (ctx *MPCalContext) IFace() ArchetypeInterface {
func (ctx *MPCalContext) IFace() *ArchetypeInterface {
return ctx.iface
}

Expand All @@ -419,18 +441,31 @@ func (ctx *MPCalContext) ensureArchetypeResource(name string, maker ArchetypeRes
return handle
}

func (ctx *MPCalContext) getResourceByHandle(handle ArchetypeResourceHandle) ArchetypeResource {
res, ok := ctx.resources[handle]
if !ok {
panic(fmt.Errorf("could not find resource with name %v", handle))
}
return res
}
//func (ctx *MPCalContext) getResourceByHandle(handle ArchetypeResourceHandle) ArchetypeResource {
// //node := ctx.forkedResourceTree.root
// //for {
// // if node == nil {
// // panic(fmt.Errorf("could not find resource with name %v", handle))
// // }
// //
// // res, ok := node.ForkedResources[handle]
// // if ok {
// // return res
// // }
// // node = node.parent
// //}
//
// //res, ok := ctx.resources[handle]
// //if !ok {
// // panic(fmt.Errorf("could not find resource with name %v", handle))
// //}
// //return res
//}

func (ctx *MPCalContext) abort() {
var nonTrivialAborts []chan struct{}
for resHandle := range ctx.dirtyResourceHandles {
ch := ctx.getResourceByHandle(resHandle).Abort()
ch := ctx.iface.getResourceByHandle(resHandle).Abort()
if ch != nil {
nonTrivialAborts = append(nonTrivialAborts, ch)
}
Expand All @@ -449,7 +484,7 @@ func (ctx *MPCalContext) commit() (err error) {
// dispatch all parts of the pre-commit phase asynchronously, so we only wait as long as the slowest resource
var nonTrivialPreCommits []chan error
for resHandle := range ctx.dirtyResourceHandles {
ch := ctx.getResourceByHandle(resHandle).PreCommit()
ch := ctx.iface.getResourceByHandle(resHandle).PreCommit()
if ch != nil {
nonTrivialPreCommits = append(nonTrivialPreCommits, ch)
}
Expand All @@ -469,7 +504,7 @@ func (ctx *MPCalContext) commit() (err error) {
// same as above, run all the commit processes async
var nonTrivialCommits []chan struct{}
for resHandle := range ctx.dirtyResourceHandles {
ch := ctx.getResourceByHandle(resHandle).Commit()
ch := ctx.iface.getResourceByHandle(resHandle).Commit()
if ch != nil {
nonTrivialCommits = append(nonTrivialCommits, ch)
}
Expand Down Expand Up @@ -597,7 +632,10 @@ func (ctx *MPCalContext) Run() (err error) {
pcValStr := pcVal.AsString()

ctx.fairnessCounter.BeginCriticalSection(pcValStr)
//ctx.branchScheduler.BeginCriticalSection(pcValStr)

criticalSection := ctx.iface.getCriticalSection(pcValStr)
//fmt.Println(criticalSection)
err = criticalSection.Body(ctx.iface)
if err != nil {
continue
Expand Down
45 changes: 45 additions & 0 deletions distsys/resources/channels.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,21 @@ func (res *InputChannel) Close() error {
return nil
}

func (res *InputChannel) ForkState() (distsys.ArchetypeResource, error) {
//TODO implement me
panic("implement me")
}

func (res *InputChannel) LinkState() error {
//TODO implement me
panic("implement me")
}

func (res *InputChannel) AbortState() error {
//TODO implement me
panic("implement me")
}

// OutputChannel wraps a native Go channel, such that an MPCal model may write to that channel.
type OutputChannel struct {
distsys.ArchetypeResourceLeafMixin
Expand Down Expand Up @@ -128,6 +143,21 @@ func (res *OutputChannel) Close() error {
return nil
}

func (res *OutputChannel) ForkState() (distsys.ArchetypeResource, error) {
//TODO implement me
panic("implement me")
}

func (res *OutputChannel) LinkState() error {
//TODO implement me
panic("implement me")
}

func (res *OutputChannel) AbortState() error {
//TODO implement me
panic("implement me")
}

const singleOutputChannelWriteTimeout = 20 * time.Millisecond

type SingleOutputChannel struct {
Expand Down Expand Up @@ -173,3 +203,18 @@ func (res *SingleOutputChannel) WriteValue(value tla.TLAValue) error {
func (res *SingleOutputChannel) Close() error {
return nil
}

func (res *SingleOutputChannel) ForkState() (distsys.ArchetypeResource, error) {
//TODO implement me
panic("implement me")
}

func (res *SingleOutputChannel) LinkState() error {
//TODO implement me
panic("implement me")
}

func (res *SingleOutputChannel) AbortState() error {
//TODO implement me
panic("implement me")
}
15 changes: 15 additions & 0 deletions distsys/resources/crdt.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,21 @@ func (res *crdt) Close() error {
return nil
}

func (res *crdt) ForkState() (distsys.ArchetypeResource, error) {
//TODO implement me
panic("implement me")
}

func (res *crdt) LinkState() error {
//TODO implement me
panic("implement me")
}

func (res *crdt) AbortState() error {
//TODO implement me
panic("implement me")
}

// tryConnectPeers tries to connect to peer nodes with timeout. If dialing
// succeeds, retains the client for later RPC.
func (res *crdt) tryConnectPeers(selected *immutable.Map) {
Expand Down
15 changes: 15 additions & 0 deletions distsys/resources/dummy.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,18 @@ func (res *Dummy) Index(index tla.TLAValue) (distsys.ArchetypeResource, error) {
func (res *Dummy) Close() error {
return nil
}

func (res *Dummy) ForkState() (distsys.ArchetypeResource, error) {
//TODO implement me
panic("implement me")
}

func (res *Dummy) LinkState() error {
//TODO implement me
panic("implement me")
}

func (res *Dummy) AbortState() error {
//TODO implement me
panic("implement me")
}
Loading