Skip to content

Commit 93c2e5f

Browse files
0.0.1 implementation
0 parents  commit 93c2e5f

File tree

13 files changed

+1028
-0
lines changed

13 files changed

+1028
-0
lines changed

.gitignore

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
.DS_Store
2+
/.build
3+
/Packages
4+
xcuserdata/
5+
DerivedData/
6+
.swiftpm/configuration/registries.json
7+
.swiftpm/xcode/package.xcworkspace/contents.xcworkspacedata
8+
.netrc

.swift-format.json

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
{
2+
"fileScopedDeclarationPrivacy" : {
3+
"accessLevel" : "private"
4+
},
5+
"indentConditionalCompilationBlocks" : true,
6+
"indentSwitchCaseLabels" : false,
7+
"indentation" : {
8+
"spaces" : 2
9+
},
10+
"lineBreakAroundMultilineExpressionChainComponents" : false,
11+
"lineBreakBeforeControlFlowKeywords" : false,
12+
"lineBreakBeforeEachArgument" : false,
13+
"lineBreakBeforeEachGenericRequirement" : false,
14+
"lineBreakBetweenDeclarationAttributes" : false,
15+
"lineLength" : 100,
16+
"maximumBlankLines" : 1,
17+
"multiElementCollectionTrailingCommas" : true,
18+
"noAssignmentInExpressions" : {
19+
"allowedFunctions" : [
20+
"XCTAssertNoThrow"
21+
]
22+
},
23+
"prioritizeKeepingFunctionOutputTogether" : false,
24+
"reflowMultilineStringLiterals" : {
25+
"never" : {
26+
27+
}
28+
},
29+
"respectsExistingLineBreaks" : true,
30+
"rules" : {
31+
"AllPublicDeclarationsHaveDocumentation" : false,
32+
"AlwaysUseLiteralForEmptyCollectionInit" : false,
33+
"AlwaysUseLowerCamelCase" : true,
34+
"AmbiguousTrailingClosureOverload" : true,
35+
"AvoidRetroactiveConformances" : true,
36+
"BeginDocumentationCommentWithOneLineSummary" : false,
37+
"DoNotUseSemicolons" : true,
38+
"DontRepeatTypeInStaticProperties" : true,
39+
"FileScopedDeclarationPrivacy" : true,
40+
"FullyIndirectEnum" : true,
41+
"GroupNumericLiterals" : true,
42+
"IdentifiersMustBeASCII" : true,
43+
"NeverForceUnwrap" : false,
44+
"NeverUseForceTry" : false,
45+
"NeverUseImplicitlyUnwrappedOptionals" : false,
46+
"NoAccessLevelOnExtensionDeclaration" : true,
47+
"NoAssignmentInExpressions" : true,
48+
"NoBlockComments" : true,
49+
"NoCasesWithOnlyFallthrough" : true,
50+
"NoEmptyLinesOpeningClosingBraces" : false,
51+
"NoEmptyTrailingClosureParentheses" : true,
52+
"NoLabelsInCasePatterns" : true,
53+
"NoLeadingUnderscores" : false,
54+
"NoParensAroundConditions" : true,
55+
"NoPlaygroundLiterals" : true,
56+
"NoVoidReturnOnFunctionSignature" : true,
57+
"OmitExplicitReturns" : false,
58+
"OneCasePerLine" : true,
59+
"OneVariableDeclarationPerLine" : true,
60+
"OnlyOneTrailingClosureArgument" : true,
61+
"OrderedImports" : true,
62+
"ReplaceForEachWithForLoop" : true,
63+
"ReturnVoidInsteadOfEmptyTuple" : true,
64+
"TypeNamesShouldBeCapitalized" : true,
65+
"UseEarlyExits" : false,
66+
"UseExplicitNilCheckInConditions" : true,
67+
"UseLetInEveryBoundCaseVariable" : true,
68+
"UseShorthandTypeNames" : true,
69+
"UseSingleLinePropertyGetter" : true,
70+
"UseSynthesizedInitializer" : true,
71+
"UseTripleSlashForDocumentationComments" : true,
72+
"UseWhereClausesInForLoops" : false,
73+
"ValidateDocumentationComments" : false
74+
},
75+
"spacesAroundRangeFormationOperators" : false,
76+
"spacesBeforeEndOfLineComments" : 2,
77+
"tabWidth" : 2,
78+
"version" : 1
79+
}

