Skip to content

Commit 2342e67

Browse files
committed
RedisBuffer
1 parent 5662882 commit 2342e67

File tree

6 files changed

+350
-15
lines changed

6 files changed

+350
-15
lines changed
Lines changed: 317 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,317 @@
1+
#pragma once
2+
#include "AudioTools/CoreAudio/Buffers.h"
3+
4+
namespace audio_tools {
5+
6+
/**
7+
* @brief Buffer implementation that stores and retrieves data from a Redis server using Arduino Client.
8+
*
9+
* This buffer uses a Redis list as a circular buffer and batches read/write operations for efficiency.
10+
* Individual write/read calls are buffered locally using SingleBuffer and only sent to Redis in bulk
11+
* when writeArray/readArray is called or when the buffer is full/empty. This reduces network overhead
12+
* and improves performance for streaming scenarios.
13+
*
14+
* - Uses RPUSH for writing and LRANGE/LTRIM for reading from Redis.
15+
* - All Redis commands are constructed using the RESP protocol and sent via the Arduino Client API.
16+
* - The buffer size for local batching can be configured via the constructor.
17+
* - Supports automatic expiration of the Redis key after a specified number of seconds.
18+
*
19+
* @tparam T Data type to buffer (e.g., uint8_t, int16_t)
20+
* @ingroup buffers
21+
*/
22+
template <typename T>
23+
class RedisBuffer : public BaseBuffer<T> {
24+
public:
25+
/**
26+
* @brief Constructs a RedisBuffer.
27+
* @param client Reference to a connected Arduino Client (e.g., WiFiClient, EthernetClient).
28+
* @param key Redis key to use for the buffer (list).
29+
* @param max_size Maximum number of elements in the buffer.
30+
* @param local_buf_size Size of the local buffer for batching (default: 32).
31+
* @param expire_seconds Number of seconds after which the Redis key should expire (0 = no expiration).
32+
*/
33+
RedisBuffer(Client& client, const String& key, size_t max_size, size_t local_buf_size = 32, int expire_seconds = 0)
34+
: client(client), key(key), max_size(max_size), local_buf_size(local_buf_size), expire_seconds(expire_seconds),
35+
write_buf(local_buf_size), read_buf(local_buf_size) {}
36+
37+
/**
38+
* @brief Sets the expiration time (in seconds) for the Redis key.
39+
* The expiration will be refreshed on every write/flush.
40+
* @param seconds Expiration time in seconds (0 = no expiration).
41+
*/
42+
void setExpire(int seconds) { expire_seconds = seconds; }
43+
44+
/**
45+
* @brief Buffers a single value for writing to Redis.
46+
* Data is only sent to Redis when the local buffer is full or writeArray is called.
47+
* @param data Value to write.
48+
* @return true if buffered successfully.
49+
*/
50+
bool write(T data) override {
51+
write_buf.write(data);
52+
if (write_buf.isFull()) {
53+
flushWrite();
54+
}
55+
return true;
56+
}
57+
58+
/**
59+
* @brief Writes multiple values to Redis in one batch.
60+
* Flushes any pending writes before sending the new data.
61+
* @param data Array of values to write.
62+
* @param len Number of values to write.
63+
* @return Number of values written.
64+
*/
65+
int writeArray(const T data[], int len) override {
66+
flushWrite(); // flush any pending writes first
67+
int written = 0;
68+
for (int i = 0; i < len; ++i) {
69+
write_buf.write(data[i]);
70+
if (write_buf.isFull()) {
71+
flushWrite();
72+
}
73+
++written;
74+
}
75+
flushWrite(); // flush remaining
76+
return written;
77+
}
78+
79+
/**
80+
* @brief Reads a single value from the buffer.
81+
* If the local read buffer is empty, fetches a batch from Redis.
82+
* Flushes any pending writes before reading.
83+
* @param result Reference to store the read value.
84+
* @return true if a value was read, false otherwise.
85+
*/
86+
bool read(T& result) override {
87+
if (read_buf.isEmpty()) {
88+
flushWrite(); // flush any pending writes before reading
89+
fillReadBuffer();
90+
}
91+
if (read_buf.isEmpty()) return false;
92+
read_buf.read(result);
93+
return true;
94+
}
95+
96+
/**
97+
* @brief Reads multiple values from the buffer in one batch.
98+
* Flushes any pending writes before reading.
99+
* @param data Array to store the read values.
100+
* @param len Maximum number of values to read.
101+
* @return Number of values actually read.
102+
*/
103+
int readArray(T data[], int len) override {
104+
flushWrite(); // flush any pending writes before reading
105+
int read_count = 0;
106+
while (read_count < len) {
107+
if (read_buf.isEmpty()) {
108+
fillReadBuffer();
109+
if (read_buf.isEmpty()) break; // nothing left in Redis
110+
}
111+
read_buf.read(data[read_count++]);
112+
}
113+
return read_count;
114+
}
115+
116+
/**
117+
* @brief Peeks at the next value in the buffer without removing it.
118+
* If the local read buffer is empty, fetches a batch from Redis.
119+
* Flushes any pending writes before peeking.
120+
* @param result Reference to store the peeked value.
121+
* @return true if a value was available, false otherwise.
122+
*/
123+
bool peek(T& result) override {
124+
if (read_buf.isEmpty()) {
125+
flushWrite();
126+
fillReadBuffer();
127+
}
128+
if (read_buf.isEmpty()) return false;
129+
return read_buf.peek(result);
130+
}
131+
132+
/**
133+
* @brief Clears the buffer both locally and on the Redis server.
134+
* Flushes any pending writes before clearing.
135+
*/
136+
void reset() override {
137+
flushWrite();
138+
String cmd = redisCommand("DEL", key);
139+
sendCommand(cmd);
140+
read_buf.reset();
141+
write_buf.reset();
142+
}
143+
144+
/**
145+
* @brief Returns the number of elements available to read (local + Redis).
146+
* Flushes any pending writes before checking.
147+
* @return Number of available elements.
148+
*/
149+
int available() override {
150+
flushWrite();
151+
String cmd = redisCommand("LLEN", key);
152+
if (!sendCommand(cmd)) return 0;
153+
String resp = readResponse();
154+
return resp.toInt() + read_buf.available();
155+
}
156+
157+
/**
158+
* @brief Returns the number of elements that can be written before reaching max_size.
159+
* @return Number of available slots for writing.
160+
*/
161+
int availableForWrite() override {
162+
return max_size - available();
163+
}
164+
165+
/**
166+
* @brief Returns the address of the start of the physical read buffer (not supported).
167+
* @return nullptr.
168+
*/
169+
T* address() override { return nullptr; }
170+
171+
/**
172+
* @brief Returns the maximum capacity of the buffer.
173+
* @return Maximum number of elements.
174+
*/
175+
size_t size() override { return max_size; }
176+
177+
/**
178+
* @brief Resizes the maximum buffer size.
179+
* @param size New maximum size.
180+
* @return true if resized.
181+
*/
182+
bool resize(int size) override {
183+
max_size = size;
184+
return true;
185+
}
186+
187+
protected:
188+
Client& client; ///< Reference to the Arduino Client for Redis communication.
189+
String key; ///< Redis key for the buffer.
190+
size_t max_size; ///< Maximum number of elements in the buffer.
191+
size_t local_buf_size; ///< Local buffer size for batching.
192+
int expire_seconds = 0; ///< Expiration time in seconds (0 = no expiration).
193+
SingleBuffer<T> write_buf; ///< Local buffer for pending writes.
194+
SingleBuffer<T> read_buf; ///< Local buffer for pending reads.
195+
196+
/**
197+
* @brief Constructs a Redis command in RESP format.
198+
* @param cmd Redis command (e.g., "RPUSH").
199+
* @param arg1 First argument.
200+
* @param arg2 Second argument.
201+
* @param arg3 Third argument.
202+
* @return Command string in RESP format.
203+
*/
204+
String redisCommand(const String& cmd, const String& arg1 = "", const String& arg2 = "", const String& arg3 = "") {
205+
String out = "*" + String(1 + (arg1.length() > 0) + (arg2.length() > 0) + (arg3.length() > 0)) + "\r\n";
206+
out += "$" + String(cmd.length()) + "\r\n" + cmd + "\r\n";
207+
if (arg1.length()) out += "$" + String(arg1.length()) + "\r\n" + arg1 + "\r\n";
208+
if (arg2.length()) out += "$" + String(arg2.length()) + "\r\n" + arg2 + "\r\n";
209+
if (arg3.length()) out += "$" + String(arg3.length()) + "\r\n" + arg3 + "\r\n";
210+
return out;
211+
}
212+
213+
/**
214+
* @brief Sends a command to the Redis server.
215+
* @param cmd Command string in RESP format.
216+
* @return true if sent successfully.
217+
*/
218+
bool sendCommand(const String& cmd) {
219+
client.print(cmd);
220+
client.flush();
221+
return true;
222+
}
223+
224+
/**
225+
* @brief Reads a single line response from the Redis server.
226+
* @return Response string.
227+
*/
228+
String readResponse() {
229+
String line = "";
230+
unsigned long start = millis();
231+
while (client.connected() && (millis() - start < 1000)) {
232+
if (client.available()) {
233+
char c = client.read();
234+
if (c == '\r') continue;
235+
if (c == '\n') break;
236+
line += c;
237+
}
238+
}
239+
// Remove RESP prefix if present
240+
if (line.length() && (line[0] == ':' || line[0] == '$' || line[0] == '+')) {
241+
int idx = 1;
242+
while (idx < line.length() && (line[idx] < '0' || line[idx] > '9')) ++idx;
243+
return line.substring(idx);
244+
}
245+
return line;
246+
}
247+
248+
/**
249+
* @brief Flushes buffered writes to Redis using RPUSH and sets expiration if configured.
250+
*/
251+
void flushWrite() {
252+
if (write_buf.isEmpty()) return;
253+
// Use RPUSH with multiple arguments
254+
String cmd = "*" + String(2 + write_buf.available()) + "\r\n";
255+
cmd += "$5\r\nRPUSH\r\n";
256+
cmd += "$" + String(key.length()) + "\r\n" + key + "\r\n";
257+
T value;
258+
for (int i = 0; i < write_buf.available(); ++i) {
259+
write_buf.peek(value); // always peeks the first
260+
String sval = String(value);
261+
cmd += "$" + String(sval.length()) + "\r\n" + sval + "\r\n";
262+
write_buf.read(value); // remove after sending
263+
}
264+
sendCommand(cmd);
265+
266+
// Set expiration if needed
267+
if (expire_seconds > 0) {
268+
String expireCmd = redisCommand("EXPIRE", key, String(expire_seconds));
269+
sendCommand(expireCmd);
270+
}
271+
}
272+
273+
/**
274+
* @brief Fills the local read buffer from Redis using LRANGE.
275+
* After reading, removes the items from Redis using LTRIM.
276+
*/
277+
void fillReadBuffer() {
278+
// Read up to local_buf_size items from Redis
279+
String cmd = redisCommand("LRANGE", key, "0", String(local_buf_size - 1));
280+
if (!sendCommand(cmd)) return;
281+
// Parse RESP array
282+
int count = 0;
283+
String line;
284+
while (client.connected() && count < local_buf_size) {
285+
line = "";
286+
unsigned long start = millis();
287+
while (client.connected() && (millis() - start < 1000)) {
288+
if (client.available()) {
289+
char c = client.read();
290+
if (c == '\r') continue;
291+
if (c == '\n') break;
292+
line += c;
293+
}
294+
}
295+
if (line.length() == 0) break;
296+
if (line[0] == '$') {
297+
int len = line.substring(1).toInt();
298+
if (len <= 0) break;
299+
String value = "";
300+
while (value.length() < len) {
301+
if (client.available()) value += (char)client.read();
302+
}
303+
client.read(); // \r
304+
client.read(); // \n
305+
read_buf.write((T)value.toInt());
306+
++count;
307+
}
308+
}
309+
// Remove the read items from Redis
310+
if (count > 0) {
311+
String ltrimCmd = redisCommand("LTRIM", key, String(count), "-1");
312+
sendCommand(ltrimCmd);
313+
}
314+
}
315+
};
316+
317+
}

