Skip to content

Commit fa81736

Browse files
authored
Merge pull request #70 from avaje/feature/add-ScheduledTask
Add ScheduledTask to help execute periodic reporting of metrics
2 parents 1941bb8 + 30b2f9f commit fa81736

File tree

6 files changed

+243
-0
lines changed

6 files changed

+243
-0
lines changed

metrics/pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,12 @@
1414

1515
<dependencies>
1616

17+
<dependency>
18+
<groupId>io.avaje</groupId>
19+
<artifactId>avaje-applog</artifactId>
20+
<version>1.0</version>
21+
</dependency>
22+
1723
<dependency>
1824
<groupId>io.avaje</groupId>
1925
<artifactId>junit</artifactId>
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
package io.avaje.metrics;
2+
3+
import io.avaje.applog.AppLog;
4+
5+
import java.util.Objects;
6+
import java.util.concurrent.Executors;
7+
import java.util.concurrent.ScheduledExecutorService;
8+
import java.util.concurrent.ScheduledFuture;
9+
import java.util.concurrent.TimeUnit;
10+
import java.util.concurrent.locks.ReentrantLock;
11+
12+
import static java.lang.System.Logger.Level.ERROR;
13+
14+
final class DScheduledTask implements ScheduledTask {
15+
16+
private static final System.Logger log = AppLog.getLogger(DScheduledTask.class);
17+
18+
static final class DBuilder implements Builder {
19+
private int initial = 60;
20+
private int delay = 60;
21+
private TimeUnit timeUnit = TimeUnit.SECONDS;
22+
23+
private Runnable task;
24+
25+
@Override
26+
public DBuilder schedule(int initial, int delay, TimeUnit timeUnit) {
27+
this.initial = initial;
28+
this.delay = delay;
29+
this.timeUnit = timeUnit;
30+
return this;
31+
}
32+
33+
@Override
34+
public DBuilder task(Runnable task) {
35+
this.task = task;
36+
return this;
37+
}
38+
39+
@Override
40+
public DScheduledTask build() {
41+
Objects.requireNonNull(task, "task is required");
42+
return new DScheduledTask(task, initial, delay, timeUnit);
43+
}
44+
}
45+
46+
47+
private final ReentrantLock activeLock = new ReentrantLock();
48+
private final Runnable task;
49+
private final int initial;
50+
private final int delay;
51+
private final TimeUnit timeUnit;
52+
53+
private final ScheduledExecutorService executor;
54+
private ScheduledFuture<?> backgroundTask;
55+
56+
DScheduledTask(Runnable task, int initial, int delay, TimeUnit timeUnit) {
57+
this.task = task;
58+
this.initial = initial;
59+
this.delay = delay;
60+
this.timeUnit = timeUnit;
61+
this.executor = Executors.newSingleThreadScheduledExecutor(new DaemonThreadFactory("schTask"));
62+
}
63+
64+
@Override
65+
public void start() {
66+
this.backgroundTask = executor.scheduleWithFixedDelay(this::runTask, initial, delay, timeUnit);
67+
}
68+
69+
@Override
70+
public boolean cancel(boolean mayInterruptIfRunning) {
71+
return this.backgroundTask.cancel(mayInterruptIfRunning);
72+
}
73+
74+
/**
75+
* Wait for the task to complete if it is actively running .
76+
*/
77+
@Override
78+
public void waitIfRunning(long timeout, TimeUnit timeUnit) {
79+
try {
80+
if (activeLock.tryLock(timeout, timeUnit)) {
81+
activeLock.unlock();
82+
}
83+
} catch (InterruptedException e) {
84+
Thread.currentThread().interrupt();
85+
log.log(ERROR, "interrupted while waiting for task to complete", e);
86+
}
87+
}
88+
89+
private void runTask() {
90+
activeLock.lock();
91+
try {
92+
task.run();
93+
} catch (Throwable e) {
94+
log.log(ERROR, "Error stopping task", e);
95+
} finally {
96+
activeLock.unlock();
97+
}
98+
}
99+
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
package io.avaje.metrics;
2+
3+
import java.util.concurrent.ThreadFactory;
4+
import java.util.concurrent.atomic.AtomicInteger;
5+
6+
/**
7+
* ThreadFactory for Daemon threads.
8+
* <p>
9+
* Daemon threads do not stop a JVM stopping. If an application only has Daemon
10+
* threads left it will shutdown.
11+
* <p>
12+
* In using Daemon threads you need to either not care about being interrupted
13+
* on shutdown or register with the JVM shutdown hook to perform a nice shutdown
14+
* of the daemon threads etc.
15+
*/
16+
final class DaemonThreadFactory implements ThreadFactory {
17+
18+
private final AtomicInteger threadNumber = new AtomicInteger(1);
19+
private final String namePrefix;
20+
21+
DaemonThreadFactory(String namePrefix) {
22+
this.namePrefix = namePrefix;
23+
}
24+
25+
@Override
26+
public Thread newThread(Runnable r) {
27+
Thread t = new Thread(null, r, namePrefix + threadNumber.getAndIncrement(), 0);
28+
t.setDaemon(true);
29+
if (t.getPriority() != Thread.NORM_PRIORITY) {
30+
t.setPriority(Thread.NORM_PRIORITY);
31+
}
32+
return t;
33+
}
34+
}
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
package io.avaje.metrics;
2+
3+
import java.util.concurrent.TimeUnit;
4+
5+
/**
6+
* A ScheduledTask that can run periodically, be cancelled, and
7+
* also aware of when its running (for use with Lambda).
8+
* <p>
9+
* The ScheduledTask will use a Daemon thread and expect to just
10+
* stop on JVM shutdown.
11+
*/
12+
public interface ScheduledTask {
13+
14+
/**
15+
* Return the Builder for a ScheduledTask.
16+
*/
17+
static Builder builder() {
18+
return new DScheduledTask.DBuilder();
19+
}
20+
21+
/**
22+
* Start the scheduled task.
23+
*/
24+
void start();
25+
26+
/**
27+
* Cancel the scheduled task.
28+
*/
29+
boolean cancel(boolean mayInterruptIfRunning);
30+
31+
/**
32+
* If the task is actively running wait for the task to complete.
33+
*/
34+
void waitIfRunning(long timeout, TimeUnit timeUnit);
35+
36+
/**
37+
* The builder for a ScheduledTask.
38+
*/
39+
interface Builder {
40+
41+
/**
42+
* Specify the schedule to run the task.
43+
* @param initial The initial delay
44+
* @param delay The delay between task execution
45+
* @param timeUnit The timeunit of the scheduled delay
46+
*/
47+
Builder schedule(int initial, int delay, TimeUnit timeUnit);
48+
49+
/**
50+
* Specify the task to execute periodically according to the schedule.
51+
*/
52+
Builder task(Runnable task);
53+
54+
/**
55+
* Build the scheduled task.
56+
*/
57+
ScheduledTask build();
58+
}
59+
}

metrics/src/main/java/module-info.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
exports io.avaje.metrics.spi;
99
exports io.avaje.metrics.annotation;
1010

11+
requires transitive io.avaje.applog;
1112
requires static java.management;
1213

1314
uses io.avaje.metrics.spi.SpiMetricBuilder;
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
package io.avaje.metrics;
2+
3+
import org.junit.jupiter.api.Test;
4+
5+
import java.util.concurrent.TimeUnit;
6+
import java.util.concurrent.atomic.AtomicLong;
7+
8+
import static org.assertj.core.api.Assertions.assertThat;
9+
10+
class ScheduledTaskTest {
11+
12+
@Test
13+
void runIt() throws InterruptedException {
14+
15+
ScheduledTask task = ScheduledTask.builder()
16+
.schedule(1, 1, TimeUnit.MILLISECONDS)
17+
.task(ScheduledTaskTest::hello)
18+
.build();
19+
20+
assertThat(counter.get()).isEqualTo(0);
21+
System.out.println("start...");
22+
task.start();
23+
Thread.sleep(5);
24+
assertThat(counter.get()).isGreaterThan(0);
25+
26+
task.waitIfRunning(10, TimeUnit.SECONDS);
27+
Thread.sleep(30);
28+
29+
System.out.println("cancel...");
30+
task.cancel(false);
31+
long after0 = counter.get();
32+
assertThat(after0).isGreaterThan(1);
33+
Thread.sleep(10);
34+
long after1 = counter.get();
35+
assertThat(after1).isEqualTo(after0);
36+
System.out.println("done");
37+
}
38+
39+
private static final AtomicLong counter = new AtomicLong();
40+
41+
private static void hello() {
42+
System.out.println("hi " + counter.incrementAndGet());
43+
}
44+
}

0 commit comments

Comments
 (0)