Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 82 additions & 0 deletions src/proposer.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
// @ts-check

import { nodeFetch } from './curl';
import { RNode } from './rnode';

const { freeze } = Object;

/**
* @param {{ admin?: string, boot: string, read: string }} api
* @param {ReturnType<import('./rnode').RNode>} rnode
* @param {SchedulerAccess} sched
* @param {number=} period
*
* @typedef { {
* setInterval: typeof setInterval,
* clearInterval: typeof clearInterval,
* } } SchedulerAccess
*/

export function proposer(api, rnode, sched, period = 2 * 1000) {
let proposing = false;
let waiters = 0;
let pid;

return freeze({
startProposing() {
if (!api.admin) return;
const node = rnode.admin(api.admin);
waiters += 1;
if (typeof pid !== 'undefined') {
return;
}
pid = sched.setInterval(() => {
if (!proposing) {
proposing = true;
node
.propose()
.then(() => {
console.log('proposed', { waiters });
proposing = false;
})
.catch((err) => {
console.log('propose failed', { waiters, err: err.message });
proposing = false;
});
}
}, period);
},
stopProposing() {
if (waiters <= 0) {
return;
}
waiters -= 1;
sched.clearInterval(pid);
pid = undefined;
},
});
}

/**
* @param {string[]} args
* @param {{ http: typeof import('http') }} io
* @param {SchedulerAccess} sched
*/
function main(args, { http }, sched) {
const [url] = args.length ? args : ['http://localhost:40404'];
const fetch = nodeFetch({ http });
const rnode = RNode(fetch);
const node = proposer({ admin: url, boot: 'N/A', read: 'N/A' }, rnode, sched);
node.startProposing();
}

if (require.main === module) {
main(
process.argv.slice(2),
{
// eslint-disable-next-line global-require
http: require('http'),
},
{ setInterval, clearInterval },
);
}
84 changes: 33 additions & 51 deletions src/rhopm.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,17 @@
import { nodeFetch } from './curl';
import { RNode } from './rnode';
import { startTerm, listenAtDeployId } from './proxy';
import { proposer } from './proposer';

// @ts-ignore
const { keys, freeze, fromEntries } = Object;

export const rhoDir = 'rho_modules';
export const rhoInfoPath = (src) =>
`${rhoDir}/${src.replace(/\.rho$/, '.json')}`;
/* TODO: handle non-posix paths? */
/** @type {(net: string) => string} */
export const rhoDir = (net) => `rho_modules/${net}`;
/** @type {(src: string, net: string) => string} */
export const rhoInfoPath = (src, net) =>
`${rhoDir(net)}/${src.replace(/\.rho$/, '.json')}`;
export const importPattern = /match\s*\("import",\s*"(?<specifier>[^"]+)",\s*`(?<uri>rho:id:[^`]*)`\)/g;

function log(...args) {
Expand Down Expand Up @@ -215,65 +219,43 @@ function parseEnv(txt) {
}

/**
*
* @param {string} envText
* @param {Record<string, string | undefined>} env
* @param {{ admin?: string, boot: string, read: string }} api
* @param {typeof import('http')} http
* @param {SchedulerAccess} sched
* @param {number} period
* @param {number=} period
*
* @typedef { {
* setInterval: typeof setInterval,
* clearInterval: typeof clearInterval,
* } } SchedulerAccess
* @typedef {import('./proposer').SchedulerAccess} SchedulerAccess
*/
export function shardIO(envText, http, sched, period = 2 * 1000) {
export function shardAccess(env, api, http, sched, period = 2 * 1000) {
const fetch = nodeFetch({ http });
const env = parseEnv(envText);
const api = {
admin: `http://${env.MY_NET_IP}:40405`,
boot: `http://${env.MY_NET_IP}:40403`,
read: `http://${env.MY_NET_IP}:40413`,
};
const rnode = RNode(fetch);

const proposer = rnode.admin(api.admin);
let proposing = false;
let waiters = 0;
let pid;
const rnode = RNode(fetch);
const blockMaker = proposer(api, rnode, sched, period);

return freeze({
env,
...api,
validator: rnode.validator(api.boot),
observer: rnode.observer(api.read),
startProposing() {
waiters += 1;
if (typeof pid !== 'undefined') {
return;
}
pid = sched.setInterval(() => {
if (!proposing) {
proposing = true;
proposer
.propose()
.then(() => {
console.log('proposed', { waiters });
proposing = false;
})
.catch((err) => {
console.log('propose failed', { waiters, err: err.message });
proposing = false;
});
}
}, period);
},
stopProposing() {
if (waiters <= 0) {
return;
}
waiters -= 1;
sched.clearInterval(pid);
pid = undefined;
},
...blockMaker,
});
}

/**
* Local shard I/O
*
* @param {string} envText
* @param {typeof import('http')} http
* @param {SchedulerAccess} sched
* @param {number=} period
*/
export function shardIO(envText, http, sched, period = 2 * 1000) {
const env = parseEnv(envText);
const api = {
admin: `http://${env.MY_NET_IP}:40405`,
boot: `http://${env.MY_NET_IP}:40403`,
read: `http://${env.MY_NET_IP}:40413`,
};
return shardAccess(env, api, http, sched, period);
}