Skip to content

Layer 4 #2

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 15 commits into from
Nov 20, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,5 @@
"zig.path": "zig",
"zig.zls.path": "zls",
"zig.zls.enableBuildOnSave": true,
"zig.zls.enableAutofix": true,
"zig.zls.buildOnSaveStep": "test"
"zig.zls.enableAutofix": true
}
222 changes: 176 additions & 46 deletions shared/layer3.zig
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ pub const Address = enum(u10) {

pub const ChannelError = error{
SendFailed,
// SendTooBig,
RecvFailed,
} || std.mem.Allocator.Error;

Expand All @@ -32,7 +33,9 @@ pub fn ChannelInner(comptime T: type) type {
try self.inner.send(data, to);
}

/// Receive data from an address. Returns `null` if no data is available.
/// Receive data from an address. Returns `null` if no data is available. The returned buffer
/// is not guaranteed to be full remaining contents of the channel, and further data may be
/// available with subsequent calls to `recv`.
pub inline fn recv(self: Self, from: Address) ChannelError!?Owned([]const u8) {
return try self.inner.recv(from);
}
Expand All @@ -43,146 +46,273 @@ pub inline fn Channel(inner: anytype) ChannelInner(@TypeOf(inner)) {
return .{ .inner = inner };
}

/// A mock implementation of the `Channel` interface
pub const MockChannel = struct {
const Self = @This();

allocator: std.mem.Allocator,
buffers: *std.AutoHashMap(Address, std.ArrayList(u8)),
recv_buffer_size: usize,
sendBuffer: *std.AutoHashMap(Address, std.ArrayList(u8)),
recvBuffer: *std.AutoHashMap(Address, std.ArrayList(u8)),
mutex: *std.Thread.Mutex,

from: MockChannelSimplex,
to: MockChannelSimplex,

pub fn init(allocator: std.mem.Allocator, recv_buffer_size: usize, unreliable: bool) !Self {
const sendBuffer = try allocator.create(std.AutoHashMap(Address, std.ArrayList(u8)));
sendBuffer.* = std.AutoHashMap(Address, std.ArrayList(u8)).init(allocator);

pub fn init(allocator: std.mem.Allocator, recv_buffer_size: usize) !Self {
const buffers = try allocator.create(std.AutoHashMap(Address, std.ArrayList(u8)));
buffers.* = std.AutoHashMap(Address, std.ArrayList(u8)).init(allocator);
const recvBuffer = try allocator.create(std.AutoHashMap(Address, std.ArrayList(u8)));
recvBuffer.* = std.AutoHashMap(Address, std.ArrayList(u8)).init(allocator);

const mutex = try allocator.create(std.Thread.Mutex);
mutex.* = .{};

const to = try MockChannelSimplex.init(allocator, recv_buffer_size, unreliable, sendBuffer, recvBuffer, mutex);
const from = try MockChannelSimplex.init(allocator, recv_buffer_size, unreliable, recvBuffer, sendBuffer, mutex);

return Self{
.recv_buffer_size = recv_buffer_size,
.to = to,
.from = from,
.allocator = allocator,
.buffers = buffers,
.sendBuffer = sendBuffer,
.recvBuffer = recvBuffer,
.mutex = mutex,
};
}

pub fn deinit(self: Self) void {
var iter = self.buffers.valueIterator();
var iter = self.sendBuffer.valueIterator();
while (iter.next()) |buffer| {
buffer.deinit();
}

self.buffers.deinit();
self.allocator.destroy(self.buffers);
self.sendBuffer.deinit();
self.allocator.destroy(self.sendBuffer);

iter = self.recvBuffer.valueIterator();
while (iter.next()) |buffer| {
buffer.deinit();
}

self.recvBuffer.deinit();
self.allocator.destroy(self.recvBuffer);

self.to.deinit();
self.from.deinit();
self.allocator.destroy(self.mutex);
}
};

/// A mock implementation of the `Channel` interface
const MockChannelSimplex = struct {
const Self = @This();

allocator: std.mem.Allocator,
sendBuffer: *std.AutoHashMap(Address, std.ArrayList(u8)),
recvBuffer: *std.AutoHashMap(Address, std.ArrayList(u8)),
mutex: *std.Thread.Mutex,
recv_buffer_size: usize,

unreliable: bool,
n: *usize,

pub fn init(
allocator: std.mem.Allocator,
recv_buffer_size: usize,
unreliable: bool,
sendBuffer: *std.AutoHashMap(Address, std.ArrayList(u8)),
recvBuffer: *std.AutoHashMap(Address, std.ArrayList(u8)),
mutex: *std.Thread.Mutex,
) !Self {
const n = try allocator.create(usize);
n.* = 0;

return Self{
.recv_buffer_size = recv_buffer_size,
.allocator = allocator,
.sendBuffer = sendBuffer,
.recvBuffer = recvBuffer,
.unreliable = unreliable,
.mutex = mutex,
.n = n,
};
}

pub fn deinit(self: Self) void {
self.allocator.destroy(self.n);
}

pub fn send(self: Self, data: []const u8, to: Address) ChannelError!void {
if (self.buffers.getPtr(to)) |buffer| {
self.mutex.lock();
defer self.mutex.unlock();

// if (data.len > self.recv_buffer_size) {
// return ChannelError.SendTooBig;
// }
if (self.sendBuffer.getPtr(to)) |buffer| {
try buffer.appendSlice(data);
} else {
var list = std.ArrayList(u8).init(self.allocator);
errdefer list.deinit();
try list.appendSlice(data);
try self.buffers.put(to, list);
var buffer = std.ArrayList(u8).init(self.allocator);
errdefer buffer.deinit();
try buffer.appendSlice(data);
try self.sendBuffer.put(to, buffer);
}

self.messWithData();
}

fn messWithData(self: Self) void {
if (self.unreliable) {
var iter = self.sendBuffer.valueIterator();
while (iter.next()) |buffer| {
if (self.n.* % 3 == 0) {
buffer.items[buffer.items.len / 2] /= 2;
}
}

self.n.* += 1;
}
}

pub fn recv(self: Self, from: Address) ChannelError!?Owned([]const u8) {
const buffer = self.buffers.get(from) orelse return null;
self.mutex.lock();
defer self.mutex.unlock();

const buffer = self.recvBuffer.get(from) orelse return null;

const len = @min(self.recv_buffer_size, buffer.items.len);
const recv_buffer = try self.allocator.dupe(u8, buffer.items[0..len]);
errdefer self.allocator.free(recv_buffer);

if (buffer.items.len < self.recv_buffer_size) {
buffer.deinit();
const removed = self.buffers.remove(from);
const removed = self.recvBuffer.remove(from);
std.debug.assert(removed);
} else {
var next_list = std.ArrayList(u8).init(self.allocator);
try next_list.appendSlice(buffer.items[self.recv_buffer_size..]);
buffer.deinit();
try self.buffers.put(from, next_list);
try self.recvBuffer.put(from, next_list);
}

return toOwned(@as([]const u8, recv_buffer), self.allocator);
}
};

test "basic channel" {
const mock = MockChannel.init(std.testing.allocator, 80) catch unreachable;
defer mock.deinit();
const channel = Channel(mock);
const mocks = try MockChannel.init(std.testing.allocator, 80, false);
defer mocks.deinit();

const toChannel = Channel(mocks.to);
const fromChannel = Channel(mocks.from);

const addrA = Address.from(11);

const data = "hello, world!";
try channel.send(data, addrA);
try toChannel.send(data, addrA);

const recv = (try channel.recv(addrA)).?;
const recv = (try fromChannel.recv(addrA)).?;
defer recv.deinit();

try std.testing.expectEqualSlices(u8, data, recv.inner);
}

test "channel large buffer" {
const recv_buffer_size = 80;
const mock = try MockChannel.init(std.testing.allocator, recv_buffer_size);
defer mock.deinit();
const channel = Channel(mock);
const mocks = try MockChannel.init(std.testing.allocator, recv_buffer_size, false);
defer mocks.deinit();

const toChannel = Channel(mocks.to);
const fromChannel = Channel(mocks.from);

const addrA = Address.from(11);

const data = try std.testing.allocator.alloc(u8, recv_buffer_size + 1);
defer std.testing.allocator.free(data);
try channel.send(data, addrA);
try toChannel.send(data, addrA);

const recv = (try channel.recv(addrA)).?;
const recv = (try fromChannel.recv(addrA)).?;
defer recv.deinit();
try std.testing.expectEqualSlices(u8, data[0..recv_buffer_size], recv.inner);

const recv2 = (try channel.recv(addrA)).?;
const recv2 = (try fromChannel.recv(addrA)).?;
defer recv2.deinit();
try std.testing.expectEqualSlices(u8, data[recv_buffer_size .. recv_buffer_size + 1], recv2.inner);
}

test "channel multiple addresses" {
const mock = MockChannel.init(std.testing.allocator, 80) catch unreachable;
defer mock.deinit();
const channel = Channel(mock);
const mocks = try MockChannel.init(std.testing.allocator, 80, false);
defer mocks.deinit();

const toChannel = Channel(mocks.to);
const fromChannel = Channel(mocks.from);

const addrA = Address.from(11);
const addrB = Address.from(12);

const dataA = "data A";
try channel.send(dataA, addrA);
try toChannel.send(dataA, addrA);

const dataB = "data B";
try channel.send(dataB, addrB);
try toChannel.send(dataB, addrB);

const recvB = (try channel.recv(addrB)).?;
const recvB = (try fromChannel.recv(addrB)).?;
defer recvB.deinit();
try std.testing.expectEqualSlices(u8, dataB, recvB.inner);

const recvA = (try channel.recv(addrA)).?;
const recvA = (try fromChannel.recv(addrA)).?;
defer recvA.deinit();
try std.testing.expectEqualSlices(u8, dataA, recvA.inner);
}

test "channel recv nothing" {
const mock = MockChannel.init(std.testing.allocator, 80) catch unreachable;
defer mock.deinit();
const channel = Channel(mock);
const mocks = try MockChannel.init(std.testing.allocator, 80, false);
defer mocks.deinit();

const fromChannel = Channel(mocks.from);

const addr = Address.from(11);

const recv = try channel.recv(addr);
const recv = try fromChannel.recv(addr);
try std.testing.expectEqual(recv, null);
}

test "channel send nothing" {
const mock = MockChannel.init(std.testing.allocator, 80) catch unreachable;
defer mock.deinit();
const channel = Channel(mock);
const mocks = try MockChannel.init(std.testing.allocator, 80, false);
defer mocks.deinit();

const toChannel = Channel(mocks.to);

const addr = Address.from(11);

const data = ([_]u8{})[0..0];
try std.testing.expectEqual(0, data.len);
try channel.send(data, addr);
try toChannel.send(data, addr);
}

test "unreliable channel" {
const recv_buffer_size = 80;
const mocks = try MockChannel.init(std.testing.allocator, recv_buffer_size, true);
defer mocks.deinit();

const toChannel = Channel(mocks.to);
const fromChannel = Channel(mocks.from);

const addrA = Address.from(11);

const data = try std.testing.allocator.alloc(u8, recv_buffer_size * 10);
defer std.testing.allocator.free(data);
try toChannel.send(data, addrA);

var unequal = false;
for (0..10) |_| {
const recv = (try fromChannel.recv(addrA)).?;
defer recv.deinit();
unequal = !std.mem.eql(u8, data[0..recv_buffer_size], recv.inner);
if (unequal) {
break;
}
}

try std.testing.expect(unequal);
}
Loading