Skip to content

Commit a497a12

Browse files
authored
Merge pull request #20 from reugn/develop
v0.3.0
2 parents 47f8f85 + cc4aa6b commit a497a12

File tree

12 files changed

+457
-1
lines changed

12 files changed

+457
-1
lines changed

.github/PULL_REQUEST_TEMPLATE.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
## Motivation
2+
* Fixes #___
3+
*
4+
## Modifications
5+
*
6+
## Verify change
7+
* [ ] Make sure the change passes the CI checks.

CONTRIBUTING.md

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
# How to contribute
2+
3+
If you would like to contribute code to this project, fork the repository and send a pull request.
4+
5+
## Prerequisite
6+
7+
If you have not installed Go, install it according to the [installation instruction](http://golang.org/doc/install).
8+
Since the `go mod` package management tool is used in this project, **Go 1.11 or higher** version is required.
9+
10+
## Fork
11+
12+
Before contributing, you need to fork [go-streams](https://github.com/reugn/go-streams) to your github repository.
13+
14+
## Contribution flow
15+
16+
* [Configure remote for a fork](https://help.github.com/en/github/collaborating-with-issues-and-pull-requests/configuring-a-remote-for-a-fork)
17+
* git checkout -b <your_branch>
18+
* git add .
19+
* git commit -m "commit message"
20+
* git push --set-upstream origin <your_branch>
21+
* Verify all tests and CI checks pass
22+
* [Create a pull request](https://help.github.com/en/github/collaborating-with-issues-and-pull-requests/creating-a-pull-request)
23+
24+
## Code style
25+
26+
The coding style suggested by the Golang community is used in `go-streams`. For details, refer to [style doc](https://github.com/golang/go/wiki/CodeReviewComments).

README.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,9 @@ Flow capabilities (flow package):
3131
Supported Sources and Sinks (ext package):
3232
* Go channels
3333
* File system
34-
* [Kafka](https://kafka.apache.org/)
34+
* Network (TCP, UDP)
35+
* [Apache Kafka](https://kafka.apache.org/)
36+
* [Apache Pulsar](https://pulsar.apache.org/)
3537
* [Redis](https://redis.io/)
3638

3739
## Examples

examples/net/.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
net

examples/net/main.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package main
2+
3+
import (
4+
"fmt"
5+
"strings"
6+
7+
"github.com/reugn/go-streams"
8+
ext "github.com/reugn/go-streams/extension"
9+
"github.com/reugn/go-streams/flow"
10+
)
11+
12+
// Test producer: nc -u 127.0.0.1 3434
13+
// Test consumer: nc -u -l 3535
14+
func main() {
15+
source, err := ext.NewNetSource(ext.UDP, "127.0.0.1:3434")
16+
streams.Check(err)
17+
flow1 := flow.NewMap(toUpper, 1)
18+
sink, err := ext.NewNetSink(ext.UDP, "127.0.0.1:3535")
19+
streams.Check(err)
20+
21+
source.Via(flow1).To(sink)
22+
}
23+
24+
var toUpper = func(in interface{}) interface{} {
25+
msg := in.(string)
26+
fmt.Printf("Got: %s\n", msg)
27+
return strings.ToUpper(msg)
28+
}

examples/pulsar/.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
pulsar

examples/pulsar/docker-compose.yml

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
version: '3'
2+
3+
services:
4+
standalone:
5+
image: apachepulsar/pulsar
6+
ports:
7+
- "6650:6650"
8+
expose:
9+
- 8080
10+
- 6650
11+
environment:
12+
- PULSAR_MEM=" -Xms512m -Xmx512m -XX:MaxDirectMemorySize=1g"
13+
command: >
14+
/bin/bash -c
15+
"bin/apply-config-from-env.py conf/standalone.conf
16+
&& bin/pulsar standalone"
17+
dashboard:
18+
image: apachepulsar/pulsar-dashboard
19+
depends_on:
20+
- standalone
21+
ports:
22+
- "80:80"
23+
environment:
24+
- SERVICE_URL=http://standalone:8080

examples/pulsar/main.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
package main
2+
3+
import (
4+
"strings"
5+
6+
"github.com/apache/pulsar-client-go/pulsar"
7+
"github.com/reugn/go-streams"
8+
ext "github.com/reugn/go-streams/extension"
9+
"github.com/reugn/go-streams/flow"
10+
)
11+
12+
func main() {
13+
clientOptions := pulsar.ClientOptions{URL: "pulsar://localhost:6650"}
14+
producerOptions := pulsar.ProducerOptions{Topic: "test2"}
15+
consumerOptions := pulsar.ConsumerOptions{
16+
Topic: "test1",
17+
SubscriptionName: "group1",
18+
Type: pulsar.Exclusive,
19+
}
20+
21+
source, err := ext.NewPulsarSource(&clientOptions, &consumerOptions)
22+
streams.Check(err)
23+
flow1 := flow.NewMap(toUpper, 1)
24+
sink := ext.NewPulsarSink(&clientOptions, &producerOptions)
25+
26+
source.Via(flow1).To(sink)
27+
}
28+
29+
var toUpper = func(in interface{}) interface{} {
30+
msg := in.(pulsar.Message)
31+
return strings.ToUpper(string(msg.Payload()))
32+
}

extension/net.go

Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
package ext
2+
3+
import (
4+
"bufio"
5+
"errors"
6+
"log"
7+
"net"
8+
"time"
9+
10+
"github.com/reugn/go-streams"
11+
"github.com/reugn/go-streams/flow"
12+
)
13+
14+
// ConnType connection type
15+
type ConnType string
16+
17+
const (
18+
// TCP connection type
19+
TCP ConnType = "tcp"
20+
// UDP connection type
21+
UDP ConnType = "udp"
22+
)
23+
24+
// NetSource network socket connector
25+
type NetSource struct {
26+
conn net.Conn
27+
listener net.Listener
28+
connType ConnType
29+
out chan interface{}
30+
}
31+
32+
// NewNetSource creates a new NetSource
33+
func NewNetSource(connType ConnType, address string) (*NetSource, error) {
34+
var err error
35+
var conn net.Conn
36+
var listener net.Listener
37+
out := make(chan interface{})
38+
39+
switch connType {
40+
case TCP:
41+
addr, _ := net.ResolveTCPAddr(string(connType), address)
42+
listener, err = net.ListenTCP(string(connType), addr)
43+
44+
if err != nil {
45+
log.Fatal(err)
46+
return nil, err
47+
}
48+
49+
go acceptConnections(listener, out)
50+
51+
case UDP:
52+
addr, _ := net.ResolveUDPAddr(string(connType), address)
53+
conn, err = net.ListenUDP(string(connType), addr)
54+
55+
if err != nil {
56+
log.Fatal(err)
57+
return nil, err
58+
}
59+
60+
go handleConnection(conn, out)
61+
62+
default:
63+
return nil, errors.New("Invalid connection type")
64+
}
65+
66+
source := &NetSource{
67+
conn: conn,
68+
listener: listener,
69+
connType: connType,
70+
out: out,
71+
}
72+
73+
return source, nil
74+
}
75+
76+
// TCP Accept routine
77+
func acceptConnections(listener net.Listener, out chan<- interface{}) {
78+
for {
79+
// accept new connection
80+
conn, err := listener.Accept()
81+
if err != nil {
82+
log.Fatal(err)
83+
}
84+
85+
// handle new connection
86+
go handleConnection(conn, out)
87+
}
88+
log.Printf("Closing NetSource TCP listener %v", listener)
89+
listener.Close()
90+
}
91+
92+
// handleConnection routine
93+
func handleConnection(conn net.Conn, out chan<- interface{}) {
94+
log.Printf("NetSource connected on: %v", conn.LocalAddr())
95+
reader := bufio.NewReader(conn)
96+
for {
97+
bufferBytes, err := reader.ReadBytes('\n')
98+
if err != nil {
99+
log.Fatal(err)
100+
break
101+
} else {
102+
out <- string(bufferBytes)
103+
}
104+
}
105+
log.Printf("Closing NetSource connection %v", conn)
106+
conn.Close()
107+
}
108+
109+
// Via streams data through given flow
110+
func (ns *NetSource) Via(_flow streams.Flow) streams.Flow {
111+
flow.DoStream(ns, _flow)
112+
return _flow
113+
}
114+
115+
// Out returns channel for sending data
116+
func (ns *NetSource) Out() <-chan interface{} {
117+
return ns.out
118+
}
119+
120+
// NetSink downstreams input events to a network soket
121+
type NetSink struct {
122+
conn net.Conn
123+
connType ConnType
124+
in chan interface{}
125+
}
126+
127+
// NewNetSink creates a new NetSink
128+
func NewNetSink(connType ConnType, address string) (*NetSink, error) {
129+
var err error
130+
var conn net.Conn
131+
132+
conn, err = net.DialTimeout(string(connType), address, time.Second*10)
133+
if err != nil {
134+
return nil, err
135+
}
136+
137+
sink := &NetSink{
138+
conn: conn,
139+
connType: connType,
140+
in: make(chan interface{}),
141+
}
142+
143+
go sink.init()
144+
return sink, nil
145+
}
146+
147+
// start main loop
148+
func (ns *NetSink) init() {
149+
log.Printf("NetSink connected on: %v", ns.conn.LocalAddr())
150+
writer := bufio.NewWriter(ns.conn)
151+
for msg := range ns.in {
152+
switch m := msg.(type) {
153+
case string:
154+
_, err := writer.WriteString(m)
155+
if err == nil {
156+
err = writer.Flush()
157+
}
158+
}
159+
}
160+
log.Printf("Closing NetSink connection %v", ns.conn)
161+
ns.conn.Close()
162+
}
163+
164+
// In returns channel for receiving data
165+
func (ns *NetSink) In() chan<- interface{} {
166+
return ns.in
167+
}

0 commit comments

Comments
 (0)