Skip to content

Commit 967cf13

Browse files
authored
Merge pull request #3 from jgn-epp/feature/peeking
Feature/peeking
2 parents 04346cd + 6a05e55 commit 967cf13

File tree

4 files changed

+146
-11
lines changed

4 files changed

+146
-11
lines changed

MSMQLib/MSMQInterface.cs

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,47 @@ public async Task<object> ReceiveMessages(dynamic input)
9999
}
100100
}
101101

102+
/// <summary>
103+
/// Removes a message with the given id from the queue.
104+
/// </summary>
105+
/// <param name="input">The queue's path and message id.</param>
106+
public async Task<object> ReceiveMessageById(dynamic input)
107+
{
108+
var path = (string)input.path;
109+
var id = (string)input.id;
110+
111+
MessageQueue queue = new MessageQueue(path);
112+
queue.Formatter = new BinaryMessageFormatter();
113+
queue.MessageReadPropertyFilter.SetAll();
114+
115+
try
116+
{
117+
var msg = (MSMQMessage)queue.ReceiveById(id);
118+
return true;
119+
}
120+
catch (InvalidOperationException) // Thrown if message with given id is not in queue.
121+
{
122+
return false;
123+
}
124+
}
125+
126+
/// <summary>
127+
/// Peeks a message from a queue without removing it.
128+
/// </summary>
129+
/// <param name="path">The queue's path.</param>
130+
public async Task<object> Peek(string path)
131+
{
132+
MessageQueue queue = new MessageQueue(path);
133+
queue.Formatter = new BinaryMessageFormatter();
134+
queue.MessageReadPropertyFilter.SetAll();
135+
136+
var msg = await Task.Factory.FromAsync<Message>(
137+
queue.BeginPeek(),
138+
queue.EndPeek);
139+
140+
return (MSMQMessage)msg;
141+
}
142+
102143
/// <summary>
103144
/// Gets all messages from a queue without removing them.
104145
/// </summary>

proxy.js

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,15 +7,17 @@ const baseOptions = {
77
};
88

99
function getMethod(methodName) {
10-
return edge.func(Object.assign({}, baseOptions, {methodName}));
10+
return edge.func(Object.assign({}, baseOptions, { methodName }));
1111
}
1212

1313
export var queueProxy = {
1414
exists: getMethod('ExistsQueue'),
1515
create: getMethod('CreateQueue'),
1616
send: getMethod('SendMessage'),
1717
receive: getMethod('ReceiveMessages'),
18+
peek: getMethod('Peek'),
19+
remove: getMethod('ReceiveMessageById'),
1820
list: getMethod('GetAllMessages'),
19-
clear: getMethod('PurgeQueue'),
20-
connectRemote: getMethod('ConnectRemote')
21+
clear: getMethod('PurgeQueue'),
22+
connectRemote: getMethod('ConnectRemote')
2123
};

queue.js

Lines changed: 49 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
1-
import {EventEmitter} from 'events';
2-
import {queueProxy} from './proxy';
1+
import { EventEmitter } from 'events';
2+
import { queueProxy } from './proxy';
33

44
export default class Queue extends EventEmitter {
55

@@ -27,10 +27,10 @@ export default class Queue extends EventEmitter {
2727
return new Queue(path);
2828
}
2929

30-
static connectToRemoteQueue(path) {
31-
queueProxy.connectRemote(path, true);
32-
return new Queue(path);
33-
}
30+
static connectToRemoteQueue(path) {
31+
queueProxy.connectRemote(path, true);
32+
return new Queue(path);
33+
}
3434

3535
startReceiving() {
3636
if (this.receiving) {
@@ -47,6 +47,49 @@ export default class Queue extends EventEmitter {
4747
});
4848
}
4949

50+
startPeeking() {
51+
if (this.receiving) {
52+
throw new Error('Already receiving messages from this queue');
53+
}
54+
55+
this.receiving = true;
56+
57+
const start = () => {
58+
this.peek().then(msg => {
59+
this.emit('peek', {
60+
msg,
61+
next: async () => {
62+
await this.remove(msg.id);
63+
start();
64+
}
65+
})
66+
})
67+
};
68+
69+
start();
70+
}
71+
72+
peek() {
73+
return new Promise((resolve, reject) => {
74+
queueProxy.peek(this.path, (error, msg) => {
75+
if (error) reject(error);
76+
else resolve(msg);
77+
});
78+
});
79+
}
80+
81+
remove(id) {
82+
return new Promise((resolve, reject) => {
83+
queueProxy.remove({
84+
path: this.path,
85+
id
86+
}, (error, result) => {
87+
if (error) reject(error);
88+
else resolve(result);
89+
});
90+
})
91+
}
92+
5093
send(message, cb) {
5194
let formattedMessage = JSON.stringify(message);
5295

readme.md

Lines changed: 51 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ NPM package is published as updated-node-msmq. https://www.npmjs.com/package/upd
99
## Differences from `node-msmq`
1010

1111
* Support for Node.Js 6.x, 7.x, 8.x, 9.x, 10.x
12-
* Support to push objects to the queue instead of just strings.
12+
* Support to push objects to the queue instead of just strings.
1313
* Support to send/receive messages to/from a queue on a **remote** machine.
1414

1515
## Install
@@ -51,6 +51,55 @@ queue.on('receive', (msg) => {
5151
queue.startReceiving();
5252
```
5353

54+
### Peek messages
55+
56+
Start listeng for messages without removing them until you are ready to do so. Callbacks for the 'peek' event receive an object containing two properties:
57+
58+
* `msg` The MSMQ Message.
59+
* `next()` Pops the message from the queue and start peeking for the next one.
60+
61+
```js
62+
queue.on('peek', (event) => {
63+
console.log(event.msg.body);
64+
65+
// Do some logic that might fail.
66+
if (Math.random() > 0.5) {
67+
throw Error('number is too high!');
68+
}
69+
70+
// Remove the message from the queue and peek for next message.
71+
event.next();
72+
})
73+
74+
queue.startPeeking();
75+
```
76+
77+
### Peek
78+
79+
Promised based method to peek for a message in the queue without removing it. Resolves to a MSMQMessage.
80+
81+
```js
82+
queue.peek().then(msg => console.log(msg.body));
83+
84+
//or
85+
86+
let msg = await queue.peek();
87+
```
88+
89+
### Remove
90+
91+
Promise based method to remove a message with given id from the queue. Resolves to `true` if message was removed and `false` if message didn't exist in the queue.
92+
93+
```js
94+
queue.remove('12345')
95+
.then(status => console.log(status ? 'Message was removed'
96+
: 'Message was not in queue'));
97+
98+
//or
99+
100+
let status = await queue.remove('12345');
101+
```
102+
54103
### Get all messages
55104

56105
Gets all messages without removing them from queue.
@@ -124,7 +173,7 @@ var queue = msmq.connectToRemoteQueue('.\\Private$\\MyAwesomeQueue');
124173
var messages = queue.getAllMessages();
125174
```
126175

127-
#### Note:
176+
#### Note:
128177
* Creating a queue / Checking if a queue exists on a remote machine is currently not supported by MSMQ.
129178
* To communicate with a remote queue, MSMQ should be enabled in the sender's machine too. Also, in the _Security_ tab of the queue on the remote machine should have the appropriate permissions set for _Everyone_ and _ANONYMOUS LOGON_.
130179
* The queue should already be created on the remote machine.

0 commit comments

Comments
 (0)