Skip to content

Commit 6ee21a0

Browse files
Kuruyiasebtiz13
authored andcommitted
feat: stop using measures from assets and devices in exporter (#369)
1 parent ffa73b9 commit 6ee21a0

File tree

5 files changed

+170
-61
lines changed

5 files changed

+170
-61
lines changed

lib/modules/measure/MeasureExporter.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -212,7 +212,7 @@ export class MeasureExporter extends AbstractExporter<MeasureExportParams> {
212212
...measureColumns,
213213
];
214214

215-
const stream = this.getExportStream(result, columns);
215+
const stream = this.getExportStream(result, columns, engineId);
216216
await this.sdk.ms.del(this.exportRedisKey(engineId, exportId));
217217

218218
return stream;

lib/modules/shared/services/AbstractExporter.ts

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,11 +34,19 @@ export interface Column {
3434
isIsoDate?: boolean;
3535
}
3636

37+
export type ExportStreamAugmenter = (
38+
result: SearchResult<KHit<KDocumentContentGeneric>>,
39+
columns: Column[],
40+
engineId: string,
41+
) => Promise<void>;
42+
3743
export abstract class AbstractExporter<P extends ExportParams = ExportParams> {
3844
protected config: ExporterOption = {
3945
expireTime: 2 * 60,
4046
};
4147

48+
protected exportStreamAugmenters: ExportStreamAugmenter[] = [];
49+
4250
constructor(
4351
protected plugin: DeviceManagerPlugin,
4452
protected target: InternalCollection,
@@ -142,13 +150,18 @@ export abstract class AbstractExporter<P extends ExportParams = ExportParams> {
142150
async getExportStream(
143151
request: SearchResult<KHit<KDocumentContentGeneric>>,
144152
columns: Column[],
153+
engineId: string,
145154
) {
146155
const stream = new PassThrough();
147156

148157
let result = request;
149158
try {
150159
stream.write(stringify([columns.map((column) => column.header)]));
151160
while (result) {
161+
for (const augmenter of this.exportStreamAugmenters) {
162+
await augmenter(result, columns, engineId);
163+
}
164+
152165
for (const hit of result.hits) {
153166
stream.write(stringify([this.formatHit(columns, hit)]));
154167
}

lib/modules/shared/services/DigitalTwinExporter.ts

Lines changed: 80 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,44 @@
1-
import { JSONObject } from "kuzzle";
1+
import { JSONObject, KHit, SearchResult } from "kuzzle";
22
import { ask } from "kuzzle-plugin-commons";
33
import { UUID } from "node:crypto";
44

5-
import { DigitalTwinContent, flattenObject } from "../";
5+
import {
6+
AskDigitalTwinLastMeasuresGet,
7+
DigitalTwinContent,
8+
DigitalTwinMeasures,
9+
EmbeddedMeasure,
10+
flattenObject,
11+
} from "../";
612
import { NamedMeasures } from "../../decoder";
13+
import { MeasureContent, MeasureOriginDevice } from "../../measure";
714
import {
815
AskModelMeasureGet,
916
AssetModelContent,
1017
DeviceModelContent,
1118
} from "../../model";
12-
import { InternalCollection } from "../../plugin";
13-
import { AbstractExporter, Column } from "./AbstractExporter";
19+
import { DeviceManagerPlugin, InternalCollection } from "../../plugin";
20+
import { AbstractExporter, Column, ExporterOption } from "./AbstractExporter";
1421

1522
interface MeasureColumn extends Column {
1623
isMeasure: boolean;
1724
}
1825

26+
interface DigitalTwinExtraData {
27+
measures: DigitalTwinMeasures;
28+
lastMeasuredAt: number;
29+
}
30+
1931
export class DigitalTwinExporter extends AbstractExporter {
32+
constructor(
33+
protected plugin: DeviceManagerPlugin,
34+
protected target: InternalCollection,
35+
config: Partial<ExporterOption> = {},
36+
) {
37+
super(plugin, target, config);
38+
39+
this.exportStreamAugmenters.push(this.addMeasuresToExportStream.bind(this));
40+
}
41+
2042
protected exportRedisKey(engineId: string, exportId: string) {
2143
return `exports:${engineId}:${this.target}:${exportId}`;
2244
}
@@ -54,7 +76,7 @@ export class DigitalTwinExporter extends AbstractExporter {
5476
},
5577
];
5678

57-
const stream = this.getExportStream(digitalTwins, columns);
79+
const stream = this.getExportStream(digitalTwins, columns, engineId);
5880

5981
await this.sdk.ms.del(this.exportRedisKey(engineId, exportId));
6082

@@ -144,4 +166,57 @@ export class DigitalTwinExporter extends AbstractExporter {
144166

145167
return columns;
146168
}
169+
170+
private async addMeasuresToExportStream(
171+
result: SearchResult<KHit<DigitalTwinContent & DigitalTwinExtraData>>,
172+
_: Column[],
173+
engineId: string,
174+
) {
175+
const type = this.target === InternalCollection.ASSETS ? "asset" : "device";
176+
177+
for (const hit of result.hits) {
178+
let lastMeasures: MeasureContent[];
179+
180+
try {
181+
lastMeasures = await ask<AskDigitalTwinLastMeasuresGet>(
182+
`ask:device-manager:${type}:get-last-measures`,
183+
{
184+
digitalTwinId: hit._id,
185+
engineId,
186+
},
187+
);
188+
} catch (e) {
189+
continue;
190+
}
191+
192+
hit._source.measures = lastMeasures.reduce((accumulator, measure) => {
193+
const measureName =
194+
type === "asset"
195+
? measure.asset.measureName
196+
: (measure.origin as MeasureOriginDevice).measureName;
197+
198+
const embeddedMeasure: EmbeddedMeasure = {
199+
measuredAt: measure.measuredAt,
200+
name: measureName,
201+
originId: measure.origin._id,
202+
payloadUuids: measure.origin.payloadUuids,
203+
type: measure.type,
204+
values: measure.values,
205+
};
206+
207+
return {
208+
...accumulator,
209+
[measureName]: embeddedMeasure,
210+
};
211+
}, {});
212+
213+
const lastMeasuredAt = Math.max(
214+
...lastMeasures.map((measure) => measure.measuredAt),
215+
);
216+
217+
if (Number.isFinite(lastMeasuredAt)) {
218+
hit._source.lastMeasuredAt = lastMeasuredAt;
219+
}
220+
}
221+
}
147222
}

tests/scenario/modules/assets/action-export.test.ts

Lines changed: 42 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import fixtures from "../../../fixtures/fixtures";
1010
const assetCount = fixtures["engine-ayse"].assets.length / 2;
1111
jest.setTimeout(10000);
1212

13-
function getExportedColums(row) {
13+
function getExportedColums(row: string) {
1414
const parsedRow = csvParse(row)[0];
1515

1616
return {
@@ -20,39 +20,43 @@ function getExportedColums(row) {
2020
co2: parsedRow[3],
2121
humidity: parsedRow[4],
2222
illuminance: parsedRow[5],
23-
position: parsedRow[6],
24-
positionAccuracy: parsedRow[7],
25-
positionAltitude: parsedRow[8],
26-
powerConsumptionWatt: parsedRow[9],
27-
temperature: parsedRow[10],
28-
temperatureExt: parsedRow[11],
29-
temperatureInt: parsedRow[12],
30-
temperatureWeather: parsedRow[13],
31-
lastMeasuredAt: parsedRow[14],
32-
lastMeasuredAtISO: parsedRow[15],
23+
magiculeExt: parsedRow[6],
24+
magiculeInt: parsedRow[7],
25+
position: parsedRow[8],
26+
positionAccuracy: parsedRow[9],
27+
positionAltitude: parsedRow[10],
28+
powerConsumptionWatt: parsedRow[11],
29+
temperature: parsedRow[12],
30+
temperatureExt: parsedRow[13],
31+
temperatureInt: parsedRow[14],
32+
temperatureWeather: parsedRow[15],
33+
lastMeasuredAt: parsedRow[16],
34+
lastMeasuredAtISO: parsedRow[17],
3335
};
3436
}
3537

3638
describe("AssetsController:exportMeasures", () => {
3739
const sdk = setupHooks();
3840

3941
it("should prepare export of different assets types and return a CSV as stream", async () => {
42+
const measureDate = Date.now();
43+
4044
await sendDummyTempPositionPayloads(sdk, [
4145
{
4246
deviceEUI: "warehouse",
4347
temperature: 23.3,
4448
location: { lat: 42.2, lon: 2.42, accuracy: 2100 },
4549
battery: 0.8,
4650
// ? Use date now - 1s to ensure this asset are second in export
47-
measuredAt: Date.now() - 2000,
51+
measuredAt: measureDate - 2000,
4852
},
4953
{
5054
deviceEUI: "linked2",
5155
temperature: 23.3,
5256
location: { lat: 42.2, lon: 2.42, accuracy: 2100 },
5357
battery: 0.8,
5458
// ? Use date now to ensure this asset is first in export
55-
measuredAt: Date.now(),
59+
measuredAt: measureDate,
5660
},
5761
]);
5862
await sdk.collection.refresh("engine-ayse", "assets");
@@ -64,9 +68,6 @@ describe("AssetsController:exportMeasures", () => {
6468
controller: "device-manager/assets",
6569
action: "export",
6670
engineId: "engine-ayse",
67-
body: {
68-
sort: { lastMeasuredAt: "desc" },
69-
},
7071
});
7172

7273
expect(typeof result.link).toBe("string");
@@ -85,48 +86,58 @@ describe("AssetsController:exportMeasures", () => {
8586

8687
writeFileSync("./assets.csv", csv.join(""));
8788

88-
expect(csv[0]).toBe(
89+
expect(csv).toHaveLength(assetCount + 1);
90+
91+
const header = csv.shift();
92+
93+
expect(header).toBe(
8994
"Model,Reference,brightness.lumens,co2,humidity,illuminance,magiculeExt,magiculeInt,position,position.accuracy,position.altitude,powerConsumption.watt,temperature,temperatureExt,temperatureInt,temperatureWeather,lastMeasuredAt,lastMeasuredAtISO\n",
9095
);
9196

92-
expect(csv).toHaveLength(assetCount + 1);
97+
const rows = csv
98+
.map(getExportedColums)
99+
.sort((a, b) => b.lastMeasuredAt - a.lastMeasuredAt);
93100

94-
const row1 = getExportedColums(csv[1]);
101+
const row1 = rows[0];
95102

96103
expect(row1.model).toBe("Container");
97-
expect(typeof row1.reference).toBe("string");
104+
expect(row1.reference).toBe("linked2");
105+
expect(row1.position).toBe('{"lat":42.2,"lon":2.42}');
106+
expect(parseFloat(row1.positionAccuracy)).toBe(2100);
107+
expect(parseFloat(row1.temperatureExt)).toBe(23.3);
108+
expect(parseFloat(row1.lastMeasuredAt)).toBe(measureDate);
109+
expect(row1.lastMeasuredAtISO).toBe(new Date(measureDate).toISOString());
110+
98111
expect(typeof parseFloat(row1.brightnessLumens)).toBe("number");
99112
expect(typeof parseFloat(row1.co2)).toBe("number");
100113
expect(typeof parseFloat(row1.humidity)).toBe("number");
101114
expect(typeof parseFloat(row1.illuminance)).toBe("number");
102-
expect(typeof row1.position).toBe("string");
103-
expect(typeof parseFloat(row1.positionAccuracy)).toBe("number");
104115
expect(typeof parseFloat(row1.positionAltitude)).toBe("number");
105116
expect(typeof parseFloat(row1.powerConsumptionWatt)).toBe("number");
106117
expect(typeof parseFloat(row1.temperature)).toBe("number");
107-
expect(typeof parseFloat(row1.temperatureExt)).toBe("number");
108118
expect(typeof parseFloat(row1.temperatureInt)).toBe("number");
109119
expect(typeof parseFloat(row1.temperatureWeather)).toBe("number");
110-
expect(typeof parseFloat(row1.lastMeasuredAt)).toBe("number");
111-
expect(typeof row1.lastMeasuredAtISO).toBe("string");
112120

113-
const row2 = getExportedColums(csv[2]);
121+
const row2 = rows[1];
114122

115123
expect(row2.model).toBe("Warehouse");
116-
expect(typeof row2.reference).toBe("string");
124+
expect(row2.reference).toBe("linked");
125+
expect(row2.position).toBe('{"lat":42.2,"lon":2.42}');
126+
expect(parseFloat(row2.positionAccuracy)).toBe(2100);
127+
expect(parseFloat(row2.lastMeasuredAt)).toBe(measureDate - 2000);
128+
expect(row2.lastMeasuredAtISO).toBe(
129+
new Date(measureDate - 2000).toISOString(),
130+
);
131+
117132
expect(typeof parseFloat(row2.brightnessLumens)).toBe("number");
118133
expect(typeof parseFloat(row2.co2)).toBe("number");
119134
expect(typeof parseFloat(row2.humidity)).toBe("number");
120135
expect(typeof parseFloat(row2.illuminance)).toBe("number");
121-
expect(typeof row2.position).toBe("string");
122-
expect(typeof parseFloat(row2.positionAccuracy)).toBe("number");
123136
expect(typeof parseFloat(row2.positionAltitude)).toBe("number");
124137
expect(typeof parseFloat(row2.powerConsumptionWatt)).toBe("number");
125138
expect(typeof parseFloat(row2.temperature)).toBe("number");
126139
expect(typeof parseFloat(row2.temperatureExt)).toBe("number");
127140
expect(typeof parseFloat(row2.temperatureInt)).toBe("number");
128141
expect(typeof parseFloat(row2.temperatureWeather)).toBe("number");
129-
expect(typeof parseFloat(row2.lastMeasuredAt)).toBe("number");
130-
expect(typeof row2.lastMeasuredAtISO).toBe("string");
131142
});
132143
});

0 commit comments

Comments
 (0)