Skip to content

Commit 42d43eb

Browse files
Add Request Monitoring Feature to CoAP Connection
The WithRequestMonitor function has been implemented to enable request monitoring for the connection. This function is called for each CoAP message received from the peer before it is processed. Details of the Feature: - Functionality: WithRequestMonitor allows developers to implement custom request monitoring logic for incoming CoAP messages. - Error Handling: If the function returns an error, the connection is closed, providing a mechanism to handle and respond to issues in the monitoring process. - Message Dropping: If the function returns true, the incoming message is dropped, allowing for selective handling or filtering of messages based on monitoring criteria. --------- Co-authored-by: Jeff Welder <146992010+jeffwelder-ellenbytech@users.noreply.github.com>
1 parent ce37a93 commit 42d43eb

File tree

20 files changed

+689
-44
lines changed

20 files changed

+689
-44
lines changed

dtls/client.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -105,10 +105,11 @@ func Client(conn *dtls.Conn, opts ...udp.Option) *udpClient.Conn {
105105
cfg.MTU,
106106
cfg.CloseSocket,
107107
)
108-
cc := udpClient.NewConn(session,
109-
createBlockWise,
110-
monitor,
108+
cc := udpClient.NewConnWithOpts(session,
111109
&cfg,
110+
udpClient.WithBlockWise(createBlockWise),
111+
udpClient.WithInactivityMonitor(monitor),
112+
udpClient.WithRequestMonitor(cfg.RequestMonitor),
112113
)
113114

114115
cfg.PeriodicRunner(func(now time.Time) bool {

dtls/server/config.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,9 @@ var DefaultConfig = func() Config {
3434
}
3535
return inactivity.New(timeout, onInactive)
3636
},
37+
RequestMonitor: func(cc *udpClient.Conn, req *pool.Message) (bool, error) {
38+
return false, nil
39+
},
3740
OnNewConn: func(cc *udpClient.Conn) {
3841
// do nothing by default
3942
},
@@ -57,6 +60,7 @@ type Config struct {
5760
GetMID GetMIDFunc
5861
Handler HandlerFunc
5962
OnNewConn OnNewConnFunc
63+
RequestMonitor udpClient.RequestMonitorFunc
6064
TransmissionNStart uint32
6165
TransmissionAcknowledgeTimeout time.Duration
6266
TransmissionMaxRetransmit uint32

dtls/server/server.go

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ func New(opt ...Option) *Server {
6363
return inactivity.NewNilMonitor[*udpClient.Conn]()
6464
}
6565
}
66+
6667
if cfg.MessagePool == nil {
6768
cfg.MessagePool = pool.New(0, 0)
6869
}
@@ -158,8 +159,10 @@ func (s *Server) Serve(l Listener) error {
158159
}
159160
wg.Add(1)
160161
var cc *udpClient.Conn
161-
monitor := s.cfg.CreateInactivityMonitor()
162-
cc = s.createConn(coapNet.NewConn(rw), monitor)
162+
inactivityMonitor := s.cfg.CreateInactivityMonitor()
163+
requestMonitor := s.cfg.RequestMonitor
164+
165+
cc = s.createConn(coapNet.NewConn(rw), inactivityMonitor, requestMonitor)
163166
if s.cfg.OnNewConn != nil {
164167
s.cfg.OnNewConn(cc)
165168
}
@@ -184,7 +187,7 @@ func (s *Server) Stop() {
184187
}
185188
}
186189

187-
func (s *Server) createConn(connection *coapNet.Conn, monitor udpClient.InactivityMonitor) *udpClient.Conn {
190+
func (s *Server) createConn(connection *coapNet.Conn, inactivityMonitor udpClient.InactivityMonitor, requestMonitor udpClient.RequestMonitorFunc) *udpClient.Conn {
188191
createBlockWise := func(cc *udpClient.Conn) *blockwise.BlockWise[*udpClient.Conn] {
189192
return nil
190193
}
@@ -220,11 +223,13 @@ func (s *Server) createConn(connection *coapNet.Conn, monitor udpClient.Inactivi
220223
cfg.MessagePool = s.cfg.MessagePool
221224
cfg.ReceivedMessageQueueSize = s.cfg.ReceivedMessageQueueSize
222225
cfg.ProcessReceivedMessage = s.cfg.ProcessReceivedMessage
223-
cc := udpClient.NewConn(
226+
227+
cc := udpClient.NewConnWithOpts(
224228
session,
225-
createBlockWise,
226-
monitor,
227229
&cfg,
230+
udpClient.WithBlockWise(createBlockWise),
231+
udpClient.WithInactivityMonitor(inactivityMonitor),
232+
udpClient.WithRequestMonitor(requestMonitor),
228233
)
229234

230235
return cc

message/message.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,3 +48,11 @@ func (r *Message) String() string {
4848
}
4949
return buf
5050
}
51+
52+
// IsPing returns true if the message is a ping.
53+
func (r *Message) IsPing(isTCP bool) bool {
54+
if isTCP {
55+
return r.Code == codes.Ping
56+
}
57+
return r.Code == codes.Empty && r.Type == Confirmable && len(r.Token) == 0 && len(r.Options) == 0 && len(r.Payload) == 0
58+
}

message/message_internal_test.go

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
package message
2+
3+
import (
4+
"testing"
5+
6+
"github.com/plgd-dev/go-coap/v3/message/codes"
7+
"github.com/stretchr/testify/require"
8+
)
9+
10+
func TestMessageIsPing(t *testing.T) {
11+
tests := []struct {
12+
name string
13+
message *Message
14+
isTCP bool
15+
want bool
16+
}{
17+
{
18+
name: "Ping message (TCP)",
19+
message: &Message{
20+
Code: codes.Ping,
21+
Type: Confirmable,
22+
Token: nil,
23+
Options: nil,
24+
Payload: nil,
25+
},
26+
isTCP: true,
27+
want: true,
28+
},
29+
{
30+
name: "Ping message (UDP)",
31+
message: &Message{
32+
Code: codes.Empty,
33+
Type: Confirmable,
34+
Token: nil,
35+
Options: nil,
36+
Payload: nil,
37+
},
38+
isTCP: false,
39+
want: true,
40+
},
41+
{
42+
name: "Non-ping message (TCP)",
43+
message: &Message{
44+
Code: codes.GET,
45+
Type: Confirmable,
46+
Token: []byte{1, 2, 3},
47+
Options: []Option{{ID: 1, Value: []byte{4, 5, 6}}},
48+
Payload: []byte{7, 8, 9},
49+
},
50+
isTCP: true,
51+
want: false,
52+
},
53+
{
54+
name: "Non-ping message (UDP)",
55+
message: &Message{
56+
Code: codes.GET,
57+
Type: Confirmable,
58+
Token: []byte{1, 2, 3},
59+
Options: []Option{{ID: 1, Value: []byte{4, 5, 6}}},
60+
Payload: []byte{7, 8, 9},
61+
},
62+
isTCP: false,
63+
want: false,
64+
},
65+
}
66+
67+
for _, tt := range tests {
68+
t.Run(tt.name, func(t *testing.T) {
69+
got := tt.message.IsPing(tt.isTCP)
70+
require.Equal(t, tt.want, got)
71+
})
72+
}
73+
}

message/pool/message.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -618,3 +618,7 @@ func (r *Message) Clone(msg *Message) error {
618618
}
619619
return nil
620620
}
621+
622+
func (r *Message) IsPing(isTCP bool) bool {
623+
return r.msg.IsPing(isTCP)
624+
}

options/commonOptions.go

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -593,6 +593,60 @@ func WithOnNewConn[F OnNewConnFunc](onNewConn F) OnNewConnOpt[F] {
593593
}
594594
}
595595

596+
// WithRequestMonitor
597+
type WithRequestMonitorFunc interface {
598+
tcpClient.RequestMonitorFunc | udpClient.RequestMonitorFunc
599+
}
600+
601+
// WithRequestMonitorOpt network option.
602+
type WithRequestMonitorOpt[F WithRequestMonitorFunc] struct {
603+
f F
604+
}
605+
606+
func panicForInvalidWithRequestMonitorFunc(t, exp any) {
607+
panic(fmt.Errorf("invalid WithRequestMonitorFunc type %T, expected %T", t, exp))
608+
}
609+
610+
func (o WithRequestMonitorOpt[F]) UDPServerApply(cfg *udpServer.Config) {
611+
switch v := any(o.f).(type) {
612+
case udpClient.RequestMonitorFunc:
613+
cfg.RequestMonitor = v
614+
default:
615+
var exp udpClient.RequestMonitorFunc
616+
panicForInvalidWithRequestMonitorFunc(v, exp)
617+
}
618+
}
619+
620+
func (o WithRequestMonitorOpt[F]) DTLSServerApply(cfg *dtlsServer.Config) {
621+
switch v := any(o.f).(type) {
622+
case udpClient.RequestMonitorFunc:
623+
cfg.RequestMonitor = v
624+
default:
625+
var exp udpClient.RequestMonitorFunc
626+
panicForInvalidWithRequestMonitorFunc(v, exp)
627+
}
628+
}
629+
630+
func (o WithRequestMonitorOpt[F]) TCPServerApply(cfg *tcpServer.Config) {
631+
switch v := any(o.f).(type) {
632+
case tcpClient.RequestMonitorFunc:
633+
cfg.RequestMonitor = v
634+
default:
635+
var exp tcpClient.RequestMonitorFunc
636+
panicForInvalidWithRequestMonitorFunc(v, exp)
637+
}
638+
}
639+
640+
// WithRequestMonitor enables request monitoring for the connection.
641+
// It is called for each CoAP message received from the peer before it is processed.
642+
// If it returns an error, the connection is closed.
643+
// If it returns true, the message is dropped.
644+
func WithRequestMonitor[F WithRequestMonitorFunc](requestMonitor F) WithRequestMonitorOpt[F] {
645+
return WithRequestMonitorOpt[F]{
646+
f: requestMonitor,
647+
}
648+
}
649+
596650
// CloseSocketOpt close socket option.
597651
type CloseSocketOpt struct{}
598652

tcp/client.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -88,10 +88,11 @@ func Client(conn net.Conn, opts ...Option) *client.Conn {
8888

8989
l := coapNet.NewConn(conn)
9090
monitor := cfg.CreateInactivityMonitor()
91-
cc := client.NewConn(l,
92-
createBlockWise,
93-
monitor,
91+
cc := client.NewConnWithOpts(l,
9492
&cfg,
93+
client.WithBlockWise(createBlockWise),
94+
client.WithInactivityMonitor(monitor),
95+
client.WithRequestMonitor(cfg.RequestMonitor),
9596
)
9697

9798
cfg.PeriodicRunner(func(now time.Time) bool {

tcp/client/config.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@ var DefaultConfig = func() Config {
2020
CreateInactivityMonitor: func() InactivityMonitor {
2121
return inactivity.NewNilMonitor[*Conn]()
2222
},
23+
RequestMonitor: func(*Conn, *pool.Message) (bool, error) {
24+
return false, nil
25+
},
2326
Dialer: &net.Dialer{Timeout: time.Second * 3},
2427
Net: "tcp",
2528
ConnectionCacheSize: 2048,
@@ -38,6 +41,7 @@ var DefaultConfig = func() Config {
3841
type Config struct {
3942
config.Common[*Conn]
4043
CreateInactivityMonitor CreateInactivityMonitorFunc
44+
RequestMonitor RequestMonitorFunc
4145
Net string
4246
Dialer *net.Dialer
4347
TLSCfg *tls.Config

tcp/client/conn.go

Lines changed: 53 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"github.com/plgd-dev/go-coap/v3/net/blockwise"
1515
"github.com/plgd-dev/go-coap/v3/net/client"
1616
limitparallelrequests "github.com/plgd-dev/go-coap/v3/net/client/limitParallelRequests"
17+
"github.com/plgd-dev/go-coap/v3/net/monitor/inactivity"
1718
"github.com/plgd-dev/go-coap/v3/net/observation"
1819
"github.com/plgd-dev/go-coap/v3/net/responsewriter"
1920
coapErrors "github.com/plgd-dev/go-coap/v3/pkg/errors"
@@ -32,6 +33,7 @@ type (
3233
EventFunc = func()
3334
GetMIDFunc = func() int32
3435
CreateInactivityMonitorFunc = func() InactivityMonitor
36+
RequestMonitorFunc = func(cc *Conn, req *pool.Message) (drop bool, err error)
3537
)
3638

3739
type Notifier interface {
@@ -54,16 +56,64 @@ type Conn struct {
5456
receivedMessageReader *client.ReceivedMessageReader[*Conn]
5557
}
5658

59+
type ConnOptions struct {
60+
CreateBlockWise func(cc *Conn) *blockwise.BlockWise[*Conn]
61+
InactivityMonitor InactivityMonitor
62+
RequestMonitor RequestMonitorFunc
63+
}
64+
65+
type Option = func(opts *ConnOptions)
66+
67+
// WithBlockWise enables block-wise transfer for the connection.
68+
func WithBlockWise(createBlockWise func(cc *Conn) *blockwise.BlockWise[*Conn]) Option {
69+
return func(opts *ConnOptions) {
70+
opts.CreateBlockWise = createBlockWise
71+
}
72+
}
73+
74+
// WithInactivityMonitor enables inactivity monitor for the connection.
75+
func WithInactivityMonitor(inactivityMonitor InactivityMonitor) Option {
76+
return func(opts *ConnOptions) {
77+
opts.InactivityMonitor = inactivityMonitor
78+
}
79+
}
80+
81+
// WithRequestMonitor enables request monitoring for the connection.
82+
// It is called for each CoAP message received from the peer before it is processed.
83+
// If it returns an error, the connection is closed.
84+
// If it returns true, the message is dropped.
85+
func WithRequestMonitor(requestMonitor RequestMonitorFunc) Option {
86+
return func(opts *ConnOptions) {
87+
opts.RequestMonitor = requestMonitor
88+
}
89+
}
90+
5791
// NewConn creates connection over session and observation.
5892
func NewConn(
5993
connection *coapNet.Conn,
6094
createBlockWise func(cc *Conn) *blockwise.BlockWise[*Conn],
6195
inactivityMonitor InactivityMonitor,
6296
cfg *Config,
6397
) *Conn {
98+
return NewConnWithOpts(connection, cfg, WithBlockWise(createBlockWise), WithInactivityMonitor(inactivityMonitor))
99+
}
100+
101+
func NewConnWithOpts(connection *coapNet.Conn, cfg *Config, opts ...Option) *Conn {
64102
if cfg.GetToken == nil {
65103
cfg.GetToken = message.GetToken
66104
}
105+
cfgOpts := ConnOptions{
106+
CreateBlockWise: func(cc *Conn) *blockwise.BlockWise[*Conn] {
107+
return nil
108+
},
109+
InactivityMonitor: inactivity.NewNilMonitor[*Conn](),
110+
RequestMonitor: func(*Conn, *pool.Message) (bool, error) {
111+
return false, nil
112+
},
113+
}
114+
for _, o := range opts {
115+
o(&cfgOpts)
116+
}
67117
cc := Conn{
68118
tokenHandlerContainer: coapSync.NewMap[uint64, HandlerFunc](),
69119
blockwiseSZX: cfg.BlockwiseSZX,
@@ -72,14 +122,15 @@ func NewConn(
72122
limitParallelRequests := limitparallelrequests.New(cfg.LimitClientParallelRequests, cfg.LimitClientEndpointParallelRequests, cc.do, cc.doObserve)
73123
cc.observationHandler = observation.NewHandler(&cc, cfg.Handler, limitParallelRequests.Do)
74124
cc.Client = client.New(&cc, cc.observationHandler, cfg.GetToken, limitParallelRequests)
75-
cc.blockWise = createBlockWise(&cc)
125+
cc.blockWise = cfgOpts.CreateBlockWise(&cc)
76126
session := NewSession(cfg.Ctx,
77127
connection,
78128
cfg.MaxMessageSize,
79129
cfg.Errors,
80130
cfg.DisableTCPSignalMessageCSM,
81131
cfg.CloseSocket,
82-
inactivityMonitor,
132+
cfgOpts.InactivityMonitor,
133+
cfgOpts.RequestMonitor,
83134
cfg.ConnectionCacheSize,
84135
cfg.MessagePool,
85136
)

0 commit comments

Comments
 (0)