Skip to content

Commit cc69eb3

Browse files
authored
refactor(llc): introduce event controller, resolver (#2301)
1 parent 3a2b3e3 commit cc69eb3

File tree

4 files changed

+125
-15
lines changed

4 files changed

+125
-15
lines changed

packages/stream_chat/lib/src/client/client.dart

Lines changed: 9 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import 'package:logging/logging.dart';
55
import 'package:meta/meta.dart';
66
import 'package:rxdart/rxdart.dart';
77
import 'package:stream_chat/src/client/channel.dart';
8+
import 'package:stream_chat/src/client/event_resolvers.dart' as event_resolvers;
89
import 'package:stream_chat/src/client/retry_policy.dart';
910
import 'package:stream_chat/src/core/api/attachment_file_uploader.dart';
1011
import 'package:stream_chat/src/core/api/requests.dart';
@@ -33,6 +34,7 @@ import 'package:stream_chat/src/core/models/poll_option.dart';
3334
import 'package:stream_chat/src/core/models/poll_vote.dart';
3435
import 'package:stream_chat/src/core/models/thread.dart';
3536
import 'package:stream_chat/src/core/models/user.dart';
37+
import 'package:stream_chat/src/core/util/event_controller.dart';
3638
import 'package:stream_chat/src/core/util/utils.dart';
3739
import 'package:stream_chat/src/db/chat_persistence_client.dart';
3840
import 'package:stream_chat/src/event_type.dart';
@@ -222,21 +224,15 @@ class StreamChatClient {
222224

223225
StreamSubscription<ConnectionStatus>? _connectionStatusSubscription;
224226

225-
final _eventController = PublishSubject<Event>();
226-
227227
/// Stream of [Event] coming from [_ws] connection
228228
/// Listen to this or use the [on] method to filter specific event types
229-
Stream<Event> get eventStream => _eventController.stream.map(
230-
// If the poll vote is an answer, we should emit a different event
231-
// to make it easier to handle in the state.
232-
(event) => switch ((event.type, event.pollVote?.isAnswer == true)) {
233-
(EventType.pollVoteCasted || EventType.pollVoteChanged, true) =>
234-
event.copyWith(type: EventType.pollAnswerCasted),
235-
(EventType.pollVoteRemoved, true) =>
236-
event.copyWith(type: EventType.pollAnswerRemoved),
237-
_ => event,
238-
},
239-
);
229+
Stream<Event> get eventStream => _eventController.stream;
230+
late final _eventController = EventController<Event>(
231+
resolvers: [
232+
event_resolvers.pollAnswerCastedResolver,
233+
event_resolvers.pollAnswerRemovedResolver,
234+
],
235+
);
240236

241237
final _wsConnectionStatusController =
242238
BehaviorSubject.seeded(ConnectionStatus.disconnected);
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
import 'package:stream_chat/src/core/models/event.dart';
2+
import 'package:stream_chat/src/event_type.dart';
3+
4+
/// Resolves casted or changed poll vote events into more specific
5+
/// `pollAnswerCasted` events for easier downstream state handling.
6+
///
7+
/// Applies when:
8+
/// - `event.type` is `pollVoteCasted` or `pollVoteChanged`, and
9+
/// - `event.pollVote?.isAnswer == true`
10+
///
11+
/// Returns a modified event with type `pollAnswerCasted`,
12+
/// or `null` if not applicable.
13+
Event? pollAnswerCastedResolver(Event event) {
14+
return switch (event.type) {
15+
EventType.pollVoteCasted ||
16+
EventType.pollVoteChanged when event.pollVote?.isAnswer == true =>
17+
event.copyWith(type: EventType.pollAnswerCasted),
18+
_ => null,
19+
};
20+
}
21+
22+
/// Resolves removed poll vote events into more specific
23+
/// `pollAnswerRemoved` events for easier downstream state handling.
24+
///
25+
/// Applies when:
26+
/// - `event.type` is `pollVoteRemoved`, and
27+
/// - `event.pollVote?.isAnswer == true`
28+
///
29+
/// Returns a modified event with type `pollAnswerRemoved`,
30+
/// or `null` if not applicable.
31+
Event? pollAnswerRemovedResolver(Event event) {
32+
return switch (event.type) {
33+
EventType.pollVoteRemoved when event.pollVote?.isAnswer == true =>
34+
event.copyWith(type: EventType.pollAnswerRemoved),
35+
_ => null,
36+
};
37+
}
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
import 'dart:async';
2+
import 'package:rxdart/rxdart.dart';
3+
import 'package:stream_chat/src/core/models/event.dart';
4+
5+
/// A function that inspects an event and optionally resolves it into a
6+
/// more specific or refined version of the same type.
7+
///
8+
/// If the resolver does not recognize or handle the event,
9+
/// it returns `null`, allowing other resolvers to attempt resolution.
10+
typedef EventResolver<T extends Event> = T? Function(T event);
11+
12+
/// {@template eventController}
13+
/// A reactive event stream controller for [Event]s that supports conditional
14+
/// resolution before emitting events to subscribers.
15+
///
16+
/// When an event is added:
17+
/// - Each resolver is evaluated in order.
18+
/// - The first resolver that returns a non-null result is used to produce
19+
/// the resolved event that gets emitted.
20+
/// - If no resolver returns a result, the original event is emitted unchanged.
21+
///
22+
/// This is useful for normalizing or refining generic events into more
23+
/// specific ones (e.g. rewriting `pollVoteCasted` into `pollAnswerCasted`)
24+
/// before they reach business logic or state layers.
25+
/// {@endtemplate}
26+
class EventController<T extends Event> extends Subject<T> {
27+
/// {@macro eventController}
28+
factory EventController({
29+
bool sync = false,
30+
void Function()? onListen,
31+
void Function()? onCancel,
32+
List<EventResolver<T>> resolvers = const [],
33+
}) {
34+
// ignore: close_sinks
35+
final controller = StreamController<T>.broadcast(
36+
sync: sync,
37+
onListen: onListen,
38+
onCancel: onCancel,
39+
);
40+
41+
return EventController<T>._(
42+
controller,
43+
controller.stream,
44+
resolvers,
45+
);
46+
}
47+
48+
EventController._(
49+
super.controller,
50+
super.stream,
51+
this._resolvers,
52+
);
53+
54+
/// The list of resolvers used to inspect and optionally resolve events
55+
/// before they are emitted.
56+
///
57+
/// Resolvers are evaluated in order. The first to return a non-null result
58+
/// determines the event that will be emitted. If none apply, the original
59+
/// event is emitted as-is.
60+
final List<EventResolver<T>> _resolvers;
61+
62+
/// Adds an [event] to the stream.
63+
///
64+
/// Each [EventResolver] is applied in order until one returns a non-null
65+
/// result. That resolved event is emitted, and no further resolvers are
66+
/// evaluated. If all resolvers return `null`, the original event is emitted.
67+
@override
68+
void add(T event) {
69+
for (final resolver in _resolvers) {
70+
final result = resolver(event);
71+
if (result != null) return super.add(result);
72+
}
73+
74+
// No resolver matched — emit the event as-is.
75+
return super.add(event);
76+
}
77+
}

packages/stream_chat/test/src/mocks.dart

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
import 'package:dio/dio.dart';
22
import 'package:logging/logging.dart';
33
import 'package:mocktail/mocktail.dart';
4-
import 'package:rxdart/rxdart.dart';
54
import 'package:stream_chat/src/client/channel.dart';
65
import 'package:stream_chat/src/client/client.dart';
76
import 'package:stream_chat/src/core/api/attachment_file_uploader.dart';
@@ -18,6 +17,7 @@ import 'package:stream_chat/src/core/http/stream_http_client.dart';
1817
import 'package:stream_chat/src/core/http/token_manager.dart';
1918
import 'package:stream_chat/src/core/models/channel_config.dart';
2019
import 'package:stream_chat/src/core/models/event.dart';
20+
import 'package:stream_chat/src/core/util/event_controller.dart';
2121
import 'package:stream_chat/src/db/chat_persistence_client.dart';
2222
import 'package:stream_chat/src/event_type.dart';
2323
import 'package:stream_chat/src/ws/websocket.dart';
@@ -98,7 +98,7 @@ class MockStreamChatClient extends Mock implements StreamChatClient {
9898

9999
@override
100100
Stream<Event> get eventStream => _eventController.stream;
101-
final _eventController = PublishSubject<Event>();
101+
final _eventController = EventController<Event>();
102102
void addEvent(Event event) => _eventController.add(event);
103103

104104
@override

0 commit comments

Comments
 (0)