Skip to content

Commit 24bbb7b

Browse files
fixed:fix the issue of Redis 'SET key value nx ex 10' format parsing failure
1 parent 2ac5485 commit 24bbb7b

File tree

4 files changed

+73
-12
lines changed

4 files changed

+73
-12
lines changed

pom.xml

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,12 +111,17 @@
111111
</dependency>
112112

113113
<!-- SQLite 驱动 -->
114+
<!-- <dependency>-->
115+
<!-- <groupId>org.xerial</groupId>-->
116+
<!-- <artifactId>sqlite-jdbc</artifactId>-->
117+
<!-- <version>3.21.0.1</version>-->
118+
<!-- </dependency>-->
114119
<dependency>
115120
<groupId>org.xerial</groupId>
116121
<artifactId>sqlite-jdbc</artifactId>
117122
<version>3.21.0.1</version>
123+
<version>3.39.3.0</version>
118124
</dependency>
119-
120125
<dependency>
121126
<groupId>org.mybatis.spring.boot</groupId>
122127
<artifactId>mybatis-spring-boot-starter</artifactId>

syncer-transmission/src/main/java/syncer/transmission/client/impl/JedisPipeLineClient.java

Lines changed: 45 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -672,19 +672,54 @@ public Object send(byte[] cmd, byte[]... args) {
672672
if (Strings.byteToString(cmd).toUpperCase().indexOf("SET") >= 0 || Strings.byteToString(cmd).toUpperCase().equalsIgnoreCase("RESTORE") || Strings.byteToString(cmd).toUpperCase().equalsIgnoreCase("RESTOREREPLACE") || Strings.byteToString(cmd).toUpperCase().equalsIgnoreCase("DEL")) {
673673
cleanData(Strings.byteToString(args[0]));
674674
}
675+
676+
675677
if (isSetNxWithTime(cmd, args)) {
676678
String byte3 = Strings.byteToString(args[3]);
679+
String byte4 = Strings.byteToString(args[4]);
680+
try {
681+
long setExData=Long.parseLong(byte3);
682+
}catch (NumberFormatException e){
683+
byte3="ex";
684+
}
677685

678-
kvPersistence.addKey(EventEntity
679-
.builder()
680-
.key(args[0])
681-
.value(args[1])
682-
.stringKey(Strings.byteToString(args[0]))
683-
.pipeLineCompensatorEnum(PipeLineCompensatorEnum.SET_WITH_TIME)
684-
.dbNum(Long.valueOf(currentDbNum))
685-
.cmd("SET".getBytes())
686-
.ms(Long.parseLong(byte3))
687-
.build());
686+
if("ex".equalsIgnoreCase(byte3)||"nx".equalsIgnoreCase(byte3)||"xx".equalsIgnoreCase(byte3)||"px".equalsIgnoreCase(byte3)){
687+
kvPersistence.addKey(EventEntity
688+
.builder()
689+
.key(args[0])
690+
.value(args[1])
691+
.stringKey(Strings.byteToString(args[0]))
692+
.pipeLineCompensatorEnum(PipeLineCompensatorEnum.SET_WITH_TIME)
693+
.dbNum(Long.valueOf(currentDbNum))
694+
.cmd("SET".getBytes())
695+
.ms(Long.parseLong(byte4))
696+
.build());
697+
}else {
698+
kvPersistence.addKey(EventEntity
699+
.builder()
700+
.key(args[0])
701+
.value(args[1])
702+
.stringKey(Strings.byteToString(args[0]))
703+
.pipeLineCompensatorEnum(PipeLineCompensatorEnum.SET_WITH_TIME)
704+
.dbNum(Long.valueOf(currentDbNum))
705+
.cmd("SET".getBytes())
706+
.ms(Long.parseLong(byte3))
707+
.build());
708+
}
709+
710+
// if (isSetNxWithTime(cmd, args)) {
711+
// String byte3 = Strings.byteToString(args[3]);
712+
//
713+
// kvPersistence.addKey(EventEntity
714+
// .builder()
715+
// .key(args[0])
716+
// .value(args[1])
717+
// .stringKey(Strings.byteToString(args[0]))
718+
// .pipeLineCompensatorEnum(PipeLineCompensatorEnum.SET_WITH_TIME)
719+
// .dbNum(Long.valueOf(currentDbNum))
720+
// .cmd("SET".getBytes())
721+
// .ms(Long.parseLong(byte3))
722+
// .build());
688723
addCommandNum();
689724
} else {
690725
if (args == null || args.length <= 0) {

syncer-transmission/src/main/java/syncer/transmission/queue/SendCommandWithOutQueue.java

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,27 @@ public void run(KeyValueEventEntity keyValueEventEntity){
158158
SingleTaskDataManagerUtils.brokenStatusAndLog("被抛弃key数量到达阈值[" + errorCount + "],exception reason["+e.getMessage()+"]", this.getClass(), taskId);
159159
}
160160
}
161-
log.error("[{}]抛弃key:{} ,class:[{}]:原因[{}]",taskId, keyName,event.getClass().toString(),e.getMessage());
161+
String stringCommand=command;
162+
if(event instanceof DefaultCommand){
163+
try {
164+
DefaultCommand defaultCommand= (DefaultCommand) event;
165+
command= Strings.byteToString(defaultCommand.getCommand());
166+
String [] args=Strings.byteToString(defaultCommand.getArgs());
167+
StringBuilder commands=new StringBuilder();
168+
commands.append(" ").append(command);
169+
for (int i=0;i<args.length;i++){
170+
String key=args[i];
171+
commands.append(" ").append(key);
172+
}
173+
stringCommand=commands.toString();
174+
}catch (Exception exq){
175+
log.error("error command log error");
176+
}
177+
178+
}
179+
log.error("[{}]抛弃 command:[{}] key:{} ,class:[{}]:原因[{}] ",taskId,stringCommand, keyName,event.getClass().toString(),e.getMessage());
180+
181+
// log.error("[{}]抛弃key:{} ,class:[{}]:原因[{}]",taskId, keyName,event.getClass().toString(),e.getMessage());
162182
DataCleanUtils.cleanData(keyValueEventEntity,event);
163183
e.printStackTrace();
164184
}finally {

syncer-transmission/src/main/java/syncer/transmission/strategy/commandprocessing/impl/CommandProcessingAofCommandSendStrategy.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,7 @@ public void run(Replication replication, KeyValueEventEntity eventEntity, TaskMo
120120
if(eventEntity.getEvent() instanceof DefaultCommand){
121121
DefaultCommand dc = (DefaultCommand) eventEntity.getEvent();
122122
}
123+
log.error("AofCommandSendStrategy error {} {}",e.getMessage(),e.getStackTrace());
123124
throw new StartegyNodeException(e.getMessage()+"->AofCommandSendStrategy",e.getCause());
124125
}
125126
}

0 commit comments

Comments
 (0)