Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 4 additions & 10 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,8 @@ In general you don't need to use this method as it has no advantages over using

```swift
let key = RedisKey(rawValue: "MyKey")
let responses = try await connection.pipeline([
.set(key, "TestString"),
.get(key)
])
let (setResponse, getResponse) = try await connection.pipeline(
SET(key, "TestString"),
GET(key)
)
```

The `RedisConnection.pipeline` command returns an array of `RESPTokens`, one for each command. So we can get the result of the `get` in the above example by converting the second token into a `String`.

```swift
let value = responses[1].converting(to: String.self)
```
58 changes: 35 additions & 23 deletions Sources/Redis/Connection/RedisConnection.swift
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ public struct ServerAddress: Sendable, Equatable {
@_documentation(visibility: internal)
public struct RedisConnection: Sendable {
enum Request {
case command(RESPCommand)
case pipelinedCommands([RESPCommand])
case command(ByteBuffer)
case pipelinedCommands(ByteBuffer, Int)
}
enum Response {
case token(RESPToken)
Expand Down Expand Up @@ -96,7 +96,7 @@ public struct RedisConnection: Sendable {
do {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This shows quite nicely that working with subscriptions at the same time can get weird... we don't want to wait for a write in order to read.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess in this situation a subscription will change the state of the pipeline. And turn into a loop returning the published content. The difficulty is in RESP3 where you can continue to send commands.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure I follow you here. Sure we can add another case for subscriptions. Problem is we might not consume those if we are stuck in writing.

switch request {
case .command(let command):
try await outbound.write(command.buffer)
try await outbound.write(command)
let response = try await inboundIterator.next()
if let response {
continuation.resume(returning: .token(response))
Expand All @@ -109,10 +109,10 @@ public struct RedisConnection: Sendable {
)
)
}
case .pipelinedCommands(let commands):
try await outbound.write(contentsOf: commands.map { $0.buffer })
case .pipelinedCommands(let commands, let count):
try await outbound.write(commands)
var responses: [RESPToken] = .init()
for _ in 0..<commands.count {
for _ in 0..<count {
let response = try await inboundIterator.next()
if let response {
responses.append(response)
Expand Down Expand Up @@ -153,14 +153,11 @@ public struct RedisConnection: Sendable {
}
}

@discardableResult public func send(command: RESPCommand) async throws -> RESPToken {
if logger.logLevel <= .debug {
var buffer = command.buffer
let sending = try [String](from: RESPToken(consuming: &buffer)!).joined(separator: " ")
self.logger.debug("send: \(sending)")
}
@discardableResult public func send<Command: RedisCommand>(command: Command) async throws -> Command.Response {
var encoder = RedisCommandEncoder()
command.encode(into: &encoder)
let response: Response = try await withCheckedThrowingContinuation { continuation in
switch requestContinuation.yield((.command(command), continuation)) {
switch requestContinuation.yield((.command(encoder.buffer), continuation)) {
case .enqueued:
break
case .dropped, .terminated:
Expand All @@ -175,12 +172,21 @@ public struct RedisConnection: Sendable {
}
}
guard case .token(let token) = response else { preconditionFailure("Expected a single response") }
return token
return try .init(from: token)
}

@discardableResult public func pipeline(_ commands: [RESPCommand]) async throws -> [RESPToken] {
@discardableResult public func pipeline<each Command: RedisCommand>(
_ commands: repeat each Command
) async throws -> (repeat (each Command).Response) {
var count = 0
var encoder = RedisCommandEncoder()
for command in repeat each commands {
command.encode(into: &encoder)
count += 1
}

let response: Response = try await withCheckedThrowingContinuation { continuation in
switch requestContinuation.yield((.pipelinedCommands(commands), continuation)) {
switch requestContinuation.yield((.pipelinedCommands(encoder.buffer, count), continuation)) {
case .enqueued:
break
case .dropped, .terminated:
Expand All @@ -195,21 +201,19 @@ public struct RedisConnection: Sendable {
}
}
guard case .pipelinedResponse(let tokens) = response else { preconditionFailure("Expected a single response") }
return tokens
}

@discardableResult public func send<each Arg: RESPRenderable>(_ command: repeat each Arg) async throws -> RESPToken {
let command = RESPCommand(repeat each command)
return try await self.send(command: command)
var index = AutoIncrementingInteger()
return try (repeat (each Command).Response(from: tokens[index.next()]))
}

/// Try to upgrade to RESP3
private func resp3Upgrade(
outbound: NIOAsyncChannelOutboundWriter<ByteBuffer>,
inboundIterator: inout NIOAsyncChannelInboundStream<RESPToken>.AsyncIterator
) async throws {
let helloCommand = RESPCommand("HELLO", "3")
try await outbound.write(helloCommand.buffer)
var encoder = RedisCommandEncoder()
encoder.encodeArray("HELLO", 3)
try await outbound.write(encoder.buffer)
let response = try await inboundIterator.next()
guard let response else {
throw RedisClientError(.connectionClosed, message: "The connection to the Redis database was unexpectedly closed.")
Expand Down Expand Up @@ -317,3 +321,11 @@ extension ClientBootstrap: ClientBootstrapProtocol {}
#if canImport(Network)
extension NIOTSConnectionBootstrap: ClientBootstrapProtocol {}
#endif

private struct AutoIncrementingInteger {
var value: Int = 0
mutating func next() -> Int {
value += 1
return value - 1
}
}
1 change: 1 addition & 0 deletions Sources/Redis/RESP/RESPError.swift
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ public struct RESPParsingError: Error {
public var code: Code
public var buffer: ByteBuffer

@usableFromInline
package init(code: Code, buffer: ByteBuffer) {
self.code = code
self.buffer = buffer
Expand Down
36 changes: 11 additions & 25 deletions Sources/Redis/RESP/RESPRenderable.swift
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,15 @@ import NIOCore

/// Type that can be rendered into a RESP buffer
public protocol RESPRenderable {
func writeToRESPBuffer(_ buffer: inout ByteBuffer) -> Int
func encode(into commandEncoder: inout RedisCommandEncoder) -> Int
}

extension Optional: RESPRenderable where Wrapped: RESPRenderable {
@inlinable
public func writeToRESPBuffer(_ buffer: inout ByteBuffer) -> Int {
public func encode(into commandEncoder: inout RedisCommandEncoder) -> Int {
switch self {
case .some(let wrapped):
return wrapped.writeToRESPBuffer(&buffer)
return wrapped.encode(into: &commandEncoder)
case .none:
return 0
}
Expand All @@ -33,49 +33,35 @@ extension Optional: RESPRenderable where Wrapped: RESPRenderable {

extension Array: RESPRenderable where Element: RESPRenderable {
@inlinable
public func writeToRESPBuffer(_ buffer: inout ByteBuffer) -> Int {
public func encode(into commandEncoder: inout RedisCommandEncoder) -> Int {
var count = 0
for element in self {
count += element.writeToRESPBuffer(&buffer)
count += element.encode(into: &commandEncoder)
}
return count
}
}

extension String: RESPRenderable {
@inlinable
public func writeToRESPBuffer(_ buffer: inout ByteBuffer) -> Int {
buffer.writeBulkString(self)
public func encode(into commandEncoder: inout RedisCommandEncoder) -> Int {
commandEncoder.encodeBulkString(self)
return 1
}
}

extension Int: RESPRenderable {
@inlinable
public func writeToRESPBuffer(_ buffer: inout ByteBuffer) -> Int {
buffer.writeBulkString(String(self))
public func encode(into commandEncoder: inout RedisCommandEncoder) -> Int {
commandEncoder.encodeBulkString(String(self))
return 1
}
}

extension Double: RESPRenderable {
@inlinable
public func writeToRESPBuffer(_ buffer: inout ByteBuffer) -> Int {
buffer.writeBulkString(String(self))
public func encode(into commandEncoder: inout RedisCommandEncoder) -> Int {
commandEncoder.encodeBulkString(String(self))
return 1
}
}

extension ByteBuffer {
public mutating func writeRESP3TypeIdentifier(_ identifier: RESPTypeIdentifier) {
self.writeInteger(identifier.rawValue)
}

public mutating func writeBulkString(_ string: String) {
self.writeRESP3TypeIdentifier(.bulkString)
self.writeString(String(string.utf8.count))
self.writeStaticString("\r\n")
self.writeString(string)
self.writeStaticString("\r\n")
}
}
1 change: 1 addition & 0 deletions Sources/Redis/RESP/RESPToken.swift
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ public struct RESPToken: Hashable, Sendable {
case push(Array)
}

@usableFromInline
package let base: ByteBuffer

public var value: Value {
Expand Down
12 changes: 12 additions & 0 deletions Sources/Redis/RESP/RESPTokenRepresentable.swift
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,12 @@ extension RESPToken: RESPTokenRepresentable {
/// - Parameter type: Type to convert to
/// - Throws: RedisClientError.unexpectedType
/// - Returns: Value
@inlinable
public func converting<Value: RESPTokenRepresentable>(to type: Value.Type = Value.self) throws -> Value {
try Value(from: self)
}

@inlinable
public init(from token: RESPToken) throws {
self = token
}
Expand All @@ -38,12 +40,14 @@ extension Array where Element == RESPToken {
/// - Parameter type: Type to convert to
/// - Throws: RedisClientError.unexpectedType
/// - Returns: Array of Value
@inlinable
public func converting<Value: RESPTokenRepresentable>(to type: [Value].Type = [Value].self) throws -> [Value] {
try self.map { try $0.converting() }
}
}

extension ByteBuffer: RESPTokenRepresentable {
@inlinable
public init(from token: RESPToken) throws {
switch token.value {
case .simpleString(let buffer), .bulkString(let buffer), .verbatimString(let buffer), .bigNumber(let buffer):
Expand All @@ -55,13 +59,15 @@ extension ByteBuffer: RESPTokenRepresentable {
}

extension String: RESPTokenRepresentable {
@inlinable
public init(from token: RESPToken) throws {
let buffer = try ByteBuffer(from: token)
self.init(buffer: buffer)
}
}

extension Int: RESPTokenRepresentable {
@inlinable
public init(from token: RESPToken) throws {
switch token.value {
case .number(let value):
Expand All @@ -73,6 +79,7 @@ extension Int: RESPTokenRepresentable {
}

extension Double: RESPTokenRepresentable {
@inlinable
public init(from token: RESPToken) throws {
switch token.value {
case .double(let value):
Expand All @@ -84,6 +91,7 @@ extension Double: RESPTokenRepresentable {
}

extension Bool: RESPTokenRepresentable {
@inlinable
public init(from token: RESPToken) throws {
switch token.value {
case .boolean(let value):
Expand All @@ -95,6 +103,7 @@ extension Bool: RESPTokenRepresentable {
}

extension Optional: RESPTokenRepresentable where Wrapped: RESPTokenRepresentable {
@inlinable
public init(from token: RESPToken) throws {
switch token.value {
case .null:
Expand All @@ -106,6 +115,7 @@ extension Optional: RESPTokenRepresentable where Wrapped: RESPTokenRepresentable
}

extension Array: RESPTokenRepresentable where Element: RESPTokenRepresentable {
@inlinable
public init(from token: RESPToken) throws {
switch token.value {
case .array(let respArray), .push(let respArray):
Expand All @@ -122,6 +132,7 @@ extension Array: RESPTokenRepresentable where Element: RESPTokenRepresentable {
}

extension Set: RESPTokenRepresentable where Element: RESPTokenRepresentable {
@inlinable
public init(from token: RESPToken) throws {
switch token.value {
case .set(let respSet):
Expand All @@ -138,6 +149,7 @@ extension Set: RESPTokenRepresentable where Element: RESPTokenRepresentable {
}

extension Dictionary: RESPTokenRepresentable where Value: RESPTokenRepresentable, Key: RESPTokenRepresentable {
@inlinable
public init(from token: RESPToken) throws {
switch token.value {
case .map(let respMap), .attribute(let respMap):
Expand Down
22 changes: 22 additions & 0 deletions Sources/Redis/RedisCommand.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the swift-redis open source project
//
// Copyright (c) 2025 Apple Inc. and the swift-redis project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of swift-redis project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

import NIOCore

/// A redis command that can be executed on a connection.
public protocol RedisCommand {
associatedtype Response: RESPTokenRepresentable = RESPToken

func encode(into commandEncoder: inout RedisCommandEncoder)
}
Loading