LICENSE

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
MIT License
2+
3+
Copyright (c) 2025 adam zethraeus
4+
5+
Permission is hereby granted, free of charge, to any person obtaining a copy
6+
of this software and associated documentation files (the "Software"), to deal
7+
in the Software without restriction, including without limitation the rights
8+
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9+
copies of the Software, and to permit persons to whom the Software is
10+
furnished to do so, subject to the following conditions:
11+
12+
The above copyright notice and this permission notice shall be included in all
13+
copies or substantial portions of the Software.
14+
15+
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16+
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17+
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18+
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19+
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20+
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21+
SOFTWARE.

Package.resolved

Lines changed: 24 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Package.swift

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
// swift-tools-version: 6.1
2+
3+
import PackageDescription
4+
5+
let package = Package(
6+
name: "AsyncBroadcaster",
7+
platforms: [.iOS(.v18), .macOS(.v15), .tvOS(.v18), .watchOS(.v11), .visionOS(.v2)],
8+
products: [
9+
.library(
10+
name: "AsyncBroadcaster",
11+
targets: ["AsyncBroadcaster"])
12+
],
13+
dependencies: [
14+
.package(
15+
url: "https://github.com/apple/swift-collections",
16+
from: "1.1.4"
17+
),
18+
.package(
19+
url: "https://github.com/apple/swift-async-algorithms",
20+
from: "1.0.3"
21+
),
22+
],
23+
targets: [
24+
.target(
25+
name: "AsyncBroadcaster",
26+
dependencies: [
27+
.product(name: "DequeModule", package: "swift-collections"),
28+
.product(name: "AsyncAlgorithms", package: "swift-async-algorithms"),
29+
]
30+
),
31+
.testTarget(
32+
name: "AsyncBroadcasterTests",
33+
dependencies: ["AsyncBroadcaster"]
34+
),
35+
]
36+
)

