Skip to content

Commit 9cdefc1

Browse files
committed
Redis buffered read
1 parent 0a3eb3b commit 9cdefc1

File tree

1 file changed

+82
-79
lines changed

1 file changed

+82
-79
lines changed

src/AudioTools/Communication/RedisBuffer.h

Lines changed: 82 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -95,14 +95,19 @@ class RedisBuffer : public BaseBuffer<T> {
9595
*/
9696
bool read(T& result) override {
9797
flushWrite(); // flush any pending writes before reading
98-
99-
// Use LPOP to read a single value directly from Redis
100-
String cmd = redisCommand("LPOP", key);
101-
int val = sendCommand(cmd);
102-
if (val == 0) return false;
103-
result = (T)val;
98+
if (read_buf.isEmpty()) {
99+
read_buf.reset();
100+
fillReadBuffer(); // fill local buffer from Redis
101+
if (read_buf.isEmpty()) {
102+
LOGI("RedisBuffer:read: no data available");
103+
return false; // nothing left in Redis
104+
}
105+
}
106+
T val;
107+
bool rc = read_buf.read(val); // read from local buffer
104108
LOGI("Redis LPOP: %d", val);
105-
return true;
109+
result = val;
110+
return rc;
106111
}
107112

108113
/**
@@ -136,9 +141,9 @@ class RedisBuffer : public BaseBuffer<T> {
136141

137142
// Use LINDEX to peek at the first value in Redis without removing it
138143
String cmd = redisCommand("LINDEX", key, "0");
139-
int val = sendCommand(cmd);
140-
if (val == 0) return false;
141-
result = (T)val;
144+
RedisResult resp = sendCommand(cmd);
145+
if (!resp.ok) return false;
146+
result = (T)resp.intValue;
142147
return true;
143148
}
144149

@@ -149,8 +154,8 @@ class RedisBuffer : public BaseBuffer<T> {
149154
void reset() override {
150155
flushWrite();
151156
String cmd = redisCommand("DEL", key);
152-
int rc = sendCommand(cmd);
153-
LOGI("Redis DEL: %d", rc);
157+
auto rc = sendCommand(cmd);
158+
LOGI("Redis DEL: %d", rc.intValue);
154159
read_buf.reset();
155160
write_buf.reset();
156161
}
@@ -163,9 +168,9 @@ class RedisBuffer : public BaseBuffer<T> {
163168
int available() override {
164169
flushWrite();
165170
String cmd = redisCommand("LLEN", key);
166-
int val = sendCommand(cmd);
167-
LOGI("LLEN: %d", val);
168-
return val + read_buf.available();
171+
RedisResult resp = sendCommand(cmd);
172+
LOGI("LLEN: %d (ok=%d)", resp.intValue, resp.ok);
173+
return resp.intValue + read_buf.available();
169174
}
170175

171176
/**
@@ -203,6 +208,15 @@ class RedisBuffer : public BaseBuffer<T> {
203208
}
204209

205210
protected:
211+
struct RedisResult {
212+
int intValue = 0; ///< Integer value parsed from the response (if any)
213+
Vector<String>
214+
strValues; ///< String value parsed from the response (if any)
215+
bool ok = false; ///< True if the response was valid and not an error
216+
operator bool() const { return ok; } ///< Implicit conversion to bool
217+
// std::vector<T> values;
218+
};
219+
206220
Client& client; ///< Reference to the Arduino Client for Redis communication.
207221
const char* key; ///< Redis key for the buffer.
208222
size_t max_size; ///< Maximum number of elements in the buffer.
@@ -243,42 +257,56 @@ class RedisBuffer : public BaseBuffer<T> {
243257
}
244258

245259
/**
246-
* @brief Sends a command to the Redis server and returns the integer
247-
* response.
260+
* @brief Sends a command to the Redis server and returns the parsed result.
248261
* @param cmd Command string in RESP format.
249-
* @return Integer value from the Redis response, or -1 on error.
262+
* @return RedisResult structure with parsed response.
250263
*/
251-
int sendCommand(const String& cmd) {
264+
RedisResult sendCommand(const String& cmd) {
265+
RedisResult result;
252266
if (!client.connected()) {
253267
LOGE("Redis not connected");
254-
return -1;
268+
result.ok = false;
269+
return result;
255270
}
256271
client.print(cmd);
257272
client.flush();
258-
return readResponse();
273+
result = readResponse();
274+
return result;
259275
}
260276

261277
/**
262-
* @brief Reads a single line response from the Redis server and returns it as
263-
* an integer.
264-
* @return Response as int. Returns 0 if no valid integer is found.
278+
* @brief Reads a single line response from the Redis server and parses it
279+
* into a RedisResult.
280+
* @return RedisResult structure with parsed response.
265281
*/
266-
int readResponse() {
267-
uint8_t buffer[128] = {};
282+
RedisResult readResponse() {
283+
RedisResult result;
284+
uint8_t buffer[1024] = {};
268285
int n = 0;
269-
while (n <= 0 ) {
286+
while (n <= 0) {
270287
n = client.read(buffer, sizeof(buffer));
271288
}
272289
buffer[n] = 0;
273290

274-
// Serial.println("----");
275-
// Serial.println((char*)buffer);
276-
// Serial.println("----");
291+
// build vector of strings
292+
result.strValues.clear();
293+
String tail = (char*)buffer;
294+
int nl_pos = tail.indexOf("\r\n");
295+
while (nl_pos >= 0) {
296+
String head = tail.substring(0, nl_pos);
297+
result.strValues.push_back(head);
298+
tail = tail.substring(nl_pos + 2);
299+
tail.trim();
300+
nl_pos = tail.indexOf("\r\n");
301+
}
302+
303+
if (tail.length() > 0) {
304+
result.strValues.push_back(tail);
305+
}
277306

307+
// Get int value
278308
StrView line((char*)buffer, sizeof(buffer), n);
279309

280-
// We get 2 lines for commands like LPOP, so we need to skip the first
281-
// line
282310
if (line.startsWith("$")) {
283311
int end = line.indexOf("\n");
284312
line.substring(line.c_str(), end, line.length());
@@ -289,13 +317,12 @@ class RedisBuffer : public BaseBuffer<T> {
289317
line.replace(":", "");
290318
}
291319

292-
// Serial.println("----");
293-
// Serial.println(line.c_str());
294-
// Serial.println("----");
320+
if (line.isEmpty())
321+
line = -1;
322+
else
323+
result.intValue = line.toInt();
295324

296-
if (line.isEmpty()) return -1;
297-
298-
return line.toInt();
325+
return result;
299326
}
300327

301328
/**
@@ -311,19 +338,19 @@ class RedisBuffer : public BaseBuffer<T> {
311338
cmd += "$" + String(strlen(key)) + "\r\n" + key + "\r\n";
312339
T value;
313340
while (!write_buf.isEmpty()) {
314-
write_buf.read(value); // remove after sending
341+
write_buf.read(value);
315342
String sval = String(value);
316343
cmd += "$" + String(sval.length()) + "\r\n" + sval + "\r\n";
317344
}
318345
write_buf.clear();
319-
int resp = sendCommand(cmd);
320-
LOGI("Redis RPUSH %d entries: %d", write_size, resp);
346+
RedisResult resp = sendCommand(cmd);
347+
LOGI("Redis RPUSH %d entries: %d (ok=%d)", write_size, resp.intValue,
348+
resp.ok);
321349

322-
// Set expiration if needed
323350
if (expire_seconds > 0) {
324351
String expireCmd = redisCommand("EXPIRE", key, String(expire_seconds));
325-
int resp = sendCommand(expireCmd);
326-
LOGI("Redis EXPIRE: %d", resp);
352+
RedisResult resp = sendCommand(expireCmd);
353+
LOGI("Redis EXPIRE: %d (ok=%d)", resp.intValue, resp.ok);
327354
}
328355
}
329356

@@ -332,42 +359,18 @@ class RedisBuffer : public BaseBuffer<T> {
332359
* After reading, removes the items from Redis using LTRIM.
333360
*/
334361
void fillReadBuffer() {
362+
read_buf.reset();
335363
// Read up to local_buf_size items from Redis
336-
String cmd = redisCommand("LRANGE", key, "0", String(local_buf_size - 1));
337-
if (sendCommand(cmd) < 0) return;
338-
// Parse RESP array
339-
int count = 0;
340-
String line;
341-
while (client.connected() && count < local_buf_size) {
342-
line = "";
343-
unsigned long start = millis();
344-
while (client.connected() && (millis() - start < 1000)) {
345-
if (client.available()) {
346-
char c = client.read();
347-
if (c == '\r') continue;
348-
if (c == '\n') break;
349-
line += c;
350-
}
351-
}
352-
if (line.length() == 0) break;
353-
if (line[0] == '$') {
354-
int len = line.substring(1).toInt();
355-
if (len <= 0) break;
356-
String value = "";
357-
while (value.length() < len) {
358-
if (client.available()) value += (char)client.read();
359-
}
360-
client.read(); // \r
361-
client.read(); // \n
362-
read_buf.write((T)value.toInt());
363-
++count;
364-
}
365-
}
366-
// Remove the read items from Redis
367-
if (count > 0) {
368-
String ltrimCmd = redisCommand("LTRIM", key, String(count), "-1");
369-
sendCommand(ltrimCmd);
370-
}
364+
String cmd = redisCommand("LPOP", key, String(read_buf.size()));
365+
auto rc = sendCommand(cmd);
366+
for (auto &str : rc.strValues) {
367+
if (str.startsWith("*")) continue;
368+
if (str.startsWith("$")) continue;
369+
if (str.length()==0) continue;
370+
LOGI("Redis LPOP: %s", str.c_str());
371+
T value = (T)str.toInt();
372+
read_buf.write(value);
373+
}
371374
}
372375
};
373376

0 commit comments

Comments
 (0)