Skip to content

Commit f22c026

Browse files
committed
Initial commit for RW lock
1 parent 44cb670 commit f22c026

File tree

7 files changed

+200
-162
lines changed

7 files changed

+200
-162
lines changed

core/src/main/java/dev/keva/core/command/impl/key/manager/ExpirationManager.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import dev.keva.ioc.annotation.Component;
99
import dev.keva.protocol.resp.Command;
1010
import dev.keva.store.KevaDatabase;
11+
import dev.keva.store.lock.SpinLock;
1112

1213
import java.util.concurrent.ExecutorService;
1314
import java.util.concurrent.Executors;
@@ -71,14 +72,14 @@ public void executeExpire(byte[] key) {
7172
data[0] = "delete".getBytes();
7273
data[1] = key;
7374
Command command = Command.newInstance(data, false);
74-
Lock lock = database.getLock();
75-
lock.lock();
75+
SpinLock lock = database.getLock();
76+
lock.exclusiveLock();
7677
try {
7778
aof.write(command);
7879
database.remove(key);
7980
clearExpiration(key);
8081
} finally {
81-
lock.unlock();
82+
lock.exclusiveUnlock();
8283
}
8384
} else {
8485
database.remove(key);

core/src/main/java/dev/keva/core/command/impl/transaction/manager/TransactionContext.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import dev.keva.core.command.mapping.CommandMapper;
44
import dev.keva.protocol.resp.Command;
5+
import dev.keva.store.lock.SpinLock;
56
import dev.keva.util.hashbytes.BytesKey;
67
import dev.keva.util.hashbytes.BytesValue;
78
import dev.keva.protocol.resp.reply.MultiBulkReply;
@@ -42,8 +43,8 @@ public void discard() {
4243
isQueuing = false;
4344
}
4445

45-
public Reply<?> exec(ChannelHandlerContext ctx, Lock txLock) throws InterruptedException {
46-
txLock.lock();
46+
public Reply<?> exec(ChannelHandlerContext ctx, SpinLock txLock) throws InterruptedException {
47+
txLock.exclusiveLock();
4748
try {
4849
for (val watch : watchMap.entrySet()) {
4950
val key = watch.getKey();
@@ -74,7 +75,7 @@ public Reply<?> exec(ChannelHandlerContext ctx, Lock txLock) throws InterruptedE
7475

7576
return new MultiBulkReply(replies);
7677
} finally {
77-
txLock.unlock();
78+
txLock.exclusiveUnlock();
7879
}
7980
}
8081
}

core/src/main/java/dev/keva/core/command/mapping/CommandMapper.java

Lines changed: 30 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,17 @@
2525
import java.lang.reflect.InvocationTargetException;
2626
import java.util.Arrays;
2727
import java.util.HashMap;
28+
import java.util.HashSet;
2829
import java.util.Map;
30+
import java.util.Set;
2931

3032
@Component
3133
@Slf4j
3234
public class CommandMapper {
35+
36+
private static final Set<String> EXCLUSIVE_COMMANDS = new HashSet<>(Arrays.asList(
37+
"exec", "expire", "expireat", "restore", "flushdb"));
38+
3339
@Getter
3440
private final Map<BytesKey, CommandWrapper> methods = new HashMap<>();
3541

@@ -98,25 +104,40 @@ public void init() {
98104

99105
try {
100106
val lock = database.getLock();
101-
lock.lock();
107+
boolean locked = false, exclusive = false, writeToAOF = isAoF && isMutate;
102108
try {
103-
if (ctx != null && isAoF && isMutate) {
104-
try {
105-
aof.write(command);
106-
} catch (Exception e) {
107-
log.error("Error writing to AOF", e);
108-
}
109-
}
110109
Object[] objects = new Object[types.length];
111110
command.toArguments(objects, types, ctx);
111+
if (ctx != null) {
112+
locked = true;
113+
if (isMutate || EXCLUSIVE_COMMANDS.contains(name)) {
114+
lock.exclusiveLock();
115+
exclusive = true;
116+
if (writeToAOF) {
117+
try {
118+
aof.write(command);
119+
} catch (Exception e) {
120+
log.error("Error writing to AOF", e);
121+
}
122+
}
123+
} else {
124+
lock.sharedLock();
125+
}
126+
}
112127
// If not in AOF mode, then recycle(),
113128
// else, the command will be recycled in the AOF dump
114129
if (!kevaConfig.getAof()) {
115130
command.recycle();
116131
}
117132
return (Reply<?>) method.invoke(instance, objects);
118133
} finally {
119-
lock.unlock();
134+
if (locked) {
135+
if (exclusive) {
136+
lock.exclusiveUnlock();
137+
} else {
138+
lock.sharedUnlock();
139+
}
140+
}
120141
}
121142
} catch (Exception e) {
122143
log.error(e.getMessage(), e);

store/src/main/java/dev/keva/store/KevaDatabase.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
11
package dev.keva.store;
22

3+
import dev.keva.store.lock.SpinLock;
34
import dev.keva.util.hashbytes.BytesKey;
45

56
import java.util.AbstractMap;
67
import java.util.concurrent.locks.Lock;
78

89
public interface KevaDatabase {
9-
Lock getLock();
10+
SpinLock getLock();
1011

1112
void flush();
1213

0 commit comments

Comments
 (0)