Skip to content

Commit 4ab2474

Browse files
authored
Merge pull request #559 from metafacture/495-objectSleep
2 parents 05c8dae + 74bc15d commit 4ab2474

File tree

3 files changed

+225
-0
lines changed

3 files changed

+225
-0
lines changed
Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
/*
2+
* Copyright 2024 hbz
3+
*
4+
* Licensed under the Apache License, Version 2.0 the "License";
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.metafacture.flowcontrol;
18+
19+
import org.metafacture.framework.FluxCommand;
20+
import org.metafacture.framework.MetafactureException;
21+
import org.metafacture.framework.ObjectReceiver;
22+
import org.metafacture.framework.annotations.Description;
23+
import org.metafacture.framework.annotations.In;
24+
import org.metafacture.framework.annotations.Out;
25+
import org.metafacture.framework.helpers.DefaultObjectPipe;
26+
27+
import java.util.concurrent.TimeUnit;
28+
29+
/**
30+
* Lets the process sleep for a specific amount of time between objects.
31+
*
32+
* @param <T> object type
33+
* @author Tobias Bülte
34+
*/
35+
@Description("Lets the process sleep for a specific amount of time between objects.")
36+
@In(Object.class)
37+
@Out(Object.class)
38+
@FluxCommand("sleep")
39+
public final class ObjectSleeper<T> extends DefaultObjectPipe<T, ObjectReceiver<T>> {
40+
41+
public static final TimeUnit DEFAULT_TIME_UNIT = TimeUnit.MILLISECONDS;
42+
public static final long DEFAULT_SLEEP_TIME = 1000;
43+
44+
private static final String TIME_UNIT_SUFFIX = "S";
45+
46+
private TimeUnit timeUnit = DEFAULT_TIME_UNIT;
47+
private long sleepTime = DEFAULT_SLEEP_TIME;
48+
49+
/**
50+
* Creates an instance of {@link ObjectSleeper}.
51+
*/
52+
public ObjectSleeper() {
53+
}
54+
55+
/**
56+
* Sets the amount of time for the sleep phase (measured in {@link
57+
* #setTimeUnit time unit}).
58+
*
59+
* @param sleepTime the time to sleep
60+
*/
61+
public void setSleepTime(final int sleepTime) {
62+
// NOTE: ConfigurableClass.convertValue() doesn't support long.
63+
setSleepTime((long) sleepTime);
64+
}
65+
66+
/**
67+
* Sets the amount of time for the sleep phase (measured in {@link
68+
* #setTimeUnit time unit}).
69+
*
70+
* @param sleepTime the time to sleep
71+
*/
72+
public void setSleepTime(final long sleepTime) {
73+
this.sleepTime = sleepTime;
74+
}
75+
76+
/**
77+
* Gets the amount of time for the sleep phase (measured in {@link
78+
* #setTimeUnit time unit}).
79+
*
80+
* @return the time to sleep
81+
*/
82+
public long getSleepTime() {
83+
return sleepTime;
84+
}
85+
86+
/**
87+
* Sets the time unit for the sleep phase. See {@link TimeUnit available
88+
* time units}, case-insensitive, trailing "s" optional.
89+
*
90+
* @param timeUnit the time unit
91+
*/
92+
public void setTimeUnit(final String timeUnit) {
93+
// NOTE: Adds NANOSECONDS and DAYS over Catmandu's supported time units.
94+
95+
final String timeUnitName = timeUnit.toUpperCase();
96+
final String timeUnitSuffix = timeUnitName.endsWith(TIME_UNIT_SUFFIX) ? "" : TIME_UNIT_SUFFIX;
97+
98+
setTimeUnit(TimeUnit.valueOf(timeUnitName + timeUnitSuffix));
99+
}
100+
101+
/**
102+
* Sets the time unit for the sleep phase.
103+
*
104+
* @param timeUnit the time unit
105+
*/
106+
public void setTimeUnit(final TimeUnit timeUnit) {
107+
this.timeUnit = timeUnit;
108+
}
109+
110+
/**
111+
* Gets the time unit for the sleep phase.
112+
*
113+
* @return the time unit
114+
*/
115+
public TimeUnit getTimeUnit() {
116+
return timeUnit;
117+
}
118+
119+
/**
120+
* Sleeps for the specified amount of time.
121+
*/
122+
public void sleep() {
123+
try {
124+
timeUnit.sleep(sleepTime);
125+
}
126+
catch (final InterruptedException e) {
127+
Thread.currentThread().interrupt();
128+
throw new MetafactureException(e.getMessage(), e);
129+
}
130+
}
131+
132+
@Override
133+
public void process(final T obj) {
134+
sleep();
135+
getReceiver().process(obj);
136+
}
137+
138+
}