README.md

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
# AsyncBroadcaster
2+
3+
Broadcasting/Multicasting for Swift's AsyncSequences
4+
5+
## Features
6+
7+
- **Broadcast**: make an AsyncSequence safe for multiple subscribers.
8+
- **Yield**: synchronously yield values to an AsyncBroadcaster.
9+
- **Replay behavior**: Control how new subscribers receive past values
10+
- `.none`: No replay
11+
- `.latest(n)`: Replay up to n most recent values
12+
- `.unbounded`: Replay all historical values
13+
14+
## Installation
15+
16+
### Swift Package Manager
17+
18+
Add the following to your `Package.swift` file:
19+
20+
```swift
21+
dependencies: [
22+
.package(url: "https://github.com/adam-zethraeus/AsyncBroadcaster.git", from: "0.0.1")
23+
]
24+
```
25+
26+
## Usage
27+
28+
### Basic Example
29+
30+
```swift
31+
32+
let stream = [1,2,3].async.broadcast()
33+
34+
let one = Task {
35+
var result: [Int] = []
36+
for await i in stream {
37+
result.append(i)
38+
}
39+
return result
40+
}
41+
42+
await #expect(one.value == [1,2,3])
43+
```
44+
45+
### Synchronously emitting to a Broadcaster's Continuation
46+
47+
```swift
48+
let channel = AsyncBroadcaster.makeAsyncBroadcaster(of: Int.self, replaying: .latest(3))
49+
50+
let task1 = Task {
51+
var results: [Int] = []
52+
for await value in channel.stream {
53+
results.append(value)
54+
}
55+
return results
56+
}
57+
58+
let task2 = Task {
59+
var results: [Int] = []
60+
for await value in channel.stream {
61+
results.append(value)
62+
}
63+
return results
64+
}
65+
66+
try await Task.sleep(for: .milliseconds(100))
67+
68+
channel.continuation.yield(1)
69+
channel.continuation.yield(2)
70+
channel.continuation.yield(3)
71+
channel.continuation.finish()
72+
let results1 = await task1.value
73+
let results2 = await task2.value
74+
75+
#expect(results1 == [1, 2, 3])
76+
#expect(results2 == [1, 2, 3])
77+
```
78+
79+
## License
80+
81+
MIT
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
public enum AsyncBuffer: Sendable {
2+
case none
3+
case latest(Int)
4+
case unbounded
5+
6+
public func prune<T>(elements: inout [T]) {
7+
switch self {
8+
case .none:
9+
elements.removeAll()
10+
case .latest(let count):
11+
elements = elements.suffix(count)
12+
case .unbounded:
13+
break
14+
}
15+
}
16+
}
17+
18+
extension AsyncStream.Continuation.BufferingPolicy {
19+
init(_ asyncBuffer: AsyncBuffer) {
20+
switch asyncBuffer {
21+
case .none:
22+
self = .bufferingNewest(0)
23+
case .latest(let count):
24+
self = .bufferingNewest(count)
25+
case .unbounded:
26+
self = .unbounded
27+
}
28+
}
29+
}
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
public final class AsyncBroadcaster<Element: Sendable>: AsyncSequence, Sendable {
2+
3+
public typealias Element = Element
4+
5+
public init<S: AsyncSequence>(replay: AsyncBuffer, sequence: sending S)
6+
where S.Element == Element {
7+
let controller = MulticastController<Element>(
8+
sequence.map(MulticastController.Event.publish), replay: replay)
9+
self.controller = controller
10+
self.memory = replay
11+
}
12+
13+
let controller: MulticastController<Element>
14+
let memory: AsyncBuffer
15+
16+
public func makeAsyncIterator() -> Iterator {
17+
let underlying = AsyncStream<Element>
18+
.makeStream(
19+
of: Element.self,
20+
bufferingPolicy: .unbounded
21+
)
22+
controller.handle(.subscribe(underlying.continuation))
23+
return Iterator(underlying: underlying.stream.makeAsyncIterator())
24+
}
25+
26+
public struct Iterator: AsyncIteratorProtocol {
27+
init(underlying: AsyncStream<Element>.Iterator) {
28+
self.underlying = underlying
29+
}
30+
private var underlying: AsyncStream<Element>.Iterator
31+
public mutating func next() async -> Element? {
32+
await underlying.next()
33+
}
34+
@available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, *)
35+
public mutating func next(isolation: isolated (any Actor)?) async throws(Never)
36+
-> Element?
37+
{
38+
await underlying.next(isolation: isolation)
39+
}
40+
}
41+
}
42+
43+
extension AsyncSequence where Self: Sendable, Self.Element: Sendable {
44+
public func broadcast(replay: AsyncBuffer = .none) -> AsyncBroadcaster<Element> {
45+
AsyncBroadcaster(replay: replay, sequence: self)
46+
}
47+
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
extension AsyncBroadcaster {
2+
3+
public struct Continuation: Sendable {
4+
let continuation: AsyncStream<Element>.Continuation
5+
6+
public func yield(_ element: sending Element) {
7+
continuation.yield(element)
8+
}
9+
10+
public consuming func finish() {
11+
continuation.finish()
12+
}
13+
}
14+
15+
public static func makeAsyncBroadcaster(
16+
of: Element.Type = Element.self, replaying: AsyncBuffer = .none
17+
) -> Subject {
18+
let upstream = AsyncStream.makeStream(of: Element.self)
19+
return Subject(
20+
stream: upstream.stream.broadcast(replay: replaying),
21+
continuation: .init(continuation: upstream.continuation))
22+
}
23+
24+
public struct Subject: Sendable {
25+
public let stream: AsyncBroadcaster<Element>
26+
public let continuation: Continuation
27+
}
28+
}

0 commit comments

Comments
 (0)