Skip to content

SafeConcurrency is a Go library designed to simplify the management of concurrent tasks, providing a safe and structured way to produce and consume results.

License

Notifications You must be signed in to change notification settings

Izzette/go-safeconcurrency

Repository files navigation

SafeConcurrency for Go

SafeConcurrency Logo

SafeConcurrency is a Go library designed to simplify the management of concurrent tasks, providing a safe and structured way to produce and consume results. It enforces best practices for context propagation, error handling, and resource cleanup.

⚠️ Warning: This library is in early development and may not be suitable for production use yet.

The API will change frequently as we refine the design and functionality. Expect new features and improvements in future releases, generators and work pools are just the beginning.

Go Version Go Reference Go Report Card License

Features

  • Generator Pattern: Safely produce values from concurrent operations via channel-based results
  • Context Integration: Built-in support for context cancellation and deadlines
  • Error Handling: Gracefully handle errors from concurrent operations
  • Concurrency-Safe: All APIs are designed for concurrent use from different goroutines
    • Very few mutexes are used, instead synchronizing using channels and atomic operations
  • Flexible Buffering: Configurable request and result channel buffering for different throughput, synchronization, and back-pressure needs
  • Worker Pools: Operate in a pool of workers to manage shared resources across heterogeneous tasks called from different goroutines (perfect for API clients or database connections)
  • Event Loops: Support for event loops for handling events in a sequential manner
    • Atomic state snapshots with generation tracking
    • Event hooks for monitoring and customization

Planned Features

  • Parallel Mapping: Support for parallel mapping of input values to output results
  • Pipeline Support: Create pipelines of generators for complex workflows

Usage

Key Components

  1. Generators Implement your concurrent logic by creating a type that satisfies types.Producer:

    type MyProducer struct{}
    
    func (r *MyProducer) Run(ctx context.Context, h types.Emitter[Output]) error {
        // Your concurrent logic here
        h.Emit(ctx, value)
        return fatalErr
    }

    Create and manage concurrent execution:

    gen := generator.New[Output](&MyProducer{})
    gen.Start(ctx)

    Consume results safely from the channel:

    for val := range gen.Results() {
        // Handle value
    }
  2. Worker Pools Implement your task logic by creating a type that satisfies types.Task:

    type Task struct{}
    
    func (t *Task) Execute(ctx context.Context, resource ResourceType) (Output, error) {
        // Your task logic here
        return result, nil
    }

    Create and manage worker pools:

    mypool := workpool.New[ResourceType](resource, concurrency)
    mypool.Start()
    defer mypool.Close()

    Submit tasks to the worker pool and receive results:

    // Submit tasks
    val, err := workpool.Submit[ResourceType, Output](ctx, mypool, &Task{})
    // Handle result
  3. Event Loops Implement your event logic by creating a type that satisfies types.Event:

    type RequestEvent struct {}
    
    func (e *RequestEvent) Dispatch(gen types.GenerationID, s *AppState) *AppState {
        fmt.Printf("Processing request #%d\n", gen)
        s.Requests++
        return s
    }

    Create a state type to hold your application state:

    type AppState struct { Requests int }
    
    func (s *AppState) Copy() *AppState {
        return snapshot.CopyPtr(s)
    }

    Create and manage the event loop:

    snap := snapshot.NewCopyable(&AppState{})
    el := eventloop.New[*AppState](snap)
    defer el.Close()
    el.Start()

    Send events to the loop:

    gen, err := el.Send(ctx, &RequestEvent{})
    if err != nil {
        panic(err)
    }

    Wait for the event to be processed and get a snapshot of the state:

    snap, err = eventloop.WaitForGeneration(ctx, el, gen)
    if err != nil {
        panic(err)
    }
    fmt.Printf("Current requests: %d\n", snap.State().Requests)

See the examples directory for more detailed examples, or interact with them in the browser on pkg.go.dev.

Documentation

Full API documentation is available on GoDoc.

  • For types and interfaces, see api/types.
  • For creating generators, see generator.
  • For creating worker pools and tasks, see workpool.
  • For creating event loops, see eventloop.
  • Examples can be interacted with in the browser at examples.

Contributing

We welcome contributions! Please follow these guidelines:

  1. Install pre-requisites:

  2. Set up development environment:

    # Install python virtual environment for pre-commit hooks
    pre-commit install
  3. Ensure all tests pass:

    # Run all tests and linters
    make all
  4. Add tests for new features, make sure to check the test coverage:

    # Run tests with coverage
    make test-unit

    Use go tool cover -html tmp/coverage/cover.out to view the coverage report in your browser.

  5. Update documentation accordingly. Use Godoc comments for public types and functions: https://go.dev/blog/godoc

  6. Use Conventional Commits for commit titles. This is required for our automated release process, Release Please.

  7. Open a pull request with a clear description of the changes and why they are needed. Include the CHANGELOG entry you would like to see in the release, it doesn't need to be perfect: we can refine it together.

License

This project is licensed under the MIT License - see the LICENSE file for details.

About

SafeConcurrency is a Go library designed to simplify the management of concurrent tasks, providing a safe and structured way to produce and consume results.

Topics

Resources

License

Stars

Watchers

Forks

Packages

No packages published

Contributors 3

  •  
  •  
  •