Skip to content

Commit 5407315

Browse files
rochdevtlhunter
authored andcommitted
add ffi crate
1 parent ed37186 commit 5407315

File tree

9 files changed

+137
-30
lines changed

9 files changed

+137
-30
lines changed

benchmark/sirun/plugin-koa/internal-tracer/encoder.js

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,8 @@ const Chunk = require('../../../../packages/dd-trace/src/encode/chunk')
44
const { storage } = require('../../../../packages/datadog-core')
55
const { Client } = require('./client')
66
const { zeroId } = require('./id')
7+
const { now } = require('./now')
78

8-
const processStartTime = BigInt(Date.now() * 1e6)
9-
const processStartTicks = process.hrtime.bigint()
10-
const now = () => Number(processStartTime + process.hrtime.bigint() - processStartTicks)
119
// const service = process.env.DD_SERVICE || 'unnamed-node-app'
1210
const ARRAY_OF_TWO = 0x92
1311
const SOFT_LIMIT = 8 * 1024 * 1024 // 8MB
@@ -171,10 +169,31 @@ class Encoder {
171169
if (count === 0) return
172170

173171
const data = this.makePayload()
174-
const path = `/v0.1/events`
175172

176173
this._timer = clearTimeout(this._timer)
177-
this._client.request({ data, path, count }, done)
174+
175+
if (process.env.WITH_NATIVE_COLLECTOR) {
176+
this.flushFfi(data, done)
177+
} else {
178+
const path = `/v0.1/events`
179+
this._client.request({ data, path, count }, done)
180+
}
181+
}
182+
183+
// TODO: Use node:ffi when it lands.
184+
// https://github.com/nodejs/node/pull/46905
185+
flushFfi (data, done) {
186+
const path = require('path')
187+
const { getNativeFunction, getBufferPointer } = require('sbffi')
188+
const libPath = path.normalize(
189+
path.join(__dirname, '../../../../collector/target/release/libffi.dylib')
190+
)
191+
const submit = getNativeFunction(libPath, 'submit', 'uint32_t', ['uint32_t', 'uint8_t *'])
192+
const ptr = getBufferPointer(data)
193+
194+
submit(data.length, ptr)
195+
196+
done()
178197
}
179198

180199
reset () {
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
'use strict'
2+
3+
const processStartTime = BigInt(Date.now() * 1e6)
4+
const processStartTicks = process.hrtime.bigint()
5+
6+
function now () {
7+
Number(processStartTime + process.hrtime.bigint() - processStartTicks)
8+
}
9+
10+
module.exports = { now }

collector/Cargo.lock

Lines changed: 18 additions & 22 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

collector/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
[workspace]
22
members = [
33
"common",
4+
"ffi",
45
"hyper-client",
56
"server"
67
]

collector/ffi/Cargo.toml

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
[package]
2+
name = "ffi"
3+
version = "0.1.0"
4+
edition = "2021"
5+
6+
[lib]
7+
crate_type = ["cdylib"]
8+
9+
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
10+
11+
[dependencies]
12+
common = { path = "../common" }
13+
hyper-client = { path = "../hyper-client" }
14+
hyper = "0.14.24"
15+
tokio = "1.25.0"
16+
libc = "0.2.139"

collector/ffi/src/lib.rs

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
use common::exporting::agent::AgentExporter;
2+
use common::processing::Processor;
3+
use hyper::body::Buf;
4+
use hyper_client::HyperClient;
5+
use tokio::sync::mpsc::{self, Receiver, Sender};
6+
7+
extern crate libc;
8+
9+
#[no_mangle]
10+
pub extern "C" fn submit(size: usize, ptr: *const u8) -> u32 {
11+
internal_submit(unsafe {
12+
std::slice::from_raw_parts(ptr as *const u8, size as usize)
13+
}) as u32
14+
}
15+
16+
#[tokio::main]
17+
async fn internal_submit(payload: &[u8]) -> u32 {
18+
let (tx, mut rx): (Sender<()>, Receiver<()>) = mpsc::channel(1);
19+
20+
let mut client = Box::new(HyperClient::new());
21+
22+
client.on_response(tx);
23+
24+
let exporter = Box::new(AgentExporter::new(client));
25+
let mut processor = Processor::new(exporter);
26+
let mut rd = payload.reader();
27+
28+
processor.process(&mut rd);
29+
processor.flush();
30+
31+
rx.recv().await.unwrap();
32+
33+
0 // TODO: Return proper response buffer instead.
34+
}

collector/hyper-client/src/lib.rs

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,20 @@
11
use common::client::Client;
2+
use tokio::sync::mpsc::Sender;
23

3-
pub struct HyperClient {}
4+
pub struct HyperClient {
5+
tx: Option<Sender<()>>
6+
}
47

58
impl HyperClient {
69
pub fn new() -> Self {
7-
Self {}
10+
Self {
11+
tx: None
12+
}
13+
}
14+
15+
// TODO: Require a sender in `new()` instead.
16+
pub fn on_response (&mut self, tx: Sender<()>) {
17+
self.tx = Some(tx);
818
}
919
}
1020

@@ -29,8 +39,17 @@ impl Client for HyperClient {
2939
.body(hyper::Body::from(data))
3040
.unwrap();
3141

42+
let tx = self.tx.clone();
43+
3244
tokio::spawn(async move {
33-
hyper::Client::new().request(req).await.unwrap();
45+
let res = hyper::Client::new().request(req).await.unwrap();
46+
47+
// Discard the response for now.
48+
hyper::body::to_bytes(res.into_body()).await.unwrap();
49+
50+
if let Some(tx) = tx {
51+
tx.send(()).await.unwrap();
52+
}
3453
});
3554
}
3655
}

