Skip to content

Commit f372dfb

Browse files
committed
Add queue on classes
1 parent f880372 commit f372dfb

File tree

1 file changed

+299
-0
lines changed

1 file changed

+299
-0
lines changed

lib/queue.class.js

Lines changed: 299 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,299 @@
1+
'use strict';
2+
3+
const QUEUE_TIMEOUT = 'Metasync: Queue timed out';
4+
5+
class Queue {
6+
// Queue constructor
7+
// concurrency - <number>, asynchronous concurrency
8+
constructor(concurrency) {
9+
this.paused = false;
10+
this.concurrency = concurrency;
11+
this.waitTimeout = 0;
12+
this.processTimeout = 0;
13+
this.throttleCount = 0;
14+
this.throttleInterval = 1000;
15+
this.count = 0;
16+
this.tasks = [];
17+
this.waiting = [];
18+
this.factors = {};
19+
this.fifoMode = true;
20+
this.roundRobinMode = false;
21+
this.priorityMode = false;
22+
this.onProcess = null;
23+
this.onDone = null;
24+
this.onSuccess = null;
25+
this.onTimeout = null;
26+
this.onFailure = null;
27+
this.onDrain = null;
28+
}
29+
30+
// Set wait before processing timeout
31+
// msec - <number>, wait timeout for single item
32+
//
33+
// Returns: <this>
34+
wait(msec) {
35+
this.waitTimeout = msec;
36+
return this;
37+
}
38+
39+
// Throttle to limit throughput
40+
// Signature: count[, interval]
41+
// count - <number>, item count
42+
// interval - <number>, per interval, optional
43+
// default: 1000 msec
44+
//
45+
// Returns: <this>
46+
throttle(count, interval = 1000) {
47+
this.throttleCount = count;
48+
this.throttleInterval = interval;
49+
return this;
50+
}
51+
52+
// Add item to queue
53+
// Signature: item[, factor[, priority]]
54+
// item - <Object>, to be added
55+
// factor - <number> | <string>, type, source,
56+
// destination or path, optional
57+
// priority - <number>, optional
58+
//
59+
// Returns: <this>
60+
add(item, factor = 0, priority = 0) {
61+
if (this.priorityMode && !this.roundRobinMode) {
62+
priority = factor;
63+
factor = 0;
64+
}
65+
const task = [item, factor, priority];
66+
const slot = this.count < this.concurrency;
67+
if (!this.paused && slot && this.onProcess) {
68+
this.next(task);
69+
return this;
70+
}
71+
let tasks;
72+
if (this.roundRobinMode) {
73+
tasks = this.factors[factor];
74+
if (!tasks) {
75+
tasks = [];
76+
this.factors[factor] = tasks;
77+
this.waiting.push(tasks);
78+
}
79+
} else {
80+
tasks = this.tasks;
81+
}
82+
83+
if (this.fifoMode) tasks.push(task);
84+
else tasks.unshift(task);
85+
86+
if (this.priorityMode) {
87+
if (this.fifoMode) {
88+
tasks.sort((a, b) => b[2] - a[2]);
89+
} else {
90+
tasks.sort((a, b) => a[2] - b[2]);
91+
}
92+
}
93+
return this;
94+
}
95+
96+
// Process next item
97+
// task - <Array>, next task [item, factor, priority]
98+
//
99+
// Returns: <this>
100+
next(task) {
101+
const item = task[0];
102+
let timer;
103+
this.count++;
104+
if (this.processTimeout) {
105+
timer = setTimeout(() => {
106+
const err = new Error(QUEUE_TIMEOUT);
107+
if (this.onTimeout) this.onTimeout(err);
108+
}, this.processTimeout);
109+
}
110+
this.onProcess(item, (err, result) => {
111+
if (this.onDone) this.onDone(err, result);
112+
if (err) {
113+
if (this.onFailure) this.onFailure(err);
114+
} else if (this.onSuccess) {
115+
this.onSuccess(result);
116+
}
117+
if (timer) {
118+
clearTimeout(timer);
119+
timer = null;
120+
}
121+
this.count--;
122+
if (this.tasks.length > 0 || this.waiting.length > 0) {
123+
this.takeNext();
124+
} else if (this.count === 0 && this.onDrain) {
125+
this.onDrain();
126+
}
127+
});
128+
return this;
129+
}
130+
131+
// Prepare next item for processing
132+
//
133+
// Returns: <this>
134+
takeNext() {
135+
if (this.paused || !this.onProcess) {
136+
return this;
137+
}
138+
let tasks;
139+
if (this.roundRobinMode) {
140+
tasks = this.waiting.shift();
141+
if (tasks.length > 1) {
142+
this.waiting.push(tasks);
143+
}
144+
} else {
145+
tasks = this.tasks;
146+
}
147+
const task = tasks.shift();
148+
if (task) this.next(task);
149+
return this;
150+
}
151+
152+
// This function is not completely implemented yet
153+
//
154+
// Returns: <this>
155+
pause() {
156+
this.paused = true;
157+
return this;
158+
}
159+
160+
// Resume queue
161+
// This function is not completely implemented yet
162+
//
163+
// Returns: <this>
164+
resume() {
165+
this.paused = false;
166+
return this;
167+
}
168+
169+
// Clear queue
170+
//
171+
// Returns: <this>
172+
clear() {
173+
this.count = 0;
174+
this.tasks = [];
175+
this.waiting = [];
176+
this.factors = {};
177+
return this;
178+
}
179+
180+
// Set timeout interval and listener
181+
// msec - <number>, process timeout for single item
182+
// onTimeout - <Function>
183+
//
184+
// Returns: <this>
185+
timeout(msec, onTimeout = null) {
186+
this.processTimeout = msec;
187+
if (onTimeout) this.onTimeout = onTimeout;
188+
return this;
189+
}
190+
191+
// Set processing function
192+
// fn - <Function>
193+
// item - <Object>
194+
// callback - <Function>
195+
// err - <Error> | <null>
196+
// result - <any>
197+
//
198+
// Returns: <this>
199+
process(fn) {
200+
this.onProcess = fn;
201+
return this;
202+
}
203+
204+
// Set listener on processing done
205+
// fn - <Function>, done listener
206+
// err - <Error> | <null>
207+
// result - <any>
208+
//
209+
// Returns: <this>
210+
done(fn) {
211+
this.onDone = fn;
212+
return this;
213+
}
214+
215+
// Set listener on processing success
216+
// listener - <Function>, on success
217+
// item - <any>
218+
//
219+
// Returns: <this>
220+
success(listener) {
221+
this.onSuccess = listener;
222+
return this;
223+
}
224+
225+
// Set listener on processing error
226+
// listener - <Function>, on failure
227+
// err - <Error> | <null>
228+
//
229+
// Returns: <this>
230+
failure(listener) {
231+
this.onFailure = listener;
232+
return this;
233+
}
234+
235+
// Set listener on drain Queue
236+
// listener - <Function>, on drain
237+
//
238+
// Returns: <this>
239+
drain(listener) {
240+
this.onDrain = listener;
241+
return this;
242+
}
243+
244+
// Switch to FIFO mode (default for Queue)
245+
//
246+
// Returns: <this>
247+
fifo() {
248+
this.fifoMode = true;
249+
return this;
250+
}
251+
252+
// Switch to LIFO mode
253+
//
254+
// Returns: <this>
255+
lifo() {
256+
this.fifoMode = false;
257+
return this;
258+
}
259+
260+
// Activate or deactivate priority mode
261+
// flag - <boolean>, default: true, false will
262+
// disable priority mode
263+
//
264+
// Returns: <this>
265+
priority(flag = true) {
266+
this.priorityMode = flag;
267+
return this;
268+
}
269+
270+
// Activate or deactivate round robin mode
271+
// flag - <boolean>, default: true, false will
272+
// disable roundRobin mode
273+
//
274+
// Returns: <this>
275+
roundRobin(flag = true) {
276+
this.roundRobinMode = flag;
277+
return this;
278+
}
279+
280+
// Pipe processed items to different queue
281+
// dest - <Queue>, destination queue
282+
//
283+
// Returns: <this>
284+
pipe(dest) {
285+
if (dest instanceof Queue) {
286+
this.success((item) => void dest.add(item));
287+
}
288+
return this;
289+
}
290+
}
291+
292+
// Create Queue instance
293+
// concurrency - <number>, simultaneous and
294+
// asynchronously executing tasks
295+
//
296+
// Returns: <Queue>
297+
const queue = (concurrency) => new Queue(concurrency);
298+
299+
module.exports = { queue, Queue };

0 commit comments

Comments
 (0)