Skip to content

Commit 84fa6bc

Browse files
authored
chore: improve parser state machine (#4313)
* chore: improve parser state machine 1. Separate argument type parsing from argument parsing itself. 2. Handle strings of length 1. This is done in preparation of improving the parser contract - so that when it returns INPUT_PENDING, it consumes the entire input. --------- Signed-off-by: Roman Gershman <roman@dragonflydb.io>
1 parent 9d6b2a1 commit 84fa6bc

File tree

3 files changed

+112
-79
lines changed

3 files changed

+112
-79
lines changed

src/facade/redis_parser.cc

Lines changed: 77 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,19 @@ auto RedisParser::Parse(Buffer str, uint32_t* consumed, RespExpr::Vec* res) -> R
1818
*consumed = 0;
1919
res->clear();
2020

21-
if (str.size() < 2) {
22-
return INPUT_PENDING;
23-
}
24-
2521
if (state_ == CMD_COMPLETE_S) {
26-
InitStart(str[0], res);
27-
} else {
22+
if (InitStart(str[0], res)) {
23+
// We recognized a non-INLINE state, starting with a special char.
24+
str.remove_prefix(1);
25+
*consumed += 1;
26+
if (server_mode_ && state_ == PARSE_ARG_S) { // server requests start with ARRAY_LEN_S.
27+
state_ = CMD_COMPLETE_S; // reject and reset the state.
28+
return BAD_ARRAYLEN;
29+
}
30+
if (str.empty())
31+
return INPUT_PENDING;
32+
}
33+
} else { // INLINE mode, aka PING\n
2834
// We continue parsing in the middle.
2935
if (!cached_expr_)
3036
cached_expr_ = res;
@@ -39,12 +45,15 @@ auto RedisParser::Parse(Buffer str, uint32_t* consumed, RespExpr::Vec* res) -> R
3945
case ARRAY_LEN_S:
4046
resultc = ConsumeArrayLen(str);
4147
break;
48+
case PARSE_ARG_TYPE:
49+
arg_c_ = str[0];
50+
if (server_mode_ && arg_c_ != '$') // server side only supports bulk strings.
51+
return BAD_BULKLEN;
52+
resultc.second = 1;
53+
state_ = PARSE_ARG_S;
54+
break;
4255
case PARSE_ARG_S:
43-
if (str.size() == 0 || (str.size() < 4 && str[0] != '_')) {
44-
resultc.first = INPUT_PENDING;
45-
} else {
46-
resultc = ParseArg(str);
47-
}
56+
resultc = ParseArg(str);
4857
break;
4958
case INLINE_S:
5059
DCHECK(parse_stack_.empty());
@@ -84,7 +93,7 @@ auto RedisParser::Parse(Buffer str, uint32_t* consumed, RespExpr::Vec* res) -> R
8493
return resultc.first;
8594
}
8695

87-
void RedisParser::InitStart(char prefix_b, RespExpr::Vec* res) {
96+
bool RedisParser::InitStart(char prefix_b, RespExpr::Vec* res) {
8897
buf_stash_.clear();
8998
stash_.clear();
9099
cached_expr_ = res;
@@ -101,18 +110,19 @@ void RedisParser::InitStart(char prefix_b, RespExpr::Vec* res) {
101110
case ',': // Resp3 DOUBLE
102111
state_ = PARSE_ARG_S;
103112
parse_stack_.emplace_back(1, cached_expr_); // expression of length 1.
104-
break;
113+
arg_c_ = prefix_b;
114+
return true;
105115
case '*':
106116
case '~': // Resp3 SET
107117
state_ = ARRAY_LEN_S;
108-
break;
118+
return true;
109119
case '%': // Resp3 MAP
110120
state_ = MAP_LEN_S;
111-
break;
112-
default:
113-
state_ = INLINE_S;
114-
break;
121+
return true;
115122
}
123+
124+
state_ = INLINE_S;
125+
return false;
116126
}
117127

118128
void RedisParser::StashState(RespExpr::Vec* res) {
@@ -160,62 +170,66 @@ auto RedisParser::ParseInline(Buffer str) -> ResultConsumed {
160170
uint8_t* end = str.end();
161171
uint8_t* token_start = ptr;
162172

163-
auto find_token_end = [&] {
173+
auto find_token_end = [](uint8_t* ptr, uint8_t* end) {
164174
while (ptr != end && *ptr > 32)
165175
++ptr;
176+
return ptr;
166177
};
167178

168179
if (is_broken_token_) {
169-
find_token_end();
180+
ptr = find_token_end(ptr, end);
170181
size_t len = ptr - token_start;
171182

172183
ExtendLastString(Buffer(token_start, len));
173-
if (ptr != end) {
174-
is_broken_token_ = false;
184+
if (ptr == end) {
185+
return {INPUT_PENDING, ptr - token_start};
175186
}
187+
is_broken_token_ = false;
176188
}
177189

178-
auto is_finish = [&] { return ptr == end || *ptr == '\n'; };
190+
while (ptr != end) {
191+
// For inline input we only require \n.
192+
if (*ptr == '\n') {
193+
if (cached_expr_->empty()) {
194+
++ptr;
195+
continue; // skip empty line
196+
}
197+
break;
198+
}
179199

180-
while (true) {
181-
while (!is_finish() && *ptr <= 32) {
200+
if (*ptr <= 32) { // skip ws/control chars
182201
++ptr;
202+
continue;
183203
}
184-
// We do not test for \r in order to accept 'nc' input.
185-
if (is_finish())
186-
break;
187204

205+
// token start
188206
DCHECK(!is_broken_token_);
189207

190208
token_start = ptr;
191-
find_token_end();
209+
ptr = find_token_end(ptr, end);
192210

193211
cached_expr_->emplace_back(RespExpr::STRING);
194212
cached_expr_->back().u = Buffer{token_start, size_t(ptr - token_start)};
195213
}
196214

197215
uint32_t last_consumed = ptr - str.data();
198-
if (ptr == end) { // we have not finished parsing.
199-
if (ptr[-1] > 32) {
200-
// we stopped in the middle of the token.
201-
is_broken_token_ = true;
202-
}
203-
216+
if (ptr == end) { // we have not finished parsing.
217+
is_broken_token_ = ptr[-1] > 32; // we stopped in the middle of the token.
204218
return {INPUT_PENDING, last_consumed};
205219
}
206220

207-
++last_consumed; // consume the delimiter as well.
221+
DCHECK_EQ('\n', *ptr);
222+
223+
++last_consumed; // consume \n as well.
208224
state_ = CMD_COMPLETE_S;
209225

210226
return {OK, last_consumed};
211227
}
212228

213-
// Parse lines like:'$5\r\n' or '*2\r\n'
229+
// Parse lines like:'$5\r\n' or '*2\r\n'. The first character is already consumed by the caller.
214230
auto RedisParser::ParseLen(Buffer str, int64_t* res) -> ResultConsumed {
215231
DCHECK(!str.empty());
216232

217-
DCHECK(str[0] == '$' || str[0] == '*' || str[0] == '%' || str[0] == '~');
218-
219233
const char* s = reinterpret_cast<const char*>(str.data());
220234
const char* pos = reinterpret_cast<const char*>(memchr(s, '\n', str.size()));
221235
if (!pos) {
@@ -227,15 +241,15 @@ auto RedisParser::ParseLen(Buffer str, int64_t* res) -> ResultConsumed {
227241
return {r, 0};
228242
}
229243

244+
unsigned consumed = pos - s + 1;
230245
if (pos[-1] != '\r') {
231-
return {BAD_ARRAYLEN, 0};
246+
return {BAD_ARRAYLEN, consumed};
232247
}
233248

234249
// Skip the first character and 2 last ones (\r\n).
235-
string_view len_token{s + 1, size_t(pos - 1 - s)};
250+
string_view len_token{s, size_t(pos - 1 - s)};
236251
bool success = absl::SimpleAtoi(len_token, res);
237252

238-
unsigned consumed = pos - s + 1;
239253
if (success && *res >= -1) {
240254
return ResultConsumed{OK, consumed};
241255
}
@@ -268,11 +282,12 @@ auto RedisParser::ConsumeArrayLen(Buffer str) -> ResultConsumed {
268282
return {BAD_STRING, res.second};
269283

270284
if (len <= 0) {
271-
cached_expr_->emplace_back(len == -1 ? RespExpr::NIL_ARRAY : RespExpr::ARRAY);
272-
if (len < 0)
285+
if (len < 0) {
286+
cached_expr_->emplace_back(RespExpr::NIL_ARRAY);
273287
cached_expr_->back().u.emplace<RespVec*>(nullptr); // nil
274-
else {
288+
} else {
275289
static RespVec empty_vec;
290+
cached_expr_->emplace_back(RespExpr::ARRAY);
276291
cached_expr_->back().u = &empty_vec;
277292
}
278293
if (parse_stack_.empty()) {
@@ -293,9 +308,8 @@ auto RedisParser::ConsumeArrayLen(Buffer str) -> ResultConsumed {
293308
arr->reserve(len);
294309
cached_expr_->back().u = arr;
295310
cached_expr_ = arr;
296-
} else {
297-
state_ = PARSE_ARG_S;
298311
}
312+
state_ = PARSE_ARG_TYPE;
299313

300314
DVLOG(1) << "PushStack: (" << len << ", " << cached_expr_ << ")";
301315
parse_stack_.emplace_back(len, cached_expr_);
@@ -306,14 +320,13 @@ auto RedisParser::ConsumeArrayLen(Buffer str) -> ResultConsumed {
306320
auto RedisParser::ParseArg(Buffer str) -> ResultConsumed {
307321
DCHECK(!str.empty());
308322

309-
char c = str[0];
310-
unsigned min_len = 3 + int(c != '_');
323+
unsigned min_len = 2 + int(arg_c_ != '_');
311324

312325
if (str.size() < min_len) {
313326
return {INPUT_PENDING, 0};
314327
}
315328

316-
if (c == '$') {
329+
if (arg_c_ == '$') {
317330
int64_t len;
318331

319332
ResultConsumed res = ParseLen(str, &len);
@@ -338,33 +351,27 @@ auto RedisParser::ParseArg(Buffer str) -> ResultConsumed {
338351
return {OK, res.second};
339352
}
340353

341-
if (server_mode_) {
342-
return {BAD_BULKLEN, 0};
343-
}
344-
345-
if (c == '_') { // Resp3 NIL
346-
// '_','\r','\n'
347-
DCHECK_GE(str.size(), 3u);
348-
349-
unsigned consumed = 3;
350-
if (str[1] != '\r' || str[2] != '\n') {
354+
DCHECK(!server_mode_);
355+
if (arg_c_ == '_') { // Resp3 NIL
356+
// '\r','\n'
357+
if (str[0] != '\r' || str[1] != '\n') {
351358
return {BAD_STRING, 0};
352359
}
353360

354361
cached_expr_->emplace_back(RespExpr::NIL);
355362
cached_expr_->back().u = Buffer{};
356363
HandleFinishArg();
357-
return {OK, consumed};
364+
return {OK, 2};
358365
}
359366

360-
if (c == '*') {
367+
if (arg_c_ == '*') {
361368
return ConsumeArrayLen(str);
362369
}
363370

364-
char* s = reinterpret_cast<char*>(str.data() + 1);
365-
char* eol = reinterpret_cast<char*>(memchr(s, '\n', str.size() - 1));
371+
char* s = reinterpret_cast<char*>(str.data());
372+
char* eol = reinterpret_cast<char*>(memchr(s, '\n', str.size()));
366373

367-
if (c == '+' || c == '-') { // Simple string or error.
374+
if (arg_c_ == '+' || arg_c_ == '-') { // Simple string or error.
368375
DCHECK(!server_mode_);
369376
if (!eol) {
370377
Result r = str.size() < 256 ? INPUT_PENDING : BAD_STRING;
@@ -374,9 +381,9 @@ auto RedisParser::ParseArg(Buffer str) -> ResultConsumed {
374381
if (eol[-1] != '\r')
375382
return {BAD_STRING, 0};
376383

377-
cached_expr_->emplace_back(c == '+' ? RespExpr::STRING : RespExpr::ERROR);
384+
cached_expr_->emplace_back(arg_c_ == '+' ? RespExpr::STRING : RespExpr::ERROR);
378385
cached_expr_->back().u = Buffer{reinterpret_cast<uint8_t*>(s), size_t((eol - 1) - s)};
379-
} else if (c == ':') {
386+
} else if (arg_c_ == ':') {
380387
DCHECK(!server_mode_);
381388
if (!eol) {
382389
Result r = str.size() < 32 ? INPUT_PENDING : BAD_INT;
@@ -390,7 +397,7 @@ auto RedisParser::ParseArg(Buffer str) -> ResultConsumed {
390397

391398
cached_expr_->emplace_back(RespExpr::INT64);
392399
cached_expr_->back().u = ival;
393-
} else if (c == ',') {
400+
} else if (arg_c_ == ',') {
394401
DCHECK(!server_mode_);
395402
if (!eol) {
396403
Result r = str.size() < 32 ? INPUT_PENDING : BAD_DOUBLE;
@@ -410,7 +417,7 @@ auto RedisParser::ParseArg(Buffer str) -> ResultConsumed {
410417

411418
HandleFinishArg();
412419

413-
return {OK, (eol - s) + 2};
420+
return {OK, (eol - s) + 1};
414421
}
415422

416423
auto RedisParser::ConsumeBulk(Buffer str) -> ResultConsumed {
@@ -466,10 +473,10 @@ auto RedisParser::ConsumeBulk(Buffer str) -> ResultConsumed {
466473
}
467474

468475
void RedisParser::HandleFinishArg() {
469-
state_ = PARSE_ARG_S;
470476
DCHECK(!parse_stack_.empty());
471477
DCHECK_GT(parse_stack_.back().first, 0u);
472478

479+
state_ = PARSE_ARG_TYPE;
473480
while (true) {
474481
--parse_stack_.back().first;
475482
if (parse_stack_.back().first != 0)

src/facade/redis_parser.h

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,8 @@ class RedisParser {
7272
private:
7373
using ResultConsumed = std::pair<Result, uint32_t>;
7474

75-
void InitStart(char prefix_b, RespVec* res);
75+
// Returns true if this is a RESP message, false if INLINE.
76+
bool InitStart(char prefix_b, RespVec* res);
7677
void StashState(RespVec* res);
7778

7879
// Skips the first character (*).
@@ -89,15 +90,16 @@ class RedisParser {
8990
INLINE_S,
9091
ARRAY_LEN_S,
9192
MAP_LEN_S,
92-
PARSE_ARG_S, // Parse [$:+-]string\r\n
93+
PARSE_ARG_TYPE, // Parse [$:+-]
94+
PARSE_ARG_S, // Parse string\r\n
9395
BULK_STR_S,
9496
CMD_COMPLETE_S,
9597
};
9698

9799
State state_ = CMD_COMPLETE_S;
98100
bool is_broken_token_ = false; // true, if a token (inline or bulk) is broken during the parsing.
99101
bool server_mode_ = true;
100-
102+
char arg_c_ = 0;
101103
uint32_t bulk_len_ = 0;
102104
uint32_t last_stashed_level_ = 0, last_stashed_index_ = 0;
103105
uint32_t max_arr_len_;

0 commit comments

Comments
 (0)