package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@
8686
"path-to-regexp": "^0.1.2",
8787
"protobufjs": "^7.1.2",
8888
"retry": "^0.10.1",
89+
"sbffi": "bengl/sbffi",
8990
"semver": "^5.5.0"
9091
},
9192
"devDependencies": {

yarn.lock

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2646,6 +2646,11 @@ node-abort-controller@^3.0.1:
26462646
resolved "https://registry.yarnpkg.com/node-abort-controller/-/node-abort-controller-3.0.1.tgz#f91fa50b1dee3f909afabb7e261b1e1d6b0cb74e"
26472647
integrity sha512-/ujIVxthRs+7q6hsdjHMaj8hRG9NuWmwrz+JdRwZ14jdFoKSkm+vDsCbF9PLpnSqjaWQJuTmVtcWHNLr+vrOFw==
26482648

2649+
node-addon-api@^6.0.0:
2650+
version "6.0.0"
2651+
resolved "https://registry.yarnpkg.com/node-addon-api/-/node-addon-api-6.0.0.tgz#cfb3574e6df708ff71a30db6c4762d9e06e11c27"
2652+
integrity sha512-GyHvgPvUXBvAkXa0YvYnhilSB1A+FRYMpIVggKzPZqdaZfevZOuzfWzyvgzOwRLHBeo/MMswmJFsrNF4Nw1pmA==
2653+
26492654
node-gyp-build@^3.9.0:
26502655
version "3.9.0"
26512656
resolved "https://registry.yarnpkg.com/node-gyp-build/-/node-gyp-build-3.9.0.tgz#53a350187dd4d5276750da21605d1cb681d09e25"
@@ -3190,6 +3195,12 @@ safe-regex-test@^1.0.0:
31903195
resolved "https://registry.yarnpkg.com/safer-buffer/-/safer-buffer-2.1.2.tgz#44fa161b0187b9549dd84bb91802f9bd8385cd6a"
31913196
integrity sha512-YZo3K82SD7Riyi0E1EQPojLz7kpepnSQI9IyPbHHg1XXXevb5dJI7tpyN2ADxGcQbHG7vcyRHk0cbwqcQriUtg==
31923197

3198+
sbffi@bengl/sbffi:
3199+
version "1.0.4"
3200+
resolved "https://codeload.github.com/bengl/sbffi/tar.gz/35a2584cec76060b7a883f0ece5c956e92d01144"
3201+
dependencies:
3202+
node-addon-api "^6.0.0"
3203+
31933204
semver@5.3.0:
31943205
version "5.3.0"
31953206
resolved "https://registry.yarnpkg.com/semver/-/semver-5.3.0.tgz#9b2ce5d3de02d17c6012ad326aa6b4d0cf54f94f"

0 commit comments

Comments
 (0)