src/AudioTools/Concurrency/RTOS/SynchronizedNBufferRTOS.h

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,10 @@ class SynchronizedNBufferRTOST : public NBuffer<T> {
2424
cleanup();
2525
}
2626

27-
void resize(int bufferSize, int bufferCount) {
27+
bool resize(int bufferSize, int bufferCount) {
2828
TRACED();
2929
if (buffer_size == bufferSize && buffer_count == bufferCount){
30-
return;
30+
return true;
3131
}
3232

3333
max_size = bufferSize * bufferCount;
@@ -48,8 +48,10 @@ class SynchronizedNBufferRTOST : public NBuffer<T> {
4848
available_buffers.enqueue(tmp);
4949
} else {
5050
LOGE("Not Enough Memory for buffer %d", j);
51+
return false;
5152
}
5253
}
54+
return true;
5355
}
5456

5557
void setReadMaxWait(TickType_t ticks){

src/AudioTools/CoreAudio/AudioEffects/PitchShift.h

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,9 @@ class VariableSpeedRingBufferSimple : public BaseBuffer<T> {
4141

4242
void setIncrement(float increment) { read_increment = increment; }
4343

44-
void resize(int size) {
44+
bool resize(int size) {
4545
buffer_size = size;
46-
buffer.resize(size);
46+
return buffer.resize(size);
4747
}
4848

4949
bool read(T &result) {
@@ -117,10 +117,10 @@ class VariableSpeedRingBuffer180 : public BaseBuffer<T> {
117117

118118
void setIncrement(float increment) { pitch_shift = increment; }
119119

120-
void resize(int size) {
120+
bool resize(int size) {
121121
buffer_size = size;
122122
overlap = buffer_size / 10;
123-
buffer.resize(size);
123+
return buffer.resize(size);
124124
}
125125

126126
bool read(T &result) {
@@ -235,11 +235,11 @@ class VariableSpeedRingBuffer : public BaseBuffer<T> {
235235

236236
void setIncrement(float increment) { read_increment = increment; }
237237

238-
void resize(int size) {
238+
bool resize(int size) {
239239
buffer_size = size;
240240
// prevent an overrun at the start
241241
read_pos_float = size / 2;
242-
buffer.resize(size);
242+
return buffer.resize(size);
243243
}
244244

245245
bool read(T &result) {

src/AudioTools/CoreAudio/AudioHttp/URLStreamESP32.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -416,8 +416,10 @@ class URLStreamESP32 : public AbstractURLStream {
416416

417417
/// ICYStream
418418
using ICYStreamESP32 = ICYStreamT<URLStreamESP32>;
419+
#if defined(USE_CONCURRENCY)
419420
using URLStreamBufferedESP32 = URLStreamBufferedT<URLStreamESP32>;
420421
using ICYStreamBufferedESP32 = URLStreamBufferedT<ICYStreamESP32>;
422+
#endif
421423

422424
/// Support URLStream w/o Arduino
423425
#if !defined(ARDUINO)

0 commit comments

Comments
 (0)