1
1
package syncer .transmission .task ;
2
2
3
3
import java .io .IOException ;
4
-
5
4
import lombok .extern .slf4j .Slf4j ;
6
5
import syncer .jedis .Protocol ;
7
6
import syncer .replica .config .RedisURI ;
21
20
import syncer .replica .util .SyncTypeUtils ;
22
21
import syncer .replica .util .TaskRunTypeEnum ;
23
22
import syncer .replica .util .strings .Strings ;
23
+ import syncer .replica .util .type .ExpiredType ;
24
24
import syncer .transmission .checkpoint .breakpoint .BreakPoint ;
25
25
import syncer .transmission .client .RedisClient ;
26
26
import syncer .transmission .client .RedisClientFactory ;
@@ -208,11 +208,25 @@ public void onEvent(Replication replicator, Event event) {
208
208
//全量命令 RESTORE
209
209
if (event instanceof DumpKeyValuePairEvent ) {
210
210
DumpKeyValuePairEvent valueDump = (DumpKeyValuePairEvent ) event ;
211
- Long ms = valueDump .getExpiredMs ();
212
211
RedisDB vdb = valueDump .getDb ();
213
212
Long dbNum = vdb .getCurrentDbNumber ();
214
213
db = dbNum .intValue ();
215
- long ttl = (ms == null || ms < 0 ) ? 0 : ms ;
214
+ long ttlMs = 0 ;
215
+
216
+ // expiryTime 是到期时间,不是剩余过期时间
217
+ if (!ExpiredType .NONE .equals (valueDump .getExpiredType ())) {
218
+ long expiryTimeMs = 0L ;
219
+ if (ExpiredType .SECOND .equals (valueDump .getExpiredType ())) {
220
+ expiryTimeMs = valueDump .getExpiredSeconds () * 1000L ;
221
+ } else if (ExpiredType .MS .equals (valueDump .getExpiredType ())) {
222
+ expiryTimeMs = valueDump .getExpiredMs ();
223
+ }
224
+ ttlMs = expiryTimeMs - System .currentTimeMillis ();
225
+ if (ttlMs <= 0 ) {
226
+ log .debug ("restore... ignore expired key [{}]" , Strings .byteToString (valueDump .getKey ()));
227
+ return ;
228
+ }
229
+ }
216
230
String rdbDumpMd5 = circle .getRdbDumpMd5 (valueDump , sourceRedisName , 3.0 );
217
231
String dumpKey = Strings .byteToString (valueDump .getKey ());
218
232
String [] data = new String [] { rdbDumpMd5 , "1" , "1" };
@@ -224,7 +238,7 @@ public void onEvent(Replication replicator, Event event) {
224
238
log .error ("restore key:[" + dumpKey + "] fail" , e );
225
239
e .printStackTrace ();
226
240
}
227
- client .restoreReplace (dbNum , valueDump .getKey (), ttl , valueDump .getValue (), true );
241
+ client .restoreReplace (dbNum , valueDump .getKey (), ttlMs , valueDump .getValue (), true );
228
242
}
229
243
230
244
// 更新offset
0 commit comments