A Go module that provides various input handlers for different protocols and message systems.
- HTTP Input: Accept messages via HTTP requests with configurable paths, methods, and TLS support
- TCP Input: Handle TCP connections with concurrent processing and keep-alive settings
- Kafka Input: Consume messages from Kafka topics with consumer group support
- NSQ Input: Connect to NSQ/NSQLookupd for distributed message processing
- Flexible Logging: Built-in logging interface with customizable implementations
- Zero External Dependencies: Only uses Go standard library for core functionality
go get github.com/marsgopher/inputs
package main
import (
"fmt"
"github.com/marsgopher/inputs/http"
"github.com/marsgopher/inputs/log"
)
func main() {
cfg := http.Config{
Bind: ":8080",
RequestPath: "/webhook",
RequestMethod: "POST",
}
handler, err := http.New(cfg, http.WithLogLevel(log.LogLevelInfo))
if err != nil {
panic(err)
}
go func() {
for msg := range handler.Read() {
fmt.Printf("Received: %s\n", string(msg))
}
}()
if err := handler.Run(); err != nil {
panic(err)
}
}
package main
import (
"fmt"
"github.com/marsgopher/inputs/kafka"
"github.com/marsgopher/inputs/log"
)
func main() {
cfg := kafka.Config{
Addresses: []string{"localhost:9092"},
Topics: []string{"my-topic"},
GroupID: "my-consumer-group",
}
handler, err := kafka.New(cfg, kafka.WithLogLevel(log.LogLevelInfo))
if err != nil {
panic(err)
}
go func() {
for msg := range handler.Read() {
fmt.Printf("Received: %s\n", string(msg))
}
}()
if err := handler.Run(); err != nil {
panic(err)
}
}
The module provides a flexible logging interface:
import "github.com/marsgopher/inputs/log"
// Use default logger with different levels
handler, _ := http.New(cfg, http.WithLogLevel(log.LogLevelInfo))
// Use silent logger (no output)
handler, _ := http.New(cfg, http.WithSilentLogger())
// Use custom logger
type MyLogger struct{}
func (l *MyLogger) Infof(format string, args ...interface{}) { /* implementation */ }
func (l *MyLogger) Errorf(format string, args ...interface{}) { /* implementation */ }
handler, _ := http.New(cfg, http.WithLogger(&MyLogger{}))
Each input type has its own configuration struct with relevant options:
- HTTP: Bind address, request path/method, timeouts, TLS certificates
- TCP: Bind address, keep-alive settings, buffer sizes, worker count
- Kafka: Broker addresses, topics, consumer group, offset settings
- NSQ: NSQd/NSQLookupd addresses, topic, channel, timeout settings
See the individual package documentation for complete configuration options.
go test ./...
Contributions are welcome! Please feel free to submit issues and pull requests.
This project is licensed under the MIT License - see the LICENSE file for details.