-
Notifications
You must be signed in to change notification settings - Fork 86
persistent logger - check fd before write in append() (dfs 2624) #9066
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -8,6 +8,7 @@ const P = require('./promise'); | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
const semaphore = require('./semaphore'); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
const { NewlineReader } = require('./file_reader'); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
const dbg = require('./debug_module')(__filename); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
const APPEND_ATTEMPTS_LIMIT = 5; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
/** | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
* PersistentLogger is a logger that is used to record data onto disk separated by newlines. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -105,10 +106,20 @@ class PersistentLogger { | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
* @param {string} data | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
*/ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
async append(data) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
const fh = await this.init(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
const buf = Buffer.from(data + '\n', 'utf8'); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
await fh.write(this.fs_context, buf, buf.length); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
for (let attempt = 0; attempt < APPEND_ATTEMPTS_LIMIT; ++attempt) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
const fh = await this.init(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
//if another process has deleted the active file, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
//this process' _poll_active_file_change might have closed the fd | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
//in that case fd is -1 | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
//in order to avoid inter-process locking, we just re-init | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
//the fd to the new active file. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if (fh.fd === -1) continue; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
await fh.write(this.fs_context, buf, buf.length); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
break; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
this.local_size += buf.length; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Comment on lines
+111
to
124
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion Retry loop still loses data & can throw unhandled
A minimal, self-contained fix: const buf = Buffer.from(data + '\n', 'utf8');
- for (let attempt = 0; attempt < APPEND_ATTEMPTS_LIMIT; ++attempt) {
- const fh = await this.init();
- //if another process has deleted the active file,
- //this process' _poll_active_file_change might have closed the fd
- //in that case fd is -1
- //in order to avoid inter-process locking, we just re-init
- //the fd to the new active file.
- if (fh.fd === -1) continue;
- await fh.write(this.fs_context, buf, buf.length);
- break;
- }
-
- this.local_size += buf.length;
+ let written = false;
+ for (let attempt = 0; attempt < APPEND_ATTEMPTS_LIMIT; ++attempt) {
+ const fh = await this.init();
+
+ if (fh.fd === -1) { // handle already closed
+ await P.delay(1); // avoid busy-loop
+ continue;
+ }
+
+ try {
+ await fh.write(this.fs_context, buf, buf.length);
+ written = true;
+ break;
+ } catch (err) {
+ if (err.code !== 'EBADF') throw err; // real IO error
+ // someone closed the fd after we checked – retry
+ await this.close();
+ }
+ }
+
+ if (!written) {
+ throw new Error(`append failed after ${APPEND_ATTEMPTS_LIMIT} attempts`);
+ }
+
+ this.local_size += buf.length; This guarantees either: 📝 Committable suggestion
Suggested change
🤖 Prompt for AI Agents
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The other process should not be able to delete the file if there is >= 1 process that exists as a writer. The reason is that the delete would happen only after
_process
would finish processing the active log file and in order to finish this thecollect_and_process
must have finished which must acquire anEXCLUSIVE
lock before doing so.