metafacture-flowcontrol/src/main/resources/flux-commands.properties

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,3 +21,4 @@ reset-object-batch org.metafacture.flowcontrol.ObjectBatchResetter
2121
defer-stream org.metafacture.flowcontrol.StreamDeferrer
2222
catch-stream-exception org.metafacture.flowcontrol.StreamExceptionCatcher
2323
thread-object-tee org.metafacture.flowcontrol.ObjectThreader
24+
sleep org.metafacture.flowcontrol.ObjectSleeper
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
/*
2+
* Copyright 2024 Tobias Bülte, hbz
3+
*
4+
* Licensed under the Apache License, Version 2.0 the "License";
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.metafacture.flowcontrol;
18+
19+
import org.metafacture.framework.ObjectReceiver;
20+
21+
import org.junit.Assert;
22+
import org.junit.Before;
23+
import org.junit.Test;
24+
import org.mockito.Mock;
25+
import org.mockito.MockitoAnnotations;
26+
27+
import java.util.function.Consumer;
28+
29+
/**
30+
* Tests for class {@link ObjectSleeper}.
31+
*
32+
* @author Tobias Bülte
33+
*/
34+
public final class ObjectSleeperTest {
35+
36+
private static final int PROCESS_OVERHEAD_MILLISECONDS = 100;
37+
38+
private static final int MILLISECONDS_PER_SECOND = 1_000;
39+
private static final int NANOSECONDS_PER_MILLISECOND = 1_000_000;
40+
41+
@Mock
42+
private ObjectReceiver<String> receiver;
43+
44+
@Before
45+
public void setup() {
46+
MockitoAnnotations.initMocks(this);
47+
}
48+
49+
@Test
50+
public void shouldTestIfClockedTimeExceedsDuration() {
51+
final int sleepTime = 1234;
52+
assertSleep(sleepTime, s -> s.setSleepTime(sleepTime));
53+
}
54+
55+
@Test
56+
public void shouldTestIfClockedTimeExceedsDurationInMilliseconds() {
57+
final int sleepTime = 567;
58+
assertSleep(sleepTime, s -> {
59+
s.setSleepTime(sleepTime);
60+
s.setTimeUnit("MILLISECONDS");
61+
});
62+
}
63+
64+
@Test
65+
public void shouldTestIfClockedTimeExceedsDurationInSeconds() {
66+
final int sleepTime = 1;
67+
assertSleep(sleepTime * MILLISECONDS_PER_SECOND, s -> {
68+
s.setSleepTime(sleepTime);
69+
s.setTimeUnit("SECOND");
70+
});
71+
}
72+
73+
private void assertSleep(final long expectedMillis, final Consumer<ObjectSleeper> consumer) {
74+
final ObjectSleeper<String> objectSleeper = new ObjectSleeper<>();
75+
objectSleeper.setReceiver(receiver);
76+
consumer.accept(objectSleeper);
77+
78+
final long startTime = System.nanoTime();
79+
objectSleeper.process(null);
80+
final long actualMillis = (System.nanoTime() - startTime) / NANOSECONDS_PER_MILLISECOND;
81+
82+
Assert.assertTrue("sleep time too short: " + actualMillis, actualMillis >= expectedMillis);
83+
Assert.assertTrue("sleep time too long: " + actualMillis, actualMillis < expectedMillis + PROCESS_OVERHEAD_MILLISECONDS);
84+
}
85+
86+
}

0 commit comments

Comments
 (0)