Skip to content

Commit 40a93e9

Browse files
committed
chore: stream large lists diff
1 parent c6a186e commit 40a93e9

File tree

3 files changed

+256
-0
lines changed

3 files changed

+256
-0
lines changed

src/models/stream.ts

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
import { LIST_STATUS } from "./list";
2+
3+
export type StreamListsDiff<T extends Record<string, unknown>> = {
4+
currentValue: T | null;
5+
previousValue: T | null;
6+
prevIndex: number | null;
7+
newIndex: number | null;
8+
indexDiff: number | null;
9+
status: LIST_STATUS;
10+
};
11+
12+
export type ReferenceProperty<T extends Record<string, unknown>> = keyof T;
13+
14+
export type StreamReferences<T extends Record<string, unknown>> = Map<
15+
ReferenceProperty<T>,
16+
{ prevIndex: number; nextIndex?: number }
17+
>;
18+
19+
export type ListStreamOptions = {
20+
chunksSize?: number; // 0 by default. If 0, stream will be live
21+
showOnly?: `${LIST_STATUS}`[];
22+
considerMoveAsUpdate?: boolean;
23+
};
24+
25+
export const DEFAULT_LIST_STREAM_OPTIONS: ListStreamOptions = {
26+
chunksSize: 0,
27+
};

src/stream/emitter.ts

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
type Listener<T extends unknown[]> = (...args: T) => void;
2+
3+
export enum StreamEvent {
4+
Data = "data",
5+
Finish = "finish",
6+
Error = "error",
7+
}
8+
export class EventEmitter {
9+
private events: Record<string, Listener<unknown[]>[]> = {};
10+
11+
on<T extends unknown[]>(
12+
event: `${StreamEvent}`,
13+
listener: Listener<T>,
14+
): this {
15+
if (!this.events[event]) {
16+
this.events[event] = [];
17+
}
18+
this.events[event].push(listener as Listener<unknown[]>);
19+
return this;
20+
}
21+
22+
emit<T extends unknown[]>(event: `${StreamEvent}`, ...args: T): void {
23+
if (this.events[event]) {
24+
this.events[event].forEach((listener) => listener(...args));
25+
}
26+
}
27+
}

src/stream/stream-list-diff.ts

Lines changed: 202 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,202 @@
1+
import {
2+
DEFAULT_LIST_STREAM_OPTIONS,
3+
ListStreamOptions,
4+
ReferenceProperty,
5+
StreamListsDiff,
6+
StreamReferences,
7+
} from "../models/stream";
8+
import { LIST_STATUS } from "../models/list";
9+
import { isEqual } from "../utils";
10+
import { EventEmitter, StreamEvent } from "./emitter";
11+
12+
function outputDiffChunk<T extends Record<string, unknown>>(
13+
emitter: EventEmitter,
14+
) {
15+
let chunks: StreamListsDiff<T>[] = [];
16+
17+
return function handleDiffChunk(
18+
chunk: StreamListsDiff<T>,
19+
options: ListStreamOptions,
20+
): void {
21+
const showChunk = options?.showOnly
22+
? options?.showOnly.includes(chunk.status)
23+
: true;
24+
if (!showChunk) {
25+
return;
26+
}
27+
if ((options.chunksSize as number) > 0) {
28+
chunks.push(chunk);
29+
if (chunks.length >= (options.chunksSize as number)) {
30+
const output = chunks;
31+
chunks = [];
32+
return emitter.emit(StreamEvent.Data, output);
33+
}
34+
}
35+
return emitter.emit(StreamEvent.Data, [chunk]);
36+
};
37+
}
38+
39+
function formatSingleListStreamDiff<T extends Record<string, unknown>>(
40+
list: T[],
41+
isPrevious: boolean,
42+
status: LIST_STATUS,
43+
options: ListStreamOptions,
44+
): StreamListsDiff<T>[] {
45+
const diff: StreamListsDiff<T>[] = list.map((data, i) => ({
46+
previousValue: isPrevious ? data : null,
47+
currentValue: isPrevious ? null : data,
48+
prevIndex: status === LIST_STATUS.ADDED ? null : i,
49+
newIndex: status === LIST_STATUS.ADDED ? i : null,
50+
indexDiff: null,
51+
status,
52+
}));
53+
if (options.showOnly && options.showOnly.length > 0) {
54+
return diff.filter((value) => options.showOnly?.includes(value.status));
55+
}
56+
return diff;
57+
}
58+
59+
function getDiffChunks<T extends Record<string, unknown>>(
60+
prevList: T[],
61+
nextList: T[],
62+
referenceProperty: ReferenceProperty<T>,
63+
emitter: EventEmitter,
64+
options: ListStreamOptions = DEFAULT_LIST_STREAM_OPTIONS,
65+
) {
66+
if (!prevList && !nextList) {
67+
return [];
68+
}
69+
if (!prevList) {
70+
const nextDiff = formatSingleListStreamDiff(
71+
nextList as T[],
72+
false,
73+
LIST_STATUS.ADDED,
74+
options,
75+
);
76+
return nextDiff.forEach((data) => handleDiffChunk(data, options));
77+
}
78+
if (!nextList) {
79+
const prevDiff = formatSingleListStreamDiff(
80+
prevList as T[],
81+
true,
82+
LIST_STATUS.DELETED,
83+
options,
84+
);
85+
return prevDiff.forEach((data) => handleDiffChunk(data, options));
86+
}
87+
const listsReferences: StreamReferences<T> = new Map();
88+
const handleDiffChunk = outputDiffChunk<T>(emitter);
89+
prevList.forEach((data, i) => {
90+
if (data) {
91+
listsReferences.set(String(data[referenceProperty]), {
92+
prevIndex: i,
93+
nextIndex: undefined,
94+
});
95+
}
96+
});
97+
98+
nextList.forEach((data, i) => {
99+
if (data) {
100+
const listReference = listsReferences.get(
101+
String(data[referenceProperty]),
102+
);
103+
if (listReference) {
104+
listReference.nextIndex = i;
105+
} else {
106+
handleDiffChunk(
107+
{
108+
previousValue: null,
109+
currentValue: data,
110+
prevIndex: null,
111+
newIndex: i,
112+
indexDiff: null,
113+
status: LIST_STATUS.ADDED,
114+
},
115+
options,
116+
);
117+
}
118+
}
119+
});
120+
121+
for (const data of listsReferences.values()) {
122+
if (!data.nextIndex) {
123+
handleDiffChunk(
124+
{
125+
previousValue: prevList[data.prevIndex],
126+
currentValue: null,
127+
prevIndex: data.prevIndex,
128+
newIndex: null,
129+
indexDiff: null,
130+
status: LIST_STATUS.DELETED,
131+
},
132+
options,
133+
);
134+
} else {
135+
const prevData = prevList[data.prevIndex];
136+
const nextData = nextList[data.nextIndex];
137+
const isDataEqual = isEqual(prevData, nextData);
138+
const indexDiff = data.prevIndex - data.nextIndex;
139+
if (isDataEqual) {
140+
if (indexDiff === 0) {
141+
handleDiffChunk(
142+
{
143+
previousValue: prevList[data.prevIndex],
144+
currentValue: nextList[data.nextIndex],
145+
prevIndex: null,
146+
newIndex: data.nextIndex,
147+
indexDiff: null,
148+
status: LIST_STATUS.EQUAL,
149+
},
150+
options,
151+
);
152+
} else {
153+
handleDiffChunk(
154+
{
155+
previousValue: prevList[data.prevIndex],
156+
currentValue: nextList[data.nextIndex],
157+
prevIndex: data.prevIndex,
158+
newIndex: data.nextIndex,
159+
indexDiff,
160+
status: options.considerMoveAsUpdate
161+
? LIST_STATUS.UPDATED
162+
: LIST_STATUS.MOVED,
163+
},
164+
options,
165+
);
166+
}
167+
} else {
168+
handleDiffChunk(
169+
{
170+
previousValue: prevList[data.prevIndex],
171+
currentValue: nextList[data.nextIndex],
172+
prevIndex: data.prevIndex,
173+
newIndex: data.nextIndex,
174+
indexDiff,
175+
status: LIST_STATUS.UPDATED,
176+
},
177+
options,
178+
);
179+
}
180+
}
181+
}
182+
emitter.emit(StreamEvent.Finish);
183+
}
184+
185+
export function streamListsDiff<T extends Record<string, unknown>>(
186+
prevList: T[],
187+
nextList: T[],
188+
referenceProperty: ReferenceProperty<T>,
189+
options: ListStreamOptions = DEFAULT_LIST_STREAM_OPTIONS,
190+
) {
191+
const emitter = new EventEmitter();
192+
try {
193+
setTimeout(
194+
() =>
195+
getDiffChunks(prevList, nextList, referenceProperty, emitter, options),
196+
0,
197+
);
198+
return emitter;
199+
} catch (err) {
200+
emitter.emit(StreamEvent.Error, err);
201+
}
202+
}

0 commit comments

Comments
 (0)