SafeConcurrency is a Go library designed to simplify the management of concurrent tasks, providing a safe and structured way to produce and consume results. It enforces best practices for context propagation, error handling, and resource cleanup.
The API will change frequently as we refine the design and functionality. Expect new features and improvements in future releases, generators and work pools are just the beginning.
- Generator Pattern: Safely produce values from concurrent operations via channel-based results
- Adheres to Go best-practice: “Do not communicate by sharing memory; instead, share memory by communicating”
- Context Integration: Built-in support for context cancellation and deadlines
- Error Handling: Gracefully handle errors from concurrent operations
- Concurrency-Safe: All APIs are designed for concurrent use from different goroutines
- Very few mutexes are used, instead synchronizing using channels and atomic operations
- Flexible Buffering: Configurable request and result channel buffering for different throughput, synchronization, and back-pressure needs
- Worker Pools: Operate in a pool of workers to manage shared resources across heterogeneous tasks called from different goroutines (perfect for API clients or database connections)
- Event Loops: Support for event loops for handling events in a sequential manner
- Atomic state snapshots with generation tracking
- Event hooks for monitoring and customization
- Parallel Mapping: Support for parallel mapping of input values to output results
- Pipeline Support: Create pipelines of generators for complex workflows
-
Generators Implement your concurrent logic by creating a type that satisfies
types.Producer
:type MyProducer struct{} func (r *MyProducer) Run(ctx context.Context, h types.Emitter[Output]) error { // Your concurrent logic here h.Emit(ctx, value) return fatalErr }
Create and manage concurrent execution:
gen := generator.New[Output](&MyProducer{}) gen.Start(ctx)
Consume results safely from the channel:
for val := range gen.Results() { // Handle value }
-
Worker Pools Implement your task logic by creating a type that satisfies
types.Task
:type Task struct{} func (t *Task) Execute(ctx context.Context, resource ResourceType) (Output, error) { // Your task logic here return result, nil }
Create and manage worker pools:
mypool := workpool.New[ResourceType](resource, concurrency) mypool.Start() defer mypool.Close()
Submit tasks to the worker pool and receive results:
// Submit tasks val, err := workpool.Submit[ResourceType, Output](ctx, mypool, &Task{}) // Handle result
-
Event Loops Implement your event logic by creating a type that satisfies
types.Event
:type RequestEvent struct {} func (e *RequestEvent) Dispatch(gen types.GenerationID, s *AppState) *AppState { fmt.Printf("Processing request #%d\n", gen) s.Requests++ return s }
Create a state type to hold your application state:
type AppState struct { Requests int } func (s *AppState) Copy() *AppState { return snapshot.CopyPtr(s) }
Create and manage the event loop:
snap := snapshot.NewCopyable(&AppState{}) el := eventloop.New[*AppState](snap) defer el.Close() el.Start()
Send events to the loop:
gen, err := el.Send(ctx, &RequestEvent{}) if err != nil { panic(err) }
Wait for the event to be processed and get a snapshot of the state:
snap, err = eventloop.WaitForGeneration(ctx, el, gen) if err != nil { panic(err) } fmt.Printf("Current requests: %d\n", snap.State().Requests)
See the examples directory for more detailed examples, or interact with them in the browser on pkg.go.dev.
Full API documentation is available on GoDoc.
- For types and interfaces, see api/types.
- For creating generators, see generator.
- For creating worker pools and tasks, see workpool.
- For creating event loops, see eventloop.
- Examples can be interacted with in the browser at examples.
We welcome contributions! Please follow these guidelines:
-
Install pre-requisites:
- Go 1.20 or later
- Python 3.9 or later (for pre-commit)
- pre-commit (https://pre-commit.com/)
- Make (GNU Make recommended: https://www.gnu.org/software/make/)
- Golangci-lint (https://golangci-lint.run/welcome/install/#local-installation):
-
go install github.com/golangci/golangci-lint/cmd/golangci-lint@latest
-
-
Set up development environment:
# Install python virtual environment for pre-commit hooks pre-commit install
-
Ensure all tests pass:
# Run all tests and linters make all
-
Add tests for new features, make sure to check the test coverage:
# Run tests with coverage make test-unit
Use
go tool cover -html tmp/coverage/cover.out
to view the coverage report in your browser. -
Update documentation accordingly. Use Godoc comments for public types and functions: https://go.dev/blog/godoc
-
Use Conventional Commits for commit titles. This is required for our automated release process, Release Please.
-
Open a pull request with a clear description of the changes and why they are needed. Include the CHANGELOG entry you would like to see in the release, it doesn't need to be perfect: we can refine it together.
This project is licensed under the MIT License - see the LICENSE file for details.