From 0097b57704282a5c25185cabdb5dccd4a728e9ed Mon Sep 17 00:00:00 2001 From: Mark Bundschuh Date: Fri, 15 Nov 2024 03:08:03 -0500 Subject: [PATCH 01/15] basic --- shared/layer3.zig | 77 +++++++++++-- shared/layer4.zig | 281 ++++++++++++++++++++++++++++++++++++++++++++++ shared/main.zig | 15 ++- 3 files changed, 360 insertions(+), 13 deletions(-) create mode 100644 shared/layer4.zig diff --git a/shared/layer3.zig b/shared/layer3.zig index 2fd3eed..3b566b3 100644 --- a/shared/layer3.zig +++ b/shared/layer3.zig @@ -17,6 +17,7 @@ pub const Address = enum(u10) { pub const ChannelError = error{ SendFailed, + // SendTooBig, RecvFailed, } || std.mem.Allocator.Error; @@ -32,7 +33,9 @@ pub fn ChannelInner(comptime T: type) type { try self.inner.send(data, to); } - /// Receive data from an address. Returns `null` if no data is available. + /// Receive data from an address. Returns `null` if no data is available. The returned buffer + /// is not guaranteed to be full remaining contents of the channel, and further data may be + /// available with subsequent calls to `recv`. pub inline fn recv(self: Self, from: Address) ChannelError!?Owned([]const u8) { return try self.inner.recv(from); } @@ -51,14 +54,22 @@ pub const MockChannel = struct { buffers: *std.AutoHashMap(Address, std.ArrayList(u8)), recv_buffer_size: usize, - pub fn init(allocator: std.mem.Allocator, recv_buffer_size: usize) !Self { + unreliable: bool, + n: *usize, + + pub fn init(allocator: std.mem.Allocator, recv_buffer_size: usize, unreliable: bool) !Self { const buffers = try allocator.create(std.AutoHashMap(Address, std.ArrayList(u8))); buffers.* = std.AutoHashMap(Address, std.ArrayList(u8)).init(allocator); + const n = try allocator.create(usize); + n.* = 0; + return Self{ .recv_buffer_size = recv_buffer_size, .allocator = allocator, .buffers = buffers, + .unreliable = unreliable, + .n = n, }; } @@ -70,16 +81,35 @@ pub const MockChannel = struct { self.buffers.deinit(); self.allocator.destroy(self.buffers); + self.allocator.destroy(self.n); } pub fn send(self: Self, data: []const u8, to: Address) ChannelError!void { + // if (data.len > self.recv_buffer_size) { + // return ChannelError.SendTooBig; + // } if (self.buffers.getPtr(to)) |buffer| { try buffer.appendSlice(data); } else { - var list = std.ArrayList(u8).init(self.allocator); - errdefer list.deinit(); - try list.appendSlice(data); - try self.buffers.put(to, list); + var buffer = std.ArrayList(u8).init(self.allocator); + errdefer buffer.deinit(); + try buffer.appendSlice(data); + try self.buffers.put(to, buffer); + } + + self.messWithData(); + } + + fn messWithData(self: Self) void { + if (self.unreliable) { + var iter = self.buffers.valueIterator(); + while (iter.next()) |buffer| { + if (self.n.* % 2 == 0) { + buffer.items[buffer.items.len / 2] += 7; + } + } + + self.n.* += 1; } } @@ -106,7 +136,7 @@ pub const MockChannel = struct { }; test "basic channel" { - const mock = MockChannel.init(std.testing.allocator, 80) catch unreachable; + const mock = MockChannel.init(std.testing.allocator, 80, false) catch unreachable; defer mock.deinit(); const channel = Channel(mock); @@ -122,7 +152,7 @@ test "basic channel" { test "channel large buffer" { const recv_buffer_size = 80; - const mock = try MockChannel.init(std.testing.allocator, recv_buffer_size); + const mock = try MockChannel.init(std.testing.allocator, recv_buffer_size, false); defer mock.deinit(); const channel = Channel(mock); @@ -142,7 +172,7 @@ test "channel large buffer" { } test "channel multiple addresses" { - const mock = MockChannel.init(std.testing.allocator, 80) catch unreachable; + const mock = MockChannel.init(std.testing.allocator, 80, false) catch unreachable; defer mock.deinit(); const channel = Channel(mock); @@ -165,7 +195,7 @@ test "channel multiple addresses" { } test "channel recv nothing" { - const mock = MockChannel.init(std.testing.allocator, 80) catch unreachable; + const mock = MockChannel.init(std.testing.allocator, 80, false) catch unreachable; defer mock.deinit(); const channel = Channel(mock); @@ -176,7 +206,7 @@ test "channel recv nothing" { } test "channel send nothing" { - const mock = MockChannel.init(std.testing.allocator, 80) catch unreachable; + const mock = MockChannel.init(std.testing.allocator, 80, false) catch unreachable; defer mock.deinit(); const channel = Channel(mock); @@ -186,3 +216,28 @@ test "channel send nothing" { try std.testing.expectEqual(0, data.len); try channel.send(data, addr); } + +test "unreliable channel" { + const recv_buffer_size = 80; + const mock = MockChannel.init(std.testing.allocator, recv_buffer_size, true) catch unreachable; + defer mock.deinit(); + const channel = Channel(mock); + + const addrA = Address.from(11); + + const data = try std.testing.allocator.alloc(u8, recv_buffer_size * 10); + defer std.testing.allocator.free(data); + try channel.send(data, addrA); + + var unequal = false; + for (0..10) |_| { + const recv = (try channel.recv(addrA)).?; + defer recv.deinit(); + unequal = !std.mem.eql(u8, data[0..recv_buffer_size], recv.inner); + if (unequal) { + break; + } + } + + try std.testing.expect(unequal); +} diff --git a/shared/layer4.zig b/shared/layer4.zig new file mode 100644 index 0000000..8531d98 --- /dev/null +++ b/shared/layer4.zig @@ -0,0 +1,281 @@ +const std = @import("std"); + +const layer3 = @import("layer3.zig"); + +const shared = @import("main.zig"); +const Owned = shared.Owned; +const toOwned = shared.toOwned; + +const Flags = packed struct { + is_ack: bool, + _: u7 = 0, +}; + +// Simple packet structure with checksum +pub const Packet = extern struct { + seq_num: u32 align(1), + ack_num: u32 align(1), + flags: Flags align(1), + checksum: u32 align(1), + data_len: u8 align(1), + data: [114]u8 align(1), + + pub fn init() Packet { + return .{ + .seq_num = 0, + .ack_num = 0, + .flags = .{ .is_ack = false }, + .checksum = 0, + .data = [_]u8{0} ** 114, + .data_len = 0, + }; + } + + pub fn calculateChecksum(self: *Packet) void { + self.checksum = 0; + const bytes = std.mem.asBytes(self); + var hasher = std.hash.Crc32.init(); + hasher.update(bytes); + self.checksum = hasher.final(); + } + + pub fn verifyChecksum(self: *const Packet) bool { + var temp_packet = self.*; + temp_packet.checksum = 0; + const bytes = std.mem.asBytes(&temp_packet); + var hasher = std.hash.Crc32.init(); + hasher.update(bytes); + return hasher.final() == self.checksum; + } +}; + +pub fn Connection(comptime ChannelImpl: type) type { + return struct { + const Self = @This(); + + channel: layer3.ChannelInner(ChannelImpl), + address: layer3.Address, + + next_seq_num: u32, + expected_seq_num: u32, + unacked_packets: std.ArrayList(UnackedPacket), + allocator: std.mem.Allocator, + buffer: std.ArrayList(u8), + recvbuffer: std.ArrayList(u8), + + const UnackedPacket = struct { + packet: Packet, + last_sent: i64, + retries: u32, + }; + + pub fn init(allocator: std.mem.Allocator, channel: ChannelImpl, address: layer3.Address) Self { + return .{ + .channel = layer3.Channel(channel), + .address = address, + + .next_seq_num = 0, + .expected_seq_num = 0, + .allocator = allocator, + .unacked_packets = std.ArrayList(UnackedPacket).init(allocator), + .buffer = std.ArrayList(u8).init(allocator), + .recvbuffer = std.ArrayList(u8).init(allocator), + }; + } + + pub fn deinit(self: *Self) void { + self.unacked_packets.deinit(); + self.buffer.deinit(); + self.recvbuffer.deinit(); + } + + pub fn send(self: *Self, data: anytype) !void { + const bytes = std.mem.toBytes(data); + try self.sendBytes(&bytes); + } + + fn sendBytes(self: *Self, data: []const u8) !void { + var packet = Packet.init(); + packet.seq_num = self.next_seq_num; + packet.data_len = @intCast(data.len); + @memcpy(packet.data[0..data.len], data); + + try self.unacked_packets.append(.{ + .packet = packet, + .last_sent = std.time.milliTimestamp(), + .retries = 0, + }); + + self.next_seq_num += 1; + try self.sendPacket(&packet); + } + + fn sendAck(self: *Self, seq_num: u32) !void { + var ack_packet = Packet.init(); + ack_packet.flags.is_ack = true; + ack_packet.ack_num = seq_num; + + try self.sendPacket(&ack_packet); + } + + fn sendPacket(self: *Self, packet: *Packet) !void { + packet.calculateChecksum(); + const bytes = std.mem.asBytes(packet); + try self.channel.send(bytes, self.address); + + std.debug.print("Sending packet: seq={}, ack={}, is_ack={}\n", .{ + packet.seq_num, + packet.ack_num, + packet.flags.is_ack, + }); + } + + pub fn recv(self: *Self, comptime T: type) !?Owned(*T) { + if (try self.channel.recv(self.address)) |data| { + errdefer data.deinit(); + try self.buffer.appendSlice(data.inner); + data.deinit(); + if (self.buffer.items.len >= @sizeOf(Packet)) { + const packet = self.buffer.items[0..@sizeOf(Packet)]; + const packet_pointer: *Packet = @ptrCast(packet.ptr); + var packet_struct = packet_pointer.*; + + var newBuffer = std.ArrayList(u8).init(self.allocator); + try newBuffer.appendSlice(self.buffer.items[@sizeOf(Packet)..]); + self.buffer.deinit(); + self.buffer = newBuffer; + + if (packet_struct.verifyChecksum()) { + if (try self.recvPacket(&packet_struct)) |packet_data| { + try self.recvbuffer.appendSlice(packet_data); + if (self.recvbuffer.items.len >= @sizeOf(T)) { + const t: *T = @ptrCast(self.recvbuffer.items[0..@sizeOf(T)].ptr); + const result = try self.allocator.create(T); + errdefer self.allocator.destroy(result); + result.* = t.*; + + var newRecvBuffer = std.ArrayList(u8).init(self.allocator); + try newRecvBuffer.appendSlice(self.recvbuffer.items[@sizeOf(T)..]); + self.recvbuffer.deinit(); + self.recvbuffer = newRecvBuffer; + + return toOwned(result, self.allocator); + } + } + } + } + } + + return null; + } + + fn recvPacket(self: *Self, packet: *Packet) !?[]const u8 { + if (packet.flags.is_ack) { + const ack_num = packet.ack_num; + var i: usize = 0; + while (i < self.unacked_packets.items.len) : (i += 1) { + if (self.unacked_packets.items[i].packet.seq_num == ack_num) { + _ = self.unacked_packets.orderedRemove(i); + break; + } + } + return null; + } else if (packet.seq_num == self.expected_seq_num) { + try self.sendAck(packet.seq_num); + self.expected_seq_num += 1; + + return packet.data[0..packet.data_len]; + } + + return error.UnexpectedPacket; + } + + pub fn handleRetransmissions(self: *Self) !void { + const current_time = std.time.milliTimestamp(); + const timeout = 1000; + const max_retries = 5; + + var i: usize = 0; + while (i < self.unacked_packets.items.len) : (i += 1) { + var unacked = &self.unacked_packets.items[i]; + if (current_time - unacked.last_sent > timeout) { + if (unacked.retries >= max_retries) { + return error.MaxRetriesExceeded; + } + try self.sendPacket(&unacked.packet); + unacked.last_sent = current_time; + unacked.retries += 1; + } + } + } + }; +} + +const MyObject = extern struct { + hello: u32 align(1), +}; + +test "connection" { + std.debug.print("connection\n", .{}); + const mock = layer3.MockChannel.init(std.testing.allocator, 80, false) catch unreachable; + defer mock.deinit(); + const channel = layer3.Channel(mock); + + const addr = layer3.Address.from(11); + + const obj1 = MyObject{ .hello = 123 }; + var conn1 = Connection(@TypeOf(channel)).init(std.testing.allocator, channel, addr); + defer conn1.deinit(); + try conn1.send(obj1); + + var conn2 = Connection(@TypeOf(channel)).init(std.testing.allocator, channel, addr); + defer conn2.deinit(); + var obj2 = try conn2.recv(MyObject); + while (obj2 == null) : (obj2 = try conn2.recv(MyObject)) { + try conn1.handleRetransmissions(); + } + const cobj2 = obj2.?; + defer cobj2.deinit(); + try std.testing.expectEqualDeep(obj1, cobj2.inner.*); +} + +test "connection over unreliable channel" { + std.debug.print("connection over unreliable channel\n", .{}); + const mock = layer3.MockChannel.init(std.testing.allocator, 80, true) catch unreachable; + defer mock.deinit(); + const channel = layer3.Channel(mock); + + const addr = layer3.Address.from(11); + + const obj1 = MyObject{ .hello = 123 }; + var conn1 = Connection(@TypeOf(channel)).init(std.testing.allocator, channel, addr); + defer conn1.deinit(); + try conn1.send(obj1); + + var conn2 = Connection(@TypeOf(channel)).init(std.testing.allocator, channel, addr); + defer conn2.deinit(); + var obj2 = try conn2.recv(MyObject); + while (obj2 == null) : (obj2 = try conn2.recv(MyObject)) { + try conn1.handleRetransmissions(); + } + const cobj2 = obj2.?; + defer cobj2.deinit(); + try std.testing.expectEqualDeep(obj1, cobj2.inner.*); +} + +test "packet checksum" { + var packet = Packet.init(); + packet.calculateChecksum(); + try std.testing.expect(packet.verifyChecksum()); +} + +test "corrupted packet checksum" { + var packet = Packet.init(); + packet.calculateChecksum(); + packet.checksum += 1; + try std.testing.expect(!packet.verifyChecksum()); +} + +test "sizes" { + try std.testing.expectEqual(128, @sizeOf(Packet)); +} diff --git a/shared/main.zig b/shared/main.zig index 4061e18..4fb84f5 100644 --- a/shared/main.zig +++ b/shared/main.zig @@ -4,9 +4,11 @@ const std = @import("std"); pub const msdk = @import("msdk"); pub const layer3 = @import("layer3.zig"); +pub const layer4 = @import("layer4.zig"); comptime { _ = layer3; + _ = layer4; } /// Used to override Zig's default log function to work on the embedded, `freestanding` platform. Normally, Zig has a hard dependency on posix. @@ -49,12 +51,21 @@ pub fn Owned(comptime T: type) type { allocator: std.mem.Allocator, pub fn deinit(self: Self) void { - self.allocator.free(self.inner); + switch (@typeInfo(T)) { + .Array => self.allocator.free(self.inner), + .Vector => self.allocator.free(self.inner), + .Pointer => |info| switch (info.size) { + .One => self.allocator.destroy(self.inner), + .Many, .C, .Slice => self.allocator.free(self.inner), + }, + .Struct => self.inner.deinit(), + else => unreachable, + } } }; } -/// `inner` must have been allocated with `allocator.create` +/// `inner` must have been allocated with `allocator` pub fn toOwned(inner: anytype, allocator: std.mem.Allocator) Owned(@TypeOf(inner)) { return .{ .inner = inner, From 91f4a18c1fe1295888c5d4b6ad84725ba1c389ca Mon Sep 17 00:00:00 2001 From: jLevere Date: Fri, 15 Nov 2024 16:15:00 -0500 Subject: [PATCH 02/15] fragment send packets --- shared/layer4.zig | 41 +++++++++++++++++++++++++++++++++++++---- 1 file changed, 37 insertions(+), 4 deletions(-) diff --git a/shared/layer4.zig b/shared/layer4.zig index 8531d98..0733794 100644 --- a/shared/layer4.zig +++ b/shared/layer4.zig @@ -13,12 +13,13 @@ const Flags = packed struct { // Simple packet structure with checksum pub const Packet = extern struct { + const data_size = 114; seq_num: u32 align(1), ack_num: u32 align(1), flags: Flags align(1), checksum: u32 align(1), data_len: u8 align(1), - data: [114]u8 align(1), + data: [data_size]u8 align(1), pub fn init() Packet { return .{ @@ -26,7 +27,7 @@ pub const Packet = extern struct { .ack_num = 0, .flags = .{ .is_ack = false }, .checksum = 0, - .data = [_]u8{0} ** 114, + .data = [_]u8{0} ** data_size, .data_len = 0, }; } @@ -91,7 +92,10 @@ pub fn Connection(comptime ChannelImpl: type) type { pub fn send(self: *Self, data: anytype) !void { const bytes = std.mem.toBytes(data); - try self.sendBytes(&bytes); + var iter = std.mem.window(u8, &bytes, Packet.data_size, Packet.data_size); + while (iter.next()) |chunk| { + try self.sendBytes(chunk); + } } fn sendBytes(self: *Self, data: []const u8) !void { @@ -146,7 +150,8 @@ pub fn Connection(comptime ChannelImpl: type) type { self.buffer = newBuffer; if (packet_struct.verifyChecksum()) { - if (try self.recvPacket(&packet_struct)) |packet_data| { + const maybe_packet = self.recvPacket(&packet_struct) catch return null; + if (maybe_packet) |packet_data| { try self.recvbuffer.appendSlice(packet_data); if (self.recvbuffer.items.len >= @sizeOf(T)) { const t: *T = @ptrCast(self.recvbuffer.items[0..@sizeOf(T)].ptr); @@ -215,6 +220,10 @@ const MyObject = extern struct { hello: u32 align(1), }; +const BigObject = extern struct { + hello: [Packet.data_size + 1]u8 align(1), +}; + test "connection" { std.debug.print("connection\n", .{}); const mock = layer3.MockChannel.init(std.testing.allocator, 80, false) catch unreachable; @@ -263,6 +272,30 @@ test "connection over unreliable channel" { try std.testing.expectEqualDeep(obj1, cobj2.inner.*); } +test "big T" { + std.debug.print("connection\n", .{}); + const mock = layer3.MockChannel.init(std.testing.allocator, 80, false) catch unreachable; + defer mock.deinit(); + const channel = layer3.Channel(mock); + + const addr = layer3.Address.from(11); + + const obj1: BigObject = undefined; + var conn1 = Connection(@TypeOf(channel)).init(std.testing.allocator, channel, addr); + defer conn1.deinit(); + try conn1.send(obj1); + + var conn2 = Connection(@TypeOf(channel)).init(std.testing.allocator, channel, addr); + defer conn2.deinit(); + var obj2 = try conn2.recv(BigObject); + while (obj2 == null) : (obj2 = try conn2.recv(BigObject)) { + try conn1.handleRetransmissions(); + } + const cobj2 = obj2.?; + defer cobj2.deinit(); + try std.testing.expectEqualDeep(obj1, cobj2.inner.*); +} + test "packet checksum" { var packet = Packet.init(); packet.calculateChecksum(); From 6bf045c300b26c88b6de3e7cb88999f44f648072 Mon Sep 17 00:00:00 2001 From: jLevere Date: Sun, 17 Nov 2024 19:12:31 -0500 Subject: [PATCH 03/15] added thread safety to layer3 and started testing layer4 threaded --- shared/layer3.zig | 10 ++++++ shared/layer4.zig | 87 ++++++++++++++++++++++++++++++++++------------- 2 files changed, 74 insertions(+), 23 deletions(-) diff --git a/shared/layer3.zig b/shared/layer3.zig index 3b566b3..53b97d5 100644 --- a/shared/layer3.zig +++ b/shared/layer3.zig @@ -53,6 +53,7 @@ pub const MockChannel = struct { allocator: std.mem.Allocator, buffers: *std.AutoHashMap(Address, std.ArrayList(u8)), recv_buffer_size: usize, + mutex: *std.Thread.Mutex, unreliable: bool, n: *usize, @@ -61,6 +62,9 @@ pub const MockChannel = struct { const buffers = try allocator.create(std.AutoHashMap(Address, std.ArrayList(u8))); buffers.* = std.AutoHashMap(Address, std.ArrayList(u8)).init(allocator); + const mutex = try allocator.create(std.Thread.Mutex); + mutex.* = .{}; + const n = try allocator.create(usize); n.* = 0; @@ -70,6 +74,7 @@ pub const MockChannel = struct { .buffers = buffers, .unreliable = unreliable, .n = n, + .mutex = mutex, }; } @@ -82,12 +87,15 @@ pub const MockChannel = struct { self.buffers.deinit(); self.allocator.destroy(self.buffers); self.allocator.destroy(self.n); + self.allocator.destroy(self.mutex); } pub fn send(self: Self, data: []const u8, to: Address) ChannelError!void { // if (data.len > self.recv_buffer_size) { // return ChannelError.SendTooBig; // } + self.mutex.lock(); + defer self.mutex.unlock(); if (self.buffers.getPtr(to)) |buffer| { try buffer.appendSlice(data); } else { @@ -114,6 +122,8 @@ pub const MockChannel = struct { } pub fn recv(self: Self, from: Address) ChannelError!?Owned([]const u8) { + self.mutex.lock(); + defer self.mutex.unlock(); const buffer = self.buffers.get(from) orelse return null; const len = @min(self.recv_buffer_size, buffer.items.len); diff --git a/shared/layer4.zig b/shared/layer4.zig index 0733794..a98432b 100644 --- a/shared/layer4.zig +++ b/shared/layer4.zig @@ -96,6 +96,8 @@ pub fn Connection(comptime ChannelImpl: type) type { while (iter.next()) |chunk| { try self.sendBytes(chunk); } + + try self.handleRetransmissions(); } fn sendBytes(self: *Self, data: []const u8) !void { @@ -134,8 +136,8 @@ pub fn Connection(comptime ChannelImpl: type) type { }); } - pub fn recv(self: *Self, comptime T: type) !?Owned(*T) { - if (try self.channel.recv(self.address)) |data| { + pub fn recv(self: *Self, comptime T: type) !Owned(*T) { + while (try self.channel.recv(self.address)) |data| { errdefer data.deinit(); try self.buffer.appendSlice(data.inner); data.deinit(); @@ -144,15 +146,18 @@ pub fn Connection(comptime ChannelImpl: type) type { const packet_pointer: *Packet = @ptrCast(packet.ptr); var packet_struct = packet_pointer.*; + std.debug.print("recv -> {any}\n", .{self.buffer.items.len}); + var newBuffer = std.ArrayList(u8).init(self.allocator); try newBuffer.appendSlice(self.buffer.items[@sizeOf(Packet)..]); self.buffer.deinit(); self.buffer = newBuffer; if (packet_struct.verifyChecksum()) { - const maybe_packet = self.recvPacket(&packet_struct) catch return null; + const maybe_packet = self.recvPacket(&packet_struct) catch continue; if (maybe_packet) |packet_data| { try self.recvbuffer.appendSlice(packet_data); + std.debug.print("recv items len: {}\n", .{self.recvbuffer.items.len}); if (self.recvbuffer.items.len >= @sizeOf(T)) { const t: *T = @ptrCast(self.recvbuffer.items[0..@sizeOf(T)].ptr); const result = try self.allocator.create(T); @@ -163,15 +168,13 @@ pub fn Connection(comptime ChannelImpl: type) type { try newRecvBuffer.appendSlice(self.recvbuffer.items[@sizeOf(T)..]); self.recvbuffer.deinit(); self.recvbuffer = newRecvBuffer; - return toOwned(result, self.allocator); } } } } } - - return null; + return error.RecvFailed; } fn recvPacket(self: *Self, packet: *Packet) !?[]const u8 { @@ -196,20 +199,23 @@ pub fn Connection(comptime ChannelImpl: type) type { } pub fn handleRetransmissions(self: *Self) !void { - const current_time = std.time.milliTimestamp(); const timeout = 1000; const max_retries = 5; - var i: usize = 0; - while (i < self.unacked_packets.items.len) : (i += 1) { - var unacked = &self.unacked_packets.items[i]; - if (current_time - unacked.last_sent > timeout) { - if (unacked.retries >= max_retries) { - return error.MaxRetriesExceeded; + while (self.unacked_packets.items.len > 0) { + const current_time = std.time.milliTimestamp(); + var i: usize = 0; + while (i < self.unacked_packets.items.len) : (i += 1) { + var unacked = &self.unacked_packets.items[i]; + + if (current_time - unacked.last_sent > timeout) { + if (unacked.retries >= max_retries) { + return error.MaxRetriesExceeded; + } + try self.sendPacket(&unacked.packet); + unacked.last_sent = current_time; + unacked.retries += 1; } - try self.sendPacket(&unacked.packet); - unacked.last_sent = current_time; - unacked.retries += 1; } } } @@ -224,6 +230,10 @@ const BigObject = extern struct { hello: [Packet.data_size + 1]u8 align(1), }; +pub fn send_thread_shim(data: anytype) void { + data.conn.send(data.obj) catch @panic("cant send data"); +} + test "connection" { std.debug.print("connection\n", .{}); const mock = layer3.MockChannel.init(std.testing.allocator, 80, false) catch unreachable; @@ -232,6 +242,37 @@ test "connection" { const addr = layer3.Address.from(11); + const obj1 = MyObject{ .hello = 123 }; + var conn1 = Connection(@TypeOf(channel)).init(std.testing.allocator, channel, addr); + defer conn1.deinit(); + + const ThreadData = struct { + conn: Connection(@TypeOf(channel)), + obj: MyObject, + }; + const thread_data = try std.testing.allocator.create(ThreadData); + defer std.testing.allocator.destroy(thread_data); + thread_data.* = .{ .conn = conn1, .obj = obj1 }; + const thread = try std.Thread.spawn(.{}, send_thread_shim, .{thread_data}); + defer thread.join(); + + var conn2 = Connection(@TypeOf(channel)).init(std.testing.allocator, channel, addr); + defer conn2.deinit(); + + const obj2 = try conn2.recv(MyObject); + defer obj2.deinit(); + + try std.testing.expectEqualDeep(obj1, obj2.inner.*); +} + +test "connection over unreliable channel" { + std.debug.print("connection over unreliable channel\n", .{}); + const mock = layer3.MockChannel.init(std.testing.allocator, 80, true) catch unreachable; + defer mock.deinit(); + const channel = layer3.Channel(mock); + + const addr = layer3.Address.from(11); + const obj1 = MyObject{ .hello = 123 }; var conn1 = Connection(@TypeOf(channel)).init(std.testing.allocator, channel, addr); defer conn1.deinit(); @@ -248,23 +289,23 @@ test "connection" { try std.testing.expectEqualDeep(obj1, cobj2.inner.*); } -test "connection over unreliable channel" { - std.debug.print("connection over unreliable channel\n", .{}); - const mock = layer3.MockChannel.init(std.testing.allocator, 80, true) catch unreachable; +test "big T" { + std.debug.print("connection\n", .{}); + const mock = layer3.MockChannel.init(std.testing.allocator, 80, false) catch unreachable; defer mock.deinit(); const channel = layer3.Channel(mock); const addr = layer3.Address.from(11); - const obj1 = MyObject{ .hello = 123 }; + const obj1: BigObject = undefined; var conn1 = Connection(@TypeOf(channel)).init(std.testing.allocator, channel, addr); defer conn1.deinit(); try conn1.send(obj1); var conn2 = Connection(@TypeOf(channel)).init(std.testing.allocator, channel, addr); defer conn2.deinit(); - var obj2 = try conn2.recv(MyObject); - while (obj2 == null) : (obj2 = try conn2.recv(MyObject)) { + var obj2 = try conn2.recv(BigObject); + while (obj2 == null) : (obj2 = try conn2.recv(BigObject)) { try conn1.handleRetransmissions(); } const cobj2 = obj2.?; @@ -272,7 +313,7 @@ test "connection over unreliable channel" { try std.testing.expectEqualDeep(obj1, cobj2.inner.*); } -test "big T" { +test "out of order recv" { std.debug.print("connection\n", .{}); const mock = layer3.MockChannel.init(std.testing.allocator, 80, false) catch unreachable; defer mock.deinit(); From b00c26bbdb921dc8e4f1adc3b6369d190eeedfe3 Mon Sep 17 00:00:00 2001 From: Mark Bundschuh Date: Sun, 17 Nov 2024 23:30:55 -0500 Subject: [PATCH 04/15] rework --- shared/layer3.zig | 177 +++++++++++++------ shared/layer4.zig | 424 +++++++++++++++++++++++----------------------- 2 files changed, 331 insertions(+), 270 deletions(-) diff --git a/shared/layer3.zig b/shared/layer3.zig index 53b97d5..0fcb224 100644 --- a/shared/layer3.zig +++ b/shared/layer3.zig @@ -46,63 +46,116 @@ pub inline fn Channel(inner: anytype) ChannelInner(@TypeOf(inner)) { return .{ .inner = inner }; } -/// A mock implementation of the `Channel` interface pub const MockChannel = struct { const Self = @This(); allocator: std.mem.Allocator, - buffers: *std.AutoHashMap(Address, std.ArrayList(u8)), - recv_buffer_size: usize, + sendBuffer: *std.AutoHashMap(Address, std.ArrayList(u8)), + recvBuffer: *std.AutoHashMap(Address, std.ArrayList(u8)), mutex: *std.Thread.Mutex, - unreliable: bool, - n: *usize, + from: MockChannelSimplex, + to: MockChannelSimplex, pub fn init(allocator: std.mem.Allocator, recv_buffer_size: usize, unreliable: bool) !Self { - const buffers = try allocator.create(std.AutoHashMap(Address, std.ArrayList(u8))); - buffers.* = std.AutoHashMap(Address, std.ArrayList(u8)).init(allocator); + const sendBuffer = try allocator.create(std.AutoHashMap(Address, std.ArrayList(u8))); + sendBuffer.* = std.AutoHashMap(Address, std.ArrayList(u8)).init(allocator); + + const recvBuffer = try allocator.create(std.AutoHashMap(Address, std.ArrayList(u8))); + recvBuffer.* = std.AutoHashMap(Address, std.ArrayList(u8)).init(allocator); const mutex = try allocator.create(std.Thread.Mutex); mutex.* = .{}; - const n = try allocator.create(usize); - n.* = 0; + const to = try MockChannelSimplex.init(allocator, recv_buffer_size, unreliable, sendBuffer, recvBuffer, mutex); + const from = try MockChannelSimplex.init(allocator, recv_buffer_size, unreliable, recvBuffer, sendBuffer, mutex); return Self{ - .recv_buffer_size = recv_buffer_size, + .to = to, + .from = from, .allocator = allocator, - .buffers = buffers, - .unreliable = unreliable, - .n = n, + .sendBuffer = sendBuffer, + .recvBuffer = recvBuffer, .mutex = mutex, }; } pub fn deinit(self: Self) void { - var iter = self.buffers.valueIterator(); + var iter = self.sendBuffer.valueIterator(); while (iter.next()) |buffer| { buffer.deinit(); } - self.buffers.deinit(); - self.allocator.destroy(self.buffers); - self.allocator.destroy(self.n); + self.sendBuffer.deinit(); + self.allocator.destroy(self.sendBuffer); + + iter = self.recvBuffer.valueIterator(); + while (iter.next()) |buffer| { + buffer.deinit(); + } + + self.recvBuffer.deinit(); + self.allocator.destroy(self.recvBuffer); + + self.to.deinit(); + self.from.deinit(); self.allocator.destroy(self.mutex); } +}; + +/// A mock implementation of the `Channel` interface +const MockChannelSimplex = struct { + const Self = @This(); + + allocator: std.mem.Allocator, + sendBuffer: *std.AutoHashMap(Address, std.ArrayList(u8)), + recvBuffer: *std.AutoHashMap(Address, std.ArrayList(u8)), + mutex: *std.Thread.Mutex, + recv_buffer_size: usize, + + unreliable: bool, + n: *usize, + + pub fn init( + allocator: std.mem.Allocator, + recv_buffer_size: usize, + unreliable: bool, + sendBuffer: *std.AutoHashMap(Address, std.ArrayList(u8)), + recvBuffer: *std.AutoHashMap(Address, std.ArrayList(u8)), + mutex: *std.Thread.Mutex, + ) !Self { + const n = try allocator.create(usize); + n.* = 0; + + return Self{ + .recv_buffer_size = recv_buffer_size, + .allocator = allocator, + .sendBuffer = sendBuffer, + .recvBuffer = recvBuffer, + .unreliable = unreliable, + .mutex = mutex, + .n = n, + }; + } + + pub fn deinit(self: Self) void { + self.allocator.destroy(self.n); + } pub fn send(self: Self, data: []const u8, to: Address) ChannelError!void { + self.mutex.lock(); + defer self.mutex.unlock(); + // if (data.len > self.recv_buffer_size) { // return ChannelError.SendTooBig; // } - self.mutex.lock(); - defer self.mutex.unlock(); - if (self.buffers.getPtr(to)) |buffer| { + if (self.sendBuffer.getPtr(to)) |buffer| { try buffer.appendSlice(data); } else { var buffer = std.ArrayList(u8).init(self.allocator); errdefer buffer.deinit(); try buffer.appendSlice(data); - try self.buffers.put(to, buffer); + try self.sendBuffer.put(to, buffer); } self.messWithData(); @@ -110,7 +163,7 @@ pub const MockChannel = struct { fn messWithData(self: Self) void { if (self.unreliable) { - var iter = self.buffers.valueIterator(); + var iter = self.sendBuffer.valueIterator(); while (iter.next()) |buffer| { if (self.n.* % 2 == 0) { buffer.items[buffer.items.len / 2] += 7; @@ -124,7 +177,8 @@ pub const MockChannel = struct { pub fn recv(self: Self, from: Address) ChannelError!?Owned([]const u8) { self.mutex.lock(); defer self.mutex.unlock(); - const buffer = self.buffers.get(from) orelse return null; + + const buffer = self.recvBuffer.get(from) orelse return null; const len = @min(self.recv_buffer_size, buffer.items.len); const recv_buffer = try self.allocator.dupe(u8, buffer.items[0..len]); @@ -132,13 +186,13 @@ pub const MockChannel = struct { if (buffer.items.len < self.recv_buffer_size) { buffer.deinit(); - const removed = self.buffers.remove(from); + const removed = self.recvBuffer.remove(from); std.debug.assert(removed); } else { var next_list = std.ArrayList(u8).init(self.allocator); try next_list.appendSlice(buffer.items[self.recv_buffer_size..]); buffer.deinit(); - try self.buffers.put(from, next_list); + try self.recvBuffer.put(from, next_list); } return toOwned(@as([]const u8, recv_buffer), self.allocator); @@ -146,102 +200,113 @@ pub const MockChannel = struct { }; test "basic channel" { - const mock = MockChannel.init(std.testing.allocator, 80, false) catch unreachable; - defer mock.deinit(); - const channel = Channel(mock); + const mocks = try MockChannel.init(std.testing.allocator, 80, false); + defer mocks.deinit(); + + const toChannel = Channel(mocks.to); + const fromChannel = Channel(mocks.from); const addrA = Address.from(11); const data = "hello, world!"; - try channel.send(data, addrA); + try toChannel.send(data, addrA); - const recv = (try channel.recv(addrA)).?; + const recv = (try fromChannel.recv(addrA)).?; defer recv.deinit(); + try std.testing.expectEqualSlices(u8, data, recv.inner); } test "channel large buffer" { const recv_buffer_size = 80; - const mock = try MockChannel.init(std.testing.allocator, recv_buffer_size, false); - defer mock.deinit(); - const channel = Channel(mock); + const mocks = try MockChannel.init(std.testing.allocator, recv_buffer_size, false); + defer mocks.deinit(); + + const toChannel = Channel(mocks.to); + const fromChannel = Channel(mocks.from); const addrA = Address.from(11); const data = try std.testing.allocator.alloc(u8, recv_buffer_size + 1); defer std.testing.allocator.free(data); - try channel.send(data, addrA); + try toChannel.send(data, addrA); - const recv = (try channel.recv(addrA)).?; + const recv = (try fromChannel.recv(addrA)).?; defer recv.deinit(); try std.testing.expectEqualSlices(u8, data[0..recv_buffer_size], recv.inner); - const recv2 = (try channel.recv(addrA)).?; + const recv2 = (try fromChannel.recv(addrA)).?; defer recv2.deinit(); try std.testing.expectEqualSlices(u8, data[recv_buffer_size .. recv_buffer_size + 1], recv2.inner); } test "channel multiple addresses" { - const mock = MockChannel.init(std.testing.allocator, 80, false) catch unreachable; - defer mock.deinit(); - const channel = Channel(mock); + const mocks = try MockChannel.init(std.testing.allocator, 80, false); + defer mocks.deinit(); + + const toChannel = Channel(mocks.to); + const fromChannel = Channel(mocks.from); const addrA = Address.from(11); const addrB = Address.from(12); const dataA = "data A"; - try channel.send(dataA, addrA); + try toChannel.send(dataA, addrA); const dataB = "data B"; - try channel.send(dataB, addrB); + try toChannel.send(dataB, addrB); - const recvB = (try channel.recv(addrB)).?; + const recvB = (try fromChannel.recv(addrB)).?; defer recvB.deinit(); try std.testing.expectEqualSlices(u8, dataB, recvB.inner); - const recvA = (try channel.recv(addrA)).?; + const recvA = (try fromChannel.recv(addrA)).?; defer recvA.deinit(); try std.testing.expectEqualSlices(u8, dataA, recvA.inner); } test "channel recv nothing" { - const mock = MockChannel.init(std.testing.allocator, 80, false) catch unreachable; - defer mock.deinit(); - const channel = Channel(mock); + const mocks = try MockChannel.init(std.testing.allocator, 80, false); + defer mocks.deinit(); + + const fromChannel = Channel(mocks.from); const addr = Address.from(11); - const recv = try channel.recv(addr); + const recv = try fromChannel.recv(addr); try std.testing.expectEqual(recv, null); } test "channel send nothing" { - const mock = MockChannel.init(std.testing.allocator, 80, false) catch unreachable; - defer mock.deinit(); - const channel = Channel(mock); + const mocks = try MockChannel.init(std.testing.allocator, 80, false); + defer mocks.deinit(); + + const toChannel = Channel(mocks.to); const addr = Address.from(11); const data = ([_]u8{})[0..0]; try std.testing.expectEqual(0, data.len); - try channel.send(data, addr); + try toChannel.send(data, addr); } test "unreliable channel" { const recv_buffer_size = 80; - const mock = MockChannel.init(std.testing.allocator, recv_buffer_size, true) catch unreachable; - defer mock.deinit(); - const channel = Channel(mock); + const mocks = try MockChannel.init(std.testing.allocator, recv_buffer_size, true); + defer mocks.deinit(); + + const toChannel = Channel(mocks.to); + const fromChannel = Channel(mocks.from); const addrA = Address.from(11); const data = try std.testing.allocator.alloc(u8, recv_buffer_size * 10); defer std.testing.allocator.free(data); - try channel.send(data, addrA); + try toChannel.send(data, addrA); var unequal = false; for (0..10) |_| { - const recv = (try channel.recv(addrA)).?; + const recv = (try fromChannel.recv(addrA)).?; defer recv.deinit(); unequal = !std.mem.eql(u8, data[0..recv_buffer_size], recv.inner); if (unequal) { diff --git a/shared/layer4.zig b/shared/layer4.zig index a98432b..909058b 100644 --- a/shared/layer4.zig +++ b/shared/layer4.zig @@ -53,75 +53,156 @@ pub const Packet = extern struct { pub fn Connection(comptime ChannelImpl: type) type { return struct { const Self = @This(); + const num_unacked_packets = 10; + const num_retries = 5; channel: layer3.ChannelInner(ChannelImpl), address: layer3.Address, - next_seq_num: u32, - expected_seq_num: u32, - unacked_packets: std.ArrayList(UnackedPacket), allocator: std.mem.Allocator, - buffer: std.ArrayList(u8), - recvbuffer: std.ArrayList(u8), + seq_num: u32 = 0, + + unacked_packets: [num_unacked_packets]UnackedPacket = undefined, + unacked_packet_tail: usize = 0, + unacked_packet_head: usize = 0, + + buffer: [@sizeOf(Packet)]u8 = undefined, + buffer_size: usize = 0, const UnackedPacket = struct { packet: Packet, last_sent: i64, - retries: u32, + retries: u32 = 0, }; pub fn init(allocator: std.mem.Allocator, channel: ChannelImpl, address: layer3.Address) Self { return .{ .channel = layer3.Channel(channel), .address = address, - - .next_seq_num = 0, - .expected_seq_num = 0, .allocator = allocator, - .unacked_packets = std.ArrayList(UnackedPacket).init(allocator), - .buffer = std.ArrayList(u8).init(allocator), - .recvbuffer = std.ArrayList(u8).init(allocator), }; } - pub fn deinit(self: *Self) void { - self.unacked_packets.deinit(); - self.buffer.deinit(); - self.recvbuffer.deinit(); - } - pub fn send(self: *Self, data: anytype) !void { const bytes = std.mem.toBytes(data); var iter = std.mem.window(u8, &bytes, Packet.data_size, Packet.data_size); - while (iter.next()) |chunk| { - try self.sendBytes(chunk); - } - try self.handleRetransmissions(); - } + var timeout: i64 = 0; + while (true) { + const maybe_chunk = iter.next(); + if (maybe_chunk == null and timeout == 0) { + // if we ware done sending, start a timeout + timeout = std.time.milliTimestamp(); + } + // std.debug.print("self.unacked_packet_head - self.unacked_packet_tail = {}\n", .{self.unacked_packet_head - self.unacked_packet_tail}); + std.debug.print("self.unacked_packet_head = {}, self.unacked_packet_tail = {}\n", .{ self.unacked_packet_head, self.unacked_packet_tail }); + if (self.unacked_packet_head - self.unacked_packet_tail <= num_unacked_packets) { + // only send a chunk if we have room in our unacked packets + if (maybe_chunk) |chunk| { + // try self.sendBytes(chunk); + var packet = Packet.init(); + packet.seq_num = self.seq_num; + packet.data_len = @intCast(chunk.len); + @memcpy(packet.data[0..chunk.len], chunk); + + self.unacked_packet_head += 1; + self.unacked_packets[@mod(self.unacked_packet_head, num_unacked_packets)] = .{ + .packet = packet, + .last_sent = std.time.milliTimestamp(), + }; + + self.seq_num += 1; + try self.sendPacket(&packet); + } + } else if (timeout == 0) { + // receiver might have disconnected + timeout = std.time.milliTimestamp(); + } + + if (try self.recvPacket()) |packet| { + if (!packet.flags.is_ack) return error.UnexpectedPacket; + + // find the packet to ack + var pos: usize = self.unacked_packet_tail; + while (pos < self.unacked_packet_head) : (pos += 1) { + const i = @mod(pos, num_unacked_packets); + if (self.unacked_packets[i].packet.seq_num == packet.ack_num) { + self.unacked_packets[i] = self.unacked_packets[@mod(self.unacked_packet_tail, num_unacked_packets)]; + self.unacked_packet_tail += 1; + break; + } + } + } - fn sendBytes(self: *Self, data: []const u8) !void { - var packet = Packet.init(); - packet.seq_num = self.next_seq_num; - packet.data_len = @intCast(data.len); - @memcpy(packet.data[0..data.len], data); + const current_time = std.time.milliTimestamp(); - try self.unacked_packets.append(.{ - .packet = packet, - .last_sent = std.time.milliTimestamp(), - .retries = 0, - }); + if (timeout != 0 and current_time - timeout > 1000) { + return; + } - self.next_seq_num += 1; - try self.sendPacket(&packet); + var pos: usize = self.unacked_packet_tail; + while (pos < self.unacked_packet_head) : (pos += 1) { + const i = @mod(pos, num_unacked_packets); + const unacked = &self.unacked_packets[i]; + if (current_time - unacked.last_sent > timeout) { + if (unacked.retries >= num_retries) { + return error.MaxRetriesExceeded; + } + try self.sendPacket(&unacked.packet); + unacked.last_sent = current_time; + unacked.retries += 1; + } + } + + if (maybe_chunk == null and self.unacked_packet_head == self.unacked_packet_tail) { + return; + } + } } - fn sendAck(self: *Self, seq_num: u32) !void { - var ack_packet = Packet.init(); - ack_packet.flags.is_ack = true; - ack_packet.ack_num = seq_num; + pub fn recv(self: *Self, comptime T: type) !Owned(*T) { + const result = try self.allocator.create(T); + errdefer self.allocator.destroy(result); + const result_bytes = std.mem.asBytes(result); + var remaining: usize = @sizeOf(T) / Packet.data_size; + if (@mod(@sizeOf(T), Packet.data_size) != 0) remaining += 1; + + var timeout = std.time.milliTimestamp(); + + while (true) { + std.time.sleep(1 * 1000); + const currentTime = std.time.milliTimestamp(); + if (try self.recvPacket()) |packet| { + if (packet.flags.is_ack) return error.UnexpectedPacket; + + var ack_packet = Packet.init(); + ack_packet.flags.is_ack = true; + ack_packet.ack_num = packet.seq_num; + try self.sendPacket(&ack_packet); + + const offset = packet.seq_num * Packet.data_size; + const len = @min(Packet.data_size, packet.data_len); + if (offset + len > @sizeOf(T)) { + return error.StopTheCount; + } + + @memcpy(result_bytes[offset .. offset + len], packet.data[0..len]); + remaining -= 1; + // std.debug.print("remaining {}\n", .{remaining}); + + if (remaining == 0) { + return toOwned(result, self.allocator); + } - try self.sendPacket(&ack_packet); + timeout = currentTime; + } + + if (currentTime - timeout > 10 * 1000) { + // std.debug.print("currentTime {}\n", .{currentTime}); + // std.debug.print("timeout {}\n", .{timeout}); + return error.Timeout; + } + } } fn sendPacket(self: *Self, packet: *Packet) !void { @@ -136,88 +217,35 @@ pub fn Connection(comptime ChannelImpl: type) type { }); } - pub fn recv(self: *Self, comptime T: type) !Owned(*T) { - while (try self.channel.recv(self.address)) |data| { - errdefer data.deinit(); - try self.buffer.appendSlice(data.inner); - data.deinit(); - if (self.buffer.items.len >= @sizeOf(Packet)) { - const packet = self.buffer.items[0..@sizeOf(Packet)]; - const packet_pointer: *Packet = @ptrCast(packet.ptr); - var packet_struct = packet_pointer.*; - - std.debug.print("recv -> {any}\n", .{self.buffer.items.len}); - - var newBuffer = std.ArrayList(u8).init(self.allocator); - try newBuffer.appendSlice(self.buffer.items[@sizeOf(Packet)..]); - self.buffer.deinit(); - self.buffer = newBuffer; - - if (packet_struct.verifyChecksum()) { - const maybe_packet = self.recvPacket(&packet_struct) catch continue; - if (maybe_packet) |packet_data| { - try self.recvbuffer.appendSlice(packet_data); - std.debug.print("recv items len: {}\n", .{self.recvbuffer.items.len}); - if (self.recvbuffer.items.len >= @sizeOf(T)) { - const t: *T = @ptrCast(self.recvbuffer.items[0..@sizeOf(T)].ptr); - const result = try self.allocator.create(T); - errdefer self.allocator.destroy(result); - result.* = t.*; - - var newRecvBuffer = std.ArrayList(u8).init(self.allocator); - try newRecvBuffer.appendSlice(self.recvbuffer.items[@sizeOf(T)..]); - self.recvbuffer.deinit(); - self.recvbuffer = newRecvBuffer; - return toOwned(result, self.allocator); - } - } - } + fn recvPacket(self: *Self) !?Packet { + if (try self.channel.recv(self.address)) |data| { + std.debug.assert(data.inner.len <= @sizeOf(Packet)); + defer data.deinit(); + + const remaining_bytes = @sizeOf(Packet) - self.buffer_size; + // std.debug.print("remaining_bytes {}\n", .{remaining_bytes}); + // std.debug.print("data.inner.len < remaining_bytes {}\n", .{data.inner.len < remaining_bytes}); + if (data.inner.len < remaining_bytes) { + @memcpy(self.buffer[self.buffer_size .. self.buffer_size + data.inner.len], data.inner); + self.buffer_size += data.inner.len; + return null; } - } - return error.RecvFailed; - } - fn recvPacket(self: *Self, packet: *Packet) !?[]const u8 { - if (packet.flags.is_ack) { - const ack_num = packet.ack_num; - var i: usize = 0; - while (i < self.unacked_packets.items.len) : (i += 1) { - if (self.unacked_packets.items[i].packet.seq_num == ack_num) { - _ = self.unacked_packets.orderedRemove(i); - break; - } - } - return null; - } else if (packet.seq_num == self.expected_seq_num) { - try self.sendAck(packet.seq_num); - self.expected_seq_num += 1; - - return packet.data[0..packet.data_len]; - } + @memcpy(self.buffer[self.buffer_size..@sizeOf(Packet)], data.inner[0..remaining_bytes]); + const packet_ptr: *Packet = @ptrCast(&self.buffer[0]); + const packet = packet_ptr.*; - return error.UnexpectedPacket; - } - - pub fn handleRetransmissions(self: *Self) !void { - const timeout = 1000; - const max_retries = 5; + // std.debug.print("data.inner.len {}\n", .{data.inner.len}); + // std.debug.print("remaining_bytes, data.inner.len: {} {}\n", .{ remaining_bytes, data.inner.len }); + self.buffer_size = 0; + @memcpy(self.buffer[0 .. data.inner.len - remaining_bytes], data.inner[remaining_bytes..]); - while (self.unacked_packets.items.len > 0) { - const current_time = std.time.milliTimestamp(); - var i: usize = 0; - while (i < self.unacked_packets.items.len) : (i += 1) { - var unacked = &self.unacked_packets.items[i]; + if (!packet.verifyChecksum()) return null; + // std.debug.print("verified checksum\n", .{}); - if (current_time - unacked.last_sent > timeout) { - if (unacked.retries >= max_retries) { - return error.MaxRetriesExceeded; - } - try self.sendPacket(&unacked.packet); - unacked.last_sent = current_time; - unacked.retries += 1; - } - } + return packet; } + return null; } }; } @@ -227,116 +255,84 @@ const MyObject = extern struct { }; const BigObject = extern struct { - hello: [Packet.data_size + 1]u8 align(1), + hello: [Packet.data_size * 100 + 1]u8 align(1), }; -pub fn send_thread_shim(data: anytype) void { - data.conn.send(data.obj) catch @panic("cant send data"); -} - -test "connection" { - std.debug.print("connection\n", .{}); - const mock = layer3.MockChannel.init(std.testing.allocator, 80, false) catch unreachable; - defer mock.deinit(); - const channel = layer3.Channel(mock); +// test "connection" { +// std.debug.print("connection\n", .{}); +// const mocks = try layer3.MockChannel.init(std.testing.allocator, 80, false); +// defer mocks.deinit(); + +// const addr = layer3.Address.from(11); + +// const channel1 = layer3.Channel(mocks.to); +// const obj1 = MyObject{ .hello = 123 }; +// var conn1 = Connection(@TypeOf(channel1)).init(std.testing.allocator, channel1, addr); + +// const thread = try std.Thread.spawn(.{}, struct { +// fn run(conn: *@TypeOf(conn1)) void { +// conn.send(obj1) catch @panic("a"); +// } +// }.run, .{&conn1}); +// defer thread.join(); + +// const channel2 = layer3.Channel(mocks.from); +// var conn2 = Connection(@TypeOf(channel2)).init(std.testing.allocator, channel2, addr); +// const obj2 = try conn2.recv(@TypeOf(obj1)); +// defer obj2.deinit(); +// try std.testing.expectEqualDeep(obj1, obj2.inner.*); +// } + +// test "connection over unreliable channel" { +// std.debug.print("connection over unreliable channel\n", .{}); +// const mocks = try layer3.MockChannel.init(std.testing.allocator, 80, true); +// defer mocks.deinit(); + +// const addr = layer3.Address.from(11); + +// const channel1 = layer3.Channel(mocks.to); +// const obj1 = MyObject{ .hello = 123 }; +// var conn1 = Connection(@TypeOf(channel1)).init(std.testing.allocator, channel1, addr); +// defer conn1.deinit(); +// try conn1.send(obj1); + +// const channel2 = layer3.Channel(mocks.from); +// var conn2 = Connection(@TypeOf(channel2)).init(std.testing.allocator, channel2, addr); +// defer conn2.deinit(); +// var obj2 = try conn2.recv(MyObject); +// while (obj2 == null) : (obj2 = try conn2.recv(MyObject)) { +// try conn1.handleRetransmissions(); +// } +// const cobj2 = obj2.?; +// defer cobj2.deinit(); +// try std.testing.expectEqualDeep(obj1, cobj2.inner.*); +// } + +test "connection over big T" { + std.debug.print("connection over big T\n", .{}); + const mocks = try layer3.MockChannel.init(std.testing.allocator, 80, false); + defer mocks.deinit(); const addr = layer3.Address.from(11); - const obj1 = MyObject{ .hello = 123 }; - var conn1 = Connection(@TypeOf(channel)).init(std.testing.allocator, channel, addr); - defer conn1.deinit(); + const channel1 = layer3.Channel(mocks.to); + const obj1: BigObject = undefined; + var conn1 = Connection(@TypeOf(channel1)).init(std.testing.allocator, channel1, addr); - const ThreadData = struct { - conn: Connection(@TypeOf(channel)), - obj: MyObject, - }; - const thread_data = try std.testing.allocator.create(ThreadData); - defer std.testing.allocator.destroy(thread_data); - thread_data.* = .{ .conn = conn1, .obj = obj1 }; - const thread = try std.Thread.spawn(.{}, send_thread_shim, .{thread_data}); + const thread = try std.Thread.spawn(.{}, struct { + fn run(conn: *@TypeOf(conn1)) void { + conn.send(obj1) catch @panic("a"); + } + }.run, .{&conn1}); defer thread.join(); - var conn2 = Connection(@TypeOf(channel)).init(std.testing.allocator, channel, addr); - defer conn2.deinit(); - - const obj2 = try conn2.recv(MyObject); + const channel2 = layer3.Channel(mocks.from); + var conn2 = Connection(@TypeOf(channel2)).init(std.testing.allocator, channel2, addr); + const obj2 = try conn2.recv(@TypeOf(obj1)); defer obj2.deinit(); - try std.testing.expectEqualDeep(obj1, obj2.inner.*); } -test "connection over unreliable channel" { - std.debug.print("connection over unreliable channel\n", .{}); - const mock = layer3.MockChannel.init(std.testing.allocator, 80, true) catch unreachable; - defer mock.deinit(); - const channel = layer3.Channel(mock); - - const addr = layer3.Address.from(11); - - const obj1 = MyObject{ .hello = 123 }; - var conn1 = Connection(@TypeOf(channel)).init(std.testing.allocator, channel, addr); - defer conn1.deinit(); - try conn1.send(obj1); - - var conn2 = Connection(@TypeOf(channel)).init(std.testing.allocator, channel, addr); - defer conn2.deinit(); - var obj2 = try conn2.recv(MyObject); - while (obj2 == null) : (obj2 = try conn2.recv(MyObject)) { - try conn1.handleRetransmissions(); - } - const cobj2 = obj2.?; - defer cobj2.deinit(); - try std.testing.expectEqualDeep(obj1, cobj2.inner.*); -} - -test "big T" { - std.debug.print("connection\n", .{}); - const mock = layer3.MockChannel.init(std.testing.allocator, 80, false) catch unreachable; - defer mock.deinit(); - const channel = layer3.Channel(mock); - - const addr = layer3.Address.from(11); - - const obj1: BigObject = undefined; - var conn1 = Connection(@TypeOf(channel)).init(std.testing.allocator, channel, addr); - defer conn1.deinit(); - try conn1.send(obj1); - - var conn2 = Connection(@TypeOf(channel)).init(std.testing.allocator, channel, addr); - defer conn2.deinit(); - var obj2 = try conn2.recv(BigObject); - while (obj2 == null) : (obj2 = try conn2.recv(BigObject)) { - try conn1.handleRetransmissions(); - } - const cobj2 = obj2.?; - defer cobj2.deinit(); - try std.testing.expectEqualDeep(obj1, cobj2.inner.*); -} - -test "out of order recv" { - std.debug.print("connection\n", .{}); - const mock = layer3.MockChannel.init(std.testing.allocator, 80, false) catch unreachable; - defer mock.deinit(); - const channel = layer3.Channel(mock); - - const addr = layer3.Address.from(11); - - const obj1: BigObject = undefined; - var conn1 = Connection(@TypeOf(channel)).init(std.testing.allocator, channel, addr); - defer conn1.deinit(); - try conn1.send(obj1); - - var conn2 = Connection(@TypeOf(channel)).init(std.testing.allocator, channel, addr); - defer conn2.deinit(); - var obj2 = try conn2.recv(BigObject); - while (obj2 == null) : (obj2 = try conn2.recv(BigObject)) { - try conn1.handleRetransmissions(); - } - const cobj2 = obj2.?; - defer cobj2.deinit(); - try std.testing.expectEqualDeep(obj1, cobj2.inner.*); -} - test "packet checksum" { var packet = Packet.init(); packet.calculateChecksum(); From 3394916669d1a04457deb5c0e54ef3d60e0522de Mon Sep 17 00:00:00 2001 From: Mark Bundschuh Date: Mon, 18 Nov 2024 01:42:04 -0500 Subject: [PATCH 05/15] some fixes --- shared/layer4.zig | 96 +++++++++++++++++++++-------------------------- 1 file changed, 42 insertions(+), 54 deletions(-) diff --git a/shared/layer4.zig b/shared/layer4.zig index 909058b..b953d70 100644 --- a/shared/layer4.zig +++ b/shared/layer4.zig @@ -55,6 +55,7 @@ pub fn Connection(comptime ChannelImpl: type) type { const Self = @This(); const num_unacked_packets = 10; const num_retries = 5; + const global_timeout = 5 * 1000; channel: layer3.ChannelInner(ChannelImpl), address: layer3.Address, @@ -94,22 +95,19 @@ pub fn Connection(comptime ChannelImpl: type) type { // if we ware done sending, start a timeout timeout = std.time.milliTimestamp(); } - // std.debug.print("self.unacked_packet_head - self.unacked_packet_tail = {}\n", .{self.unacked_packet_head - self.unacked_packet_tail}); - std.debug.print("self.unacked_packet_head = {}, self.unacked_packet_tail = {}\n", .{ self.unacked_packet_head, self.unacked_packet_tail }); if (self.unacked_packet_head - self.unacked_packet_tail <= num_unacked_packets) { // only send a chunk if we have room in our unacked packets if (maybe_chunk) |chunk| { - // try self.sendBytes(chunk); var packet = Packet.init(); packet.seq_num = self.seq_num; packet.data_len = @intCast(chunk.len); @memcpy(packet.data[0..chunk.len], chunk); - self.unacked_packet_head += 1; self.unacked_packets[@mod(self.unacked_packet_head, num_unacked_packets)] = .{ .packet = packet, .last_sent = std.time.milliTimestamp(), }; + self.unacked_packet_head += 1; self.seq_num += 1; try self.sendPacket(&packet); @@ -129,6 +127,7 @@ pub fn Connection(comptime ChannelImpl: type) type { if (self.unacked_packets[i].packet.seq_num == packet.ack_num) { self.unacked_packets[i] = self.unacked_packets[@mod(self.unacked_packet_tail, num_unacked_packets)]; self.unacked_packet_tail += 1; + timeout = std.time.milliTimestamp(); break; } } @@ -136,7 +135,7 @@ pub fn Connection(comptime ChannelImpl: type) type { const current_time = std.time.milliTimestamp(); - if (timeout != 0 and current_time - timeout > 1000) { + if (timeout != 0 and current_time - timeout > global_timeout) { return; } @@ -144,7 +143,7 @@ pub fn Connection(comptime ChannelImpl: type) type { while (pos < self.unacked_packet_head) : (pos += 1) { const i = @mod(pos, num_unacked_packets); const unacked = &self.unacked_packets[i]; - if (current_time - unacked.last_sent > timeout) { + if (current_time - unacked.last_sent > 1000) { if (unacked.retries >= num_retries) { return error.MaxRetriesExceeded; } @@ -169,8 +168,7 @@ pub fn Connection(comptime ChannelImpl: type) type { var timeout = std.time.milliTimestamp(); - while (true) { - std.time.sleep(1 * 1000); + while (remaining > 0) { const currentTime = std.time.milliTimestamp(); if (try self.recvPacket()) |packet| { if (packet.flags.is_ack) return error.UnexpectedPacket; @@ -188,21 +186,16 @@ pub fn Connection(comptime ChannelImpl: type) type { @memcpy(result_bytes[offset .. offset + len], packet.data[0..len]); remaining -= 1; - // std.debug.print("remaining {}\n", .{remaining}); - - if (remaining == 0) { - return toOwned(result, self.allocator); - } timeout = currentTime; } - if (currentTime - timeout > 10 * 1000) { - // std.debug.print("currentTime {}\n", .{currentTime}); - // std.debug.print("timeout {}\n", .{timeout}); + if (currentTime - timeout > global_timeout) { return error.Timeout; } } + + return toOwned(result, self.allocator); } fn sendPacket(self: *Self, packet: *Packet) !void { @@ -223,8 +216,6 @@ pub fn Connection(comptime ChannelImpl: type) type { defer data.deinit(); const remaining_bytes = @sizeOf(Packet) - self.buffer_size; - // std.debug.print("remaining_bytes {}\n", .{remaining_bytes}); - // std.debug.print("data.inner.len < remaining_bytes {}\n", .{data.inner.len < remaining_bytes}); if (data.inner.len < remaining_bytes) { @memcpy(self.buffer[self.buffer_size .. self.buffer_size + data.inner.len], data.inner); self.buffer_size += data.inner.len; @@ -235,13 +226,10 @@ pub fn Connection(comptime ChannelImpl: type) type { const packet_ptr: *Packet = @ptrCast(&self.buffer[0]); const packet = packet_ptr.*; - // std.debug.print("data.inner.len {}\n", .{data.inner.len}); - // std.debug.print("remaining_bytes, data.inner.len: {} {}\n", .{ remaining_bytes, data.inner.len }); - self.buffer_size = 0; - @memcpy(self.buffer[0 .. data.inner.len - remaining_bytes], data.inner[remaining_bytes..]); + self.buffer_size = data.inner.len - remaining_bytes; + @memcpy(self.buffer[0..self.buffer_size], data.inner[remaining_bytes..]); if (!packet.verifyChecksum()) return null; - // std.debug.print("verified checksum\n", .{}); return packet; } @@ -250,38 +238,38 @@ pub fn Connection(comptime ChannelImpl: type) type { }; } -const MyObject = extern struct { - hello: u32 align(1), -}; - -const BigObject = extern struct { - hello: [Packet.data_size * 100 + 1]u8 align(1), -}; +fn initBytes(num_bytes: comptime_int) [num_bytes]u8 { + var result: [num_bytes]u8 = undefined; + for (&result, 0..) |*b, i| { + b.* = @truncate(i); + } + return result; +} -// test "connection" { -// std.debug.print("connection\n", .{}); -// const mocks = try layer3.MockChannel.init(std.testing.allocator, 80, false); -// defer mocks.deinit(); +test "connection" { + std.debug.print("connection\n", .{}); + const mocks = try layer3.MockChannel.init(std.testing.allocator, 80, false); + defer mocks.deinit(); -// const addr = layer3.Address.from(11); + const addr = layer3.Address.from(11); -// const channel1 = layer3.Channel(mocks.to); -// const obj1 = MyObject{ .hello = 123 }; -// var conn1 = Connection(@TypeOf(channel1)).init(std.testing.allocator, channel1, addr); + const channel1 = layer3.Channel(mocks.to); + const obj1 = initBytes(1); + var conn1 = Connection(@TypeOf(channel1)).init(std.testing.allocator, channel1, addr); -// const thread = try std.Thread.spawn(.{}, struct { -// fn run(conn: *@TypeOf(conn1)) void { -// conn.send(obj1) catch @panic("a"); -// } -// }.run, .{&conn1}); -// defer thread.join(); + const thread = try std.Thread.spawn(.{}, struct { + fn run(conn: *@TypeOf(conn1), obj: *const @TypeOf(obj1)) void { + conn.send(obj.*) catch @panic("a"); + } + }.run, .{ &conn1, &obj1 }); + defer thread.join(); -// const channel2 = layer3.Channel(mocks.from); -// var conn2 = Connection(@TypeOf(channel2)).init(std.testing.allocator, channel2, addr); -// const obj2 = try conn2.recv(@TypeOf(obj1)); -// defer obj2.deinit(); -// try std.testing.expectEqualDeep(obj1, obj2.inner.*); -// } + const channel2 = layer3.Channel(mocks.from); + var conn2 = Connection(@TypeOf(channel2)).init(std.testing.allocator, channel2, addr); + const obj2 = try conn2.recv(@TypeOf(obj1)); + defer obj2.deinit(); + try std.testing.expectEqualDeep(obj1, obj2.inner.*); +} // test "connection over unreliable channel" { // std.debug.print("connection over unreliable channel\n", .{}); @@ -314,16 +302,16 @@ test "connection over big T" { defer mocks.deinit(); const addr = layer3.Address.from(11); + const obj1 = initBytes(Packet.data_size * 19 + 1); const channel1 = layer3.Channel(mocks.to); - const obj1: BigObject = undefined; var conn1 = Connection(@TypeOf(channel1)).init(std.testing.allocator, channel1, addr); const thread = try std.Thread.spawn(.{}, struct { - fn run(conn: *@TypeOf(conn1)) void { - conn.send(obj1) catch @panic("a"); + fn run(conn: *@TypeOf(conn1), obj: *const @TypeOf(obj1)) void { + conn.send(obj.*) catch @panic("a"); } - }.run, .{&conn1}); + }.run, .{ &conn1, &obj1 }); defer thread.join(); const channel2 = layer3.Channel(mocks.from); From 5fdb6c72eca5357db8f9b86661db708cf36c8ed1 Mon Sep 17 00:00:00 2001 From: Mark Bundschuh Date: Mon, 18 Nov 2024 14:08:31 -0500 Subject: [PATCH 06/15] fix everything --- .vscode/settings.json | 3 +- shared/layer3.zig | 4 +- shared/layer4.zig | 117 +++++++++++++++++++++++++----------------- 3 files changed, 72 insertions(+), 52 deletions(-) diff --git a/.vscode/settings.json b/.vscode/settings.json index 8436bc0..3426c1c 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -2,6 +2,5 @@ "zig.path": "zig", "zig.zls.path": "zls", "zig.zls.enableBuildOnSave": true, - "zig.zls.enableAutofix": true, - "zig.zls.buildOnSaveStep": "test" + "zig.zls.enableAutofix": true } diff --git a/shared/layer3.zig b/shared/layer3.zig index 0fcb224..b0a0ff2 100644 --- a/shared/layer3.zig +++ b/shared/layer3.zig @@ -165,8 +165,8 @@ const MockChannelSimplex = struct { if (self.unreliable) { var iter = self.sendBuffer.valueIterator(); while (iter.next()) |buffer| { - if (self.n.* % 2 == 0) { - buffer.items[buffer.items.len / 2] += 7; + if (self.n.* % 3 == 0) { + buffer.items[buffer.items.len / 2] /= 2; } } diff --git a/shared/layer4.zig b/shared/layer4.zig index b953d70..ce6e7ab 100644 --- a/shared/layer4.zig +++ b/shared/layer4.zig @@ -85,34 +85,43 @@ pub fn Connection(comptime ChannelImpl: type) type { } pub fn send(self: *Self, data: anytype) !void { - const bytes = std.mem.toBytes(data); - var iter = std.mem.window(u8, &bytes, Packet.data_size, Packet.data_size); + const T = @TypeOf(data); + const bytes = std.mem.asBytes(&data); + + var remaining: usize = @sizeOf(T) / Packet.data_size; + if (@mod(@sizeOf(T), Packet.data_size) != 0) remaining += 1; + + var offset: usize = 0; var timeout: i64 = 0; while (true) { - const maybe_chunk = iter.next(); - if (maybe_chunk == null and timeout == 0) { + if (remaining == 0 and timeout == 0) { + std.debug.print("sent all packets, starting timeout\n", .{}); // if we ware done sending, start a timeout timeout = std.time.milliTimestamp(); } - if (self.unacked_packet_head - self.unacked_packet_tail <= num_unacked_packets) { + if (remaining > 0 and self.unacked_packet_head - self.unacked_packet_tail < num_unacked_packets) { + remaining -= 1; // only send a chunk if we have room in our unacked packets - if (maybe_chunk) |chunk| { - var packet = Packet.init(); - packet.seq_num = self.seq_num; - packet.data_len = @intCast(chunk.len); - @memcpy(packet.data[0..chunk.len], chunk); - - self.unacked_packets[@mod(self.unacked_packet_head, num_unacked_packets)] = .{ - .packet = packet, - .last_sent = std.time.milliTimestamp(), - }; - self.unacked_packet_head += 1; - - self.seq_num += 1; - try self.sendPacket(&packet); - } + const end = @min(offset + Packet.data_size, bytes.len); + const chunk = bytes[offset..end]; + offset += chunk.len; + + var packet = Packet.init(); + packet.seq_num = self.seq_num; + packet.data_len = @intCast(chunk.len); + @memcpy(packet.data[0..chunk.len], chunk); + + self.unacked_packets[@mod(self.unacked_packet_head, num_unacked_packets)] = .{ + .packet = packet, + .last_sent = std.time.milliTimestamp(), + }; + self.unacked_packet_head += 1; + + self.seq_num += 1; + try self.sendPacket(&packet); } else if (timeout == 0) { + std.debug.print("not receiving acks, starting timeout\n", .{}); // receiver might have disconnected timeout = std.time.milliTimestamp(); } @@ -147,13 +156,14 @@ pub fn Connection(comptime ChannelImpl: type) type { if (unacked.retries >= num_retries) { return error.MaxRetriesExceeded; } + std.debug.print("retransmitting seq_num={}\n", .{unacked.packet.seq_num}); try self.sendPacket(&unacked.packet); unacked.last_sent = current_time; unacked.retries += 1; } } - if (maybe_chunk == null and self.unacked_packet_head == self.unacked_packet_tail) { + if (remaining == 0 and self.unacked_packet_head == self.unacked_packet_tail) { return; } } @@ -165,6 +175,9 @@ pub fn Connection(comptime ChannelImpl: type) type { const result_bytes = std.mem.asBytes(result); var remaining: usize = @sizeOf(T) / Packet.data_size; if (@mod(@sizeOf(T), Packet.data_size) != 0) remaining += 1; + var num_received_bytes: usize = 0; + + std.debug.print("remaining={}\n", .{remaining}); var timeout = std.time.milliTimestamp(); @@ -186,6 +199,7 @@ pub fn Connection(comptime ChannelImpl: type) type { @memcpy(result_bytes[offset .. offset + len], packet.data[0..len]); remaining -= 1; + num_received_bytes += len; timeout = currentTime; } @@ -195,6 +209,10 @@ pub fn Connection(comptime ChannelImpl: type) type { } } + if (num_received_bytes != @sizeOf(T)) { + return error.StopTheCount; + } + return toOwned(result, self.allocator); } @@ -229,7 +247,10 @@ pub fn Connection(comptime ChannelImpl: type) type { self.buffer_size = data.inner.len - remaining_bytes; @memcpy(self.buffer[0..self.buffer_size], data.inner[remaining_bytes..]); - if (!packet.verifyChecksum()) return null; + if (!packet.verifyChecksum()) { + std.debug.print("DROP packet failed checksum seq_num={}\n", .{packet.seq_num}); + return null; + } return packet; } @@ -271,38 +292,38 @@ test "connection" { try std.testing.expectEqualDeep(obj1, obj2.inner.*); } -// test "connection over unreliable channel" { -// std.debug.print("connection over unreliable channel\n", .{}); -// const mocks = try layer3.MockChannel.init(std.testing.allocator, 80, true); -// defer mocks.deinit(); - -// const addr = layer3.Address.from(11); - -// const channel1 = layer3.Channel(mocks.to); -// const obj1 = MyObject{ .hello = 123 }; -// var conn1 = Connection(@TypeOf(channel1)).init(std.testing.allocator, channel1, addr); -// defer conn1.deinit(); -// try conn1.send(obj1); - -// const channel2 = layer3.Channel(mocks.from); -// var conn2 = Connection(@TypeOf(channel2)).init(std.testing.allocator, channel2, addr); -// defer conn2.deinit(); -// var obj2 = try conn2.recv(MyObject); -// while (obj2 == null) : (obj2 = try conn2.recv(MyObject)) { -// try conn1.handleRetransmissions(); -// } -// const cobj2 = obj2.?; -// defer cobj2.deinit(); -// try std.testing.expectEqualDeep(obj1, cobj2.inner.*); -// } - test "connection over big T" { std.debug.print("connection over big T\n", .{}); const mocks = try layer3.MockChannel.init(std.testing.allocator, 80, false); defer mocks.deinit(); const addr = layer3.Address.from(11); - const obj1 = initBytes(Packet.data_size * 19 + 1); + const obj1 = initBytes(Packet.data_size * 111 + 1); + + const channel1 = layer3.Channel(mocks.to); + var conn1 = Connection(@TypeOf(channel1)).init(std.testing.allocator, channel1, addr); + + const thread = try std.Thread.spawn(.{}, struct { + fn run(conn: *@TypeOf(conn1), obj: *const @TypeOf(obj1)) void { + conn.send(obj.*) catch @panic("a"); + } + }.run, .{ &conn1, &obj1 }); + defer thread.join(); + + const channel2 = layer3.Channel(mocks.from); + var conn2 = Connection(@TypeOf(channel2)).init(std.testing.allocator, channel2, addr); + const obj2 = try conn2.recv(@TypeOf(obj1)); + defer obj2.deinit(); + try std.testing.expectEqualDeep(obj1, obj2.inner.*); +} + +test "connection over unreliable channel" { + std.debug.print("connection over unreliable channel\n", .{}); + const mocks = try layer3.MockChannel.init(std.testing.allocator, 80, true); + defer mocks.deinit(); + + const addr = layer3.Address.from(11); + const obj1 = initBytes(2048); const channel1 = layer3.Channel(mocks.to); var conn1 = Connection(@TypeOf(channel1)).init(std.testing.allocator, channel1, addr); From acf85e160ef0ae0df96a1392c7b97353c945fdce Mon Sep 17 00:00:00 2001 From: Mark Bundschuh Date: Mon, 18 Nov 2024 15:30:02 -0500 Subject: [PATCH 07/15] multiple Ts over single connection --- shared/layer4.zig | 70 +++++++++++++++++++++++++++++++++++++---------- 1 file changed, 56 insertions(+), 14 deletions(-) diff --git a/shared/layer4.zig b/shared/layer4.zig index ce6e7ab..e9caaa8 100644 --- a/shared/layer4.zig +++ b/shared/layer4.zig @@ -13,9 +13,10 @@ const Flags = packed struct { // Simple packet structure with checksum pub const Packet = extern struct { - const data_size = 114; + const data_size = 110; seq_num: u32 align(1), ack_num: u32 align(1), + index: u32 align(1), flags: Flags align(1), checksum: u32 align(1), data_len: u8 align(1), @@ -25,6 +26,7 @@ pub const Packet = extern struct { return .{ .seq_num = 0, .ack_num = 0, + .index = 0, .flags = .{ .is_ack = false }, .checksum = 0, .data = [_]u8{0} ** data_size, @@ -92,11 +94,12 @@ pub fn Connection(comptime ChannelImpl: type) type { if (@mod(@sizeOf(T), Packet.data_size) != 0) remaining += 1; var offset: usize = 0; + var index: u32 = 0; var timeout: i64 = 0; while (true) { if (remaining == 0 and timeout == 0) { - std.debug.print("sent all packets, starting timeout\n", .{}); + std.debug.print("SENDER: sent all packets, starting timeout\n", .{}); // if we ware done sending, start a timeout timeout = std.time.milliTimestamp(); } @@ -109,6 +112,7 @@ pub fn Connection(comptime ChannelImpl: type) type { var packet = Packet.init(); packet.seq_num = self.seq_num; + packet.index = index; packet.data_len = @intCast(chunk.len); @memcpy(packet.data[0..chunk.len], chunk); @@ -119,9 +123,11 @@ pub fn Connection(comptime ChannelImpl: type) type { self.unacked_packet_head += 1; self.seq_num += 1; + index += 1; + std.debug.print("SEND len={}, seq_num={}, index={}\n", .{ packet.data_len, packet.seq_num, packet.index }); try self.sendPacket(&packet); } else if (timeout == 0) { - std.debug.print("not receiving acks, starting timeout\n", .{}); + std.debug.print("SENDER: not receiving acks, starting timeout\n", .{}); // receiver might have disconnected timeout = std.time.milliTimestamp(); } @@ -137,6 +143,7 @@ pub fn Connection(comptime ChannelImpl: type) type { self.unacked_packets[i] = self.unacked_packets[@mod(self.unacked_packet_tail, num_unacked_packets)]; self.unacked_packet_tail += 1; timeout = std.time.milliTimestamp(); + std.debug.print("ACKN ack_num={}\n", .{packet.ack_num}); break; } } @@ -156,7 +163,7 @@ pub fn Connection(comptime ChannelImpl: type) type { if (unacked.retries >= num_retries) { return error.MaxRetriesExceeded; } - std.debug.print("retransmitting seq_num={}\n", .{unacked.packet.seq_num}); + std.debug.print("RETX seq_num={}\n", .{unacked.packet.seq_num}); try self.sendPacket(&unacked.packet); unacked.last_sent = current_time; unacked.retries += 1; @@ -177,7 +184,7 @@ pub fn Connection(comptime ChannelImpl: type) type { if (@mod(@sizeOf(T), Packet.data_size) != 0) remaining += 1; var num_received_bytes: usize = 0; - std.debug.print("remaining={}\n", .{remaining}); + std.debug.print("RECEIVER: remaining={}\n", .{remaining}); var timeout = std.time.milliTimestamp(); @@ -186,12 +193,14 @@ pub fn Connection(comptime ChannelImpl: type) type { if (try self.recvPacket()) |packet| { if (packet.flags.is_ack) return error.UnexpectedPacket; + std.debug.print("RECV len={}, seq_num={}, index={}\n", .{ packet.data_len, packet.seq_num, packet.index }); + var ack_packet = Packet.init(); ack_packet.flags.is_ack = true; ack_packet.ack_num = packet.seq_num; try self.sendPacket(&ack_packet); - const offset = packet.seq_num * Packet.data_size; + const offset = packet.index * Packet.data_size; const len = @min(Packet.data_size, packet.data_len); if (offset + len > @sizeOf(T)) { return error.StopTheCount; @@ -221,11 +230,11 @@ pub fn Connection(comptime ChannelImpl: type) type { const bytes = std.mem.asBytes(packet); try self.channel.send(bytes, self.address); - std.debug.print("Sending packet: seq={}, ack={}, is_ack={}\n", .{ - packet.seq_num, - packet.ack_num, - packet.flags.is_ack, - }); + // std.debug.print("Sending packet: seq={}, ack={}, is_ack={}\n", .{ + // packet.seq_num, + // packet.ack_num, + // packet.flags.is_ack, + // }); } fn recvPacket(self: *Self) !?Packet { @@ -268,7 +277,7 @@ fn initBytes(num_bytes: comptime_int) [num_bytes]u8 { } test "connection" { - std.debug.print("connection\n", .{}); + std.debug.print("\nTEST: connection\n", .{}); const mocks = try layer3.MockChannel.init(std.testing.allocator, 80, false); defer mocks.deinit(); @@ -293,7 +302,7 @@ test "connection" { } test "connection over big T" { - std.debug.print("connection over big T\n", .{}); + std.debug.print("\nTEST: connection over big T\n", .{}); const mocks = try layer3.MockChannel.init(std.testing.allocator, 80, false); defer mocks.deinit(); @@ -318,7 +327,7 @@ test "connection over big T" { } test "connection over unreliable channel" { - std.debug.print("connection over unreliable channel\n", .{}); + std.debug.print("\nTEST: connection over unreliable channel\n", .{}); const mocks = try layer3.MockChannel.init(std.testing.allocator, 80, true); defer mocks.deinit(); @@ -342,6 +351,39 @@ test "connection over unreliable channel" { try std.testing.expectEqualDeep(obj1, obj2.inner.*); } +test "connection transmitting multiple Ts" { + std.debug.print("\nTEST: connection transmitting multiple Ts\n", .{}); + const mocks = try layer3.MockChannel.init(std.testing.allocator, 80, false); + defer mocks.deinit(); + + const addr = layer3.Address.from(11); + + const channel1 = layer3.Channel(mocks.to); + var conn1 = Connection(@TypeOf(channel1)).init(std.testing.allocator, channel1, addr); + + const obj1 = initBytes(12); + const obj2 = initBytes(123); + + const thread = try std.Thread.spawn(.{}, struct { + fn run(conn: *@TypeOf(conn1), a: *const @TypeOf(obj1), b: *const @TypeOf(obj2)) void { + conn.send(a.*) catch @panic("a"); + conn.send(b.*) catch @panic("b"); + } + }.run, .{ &conn1, &obj1, &obj2 }); + defer thread.join(); + + const channel2 = layer3.Channel(mocks.from); + var conn2 = Connection(@TypeOf(channel2)).init(std.testing.allocator, channel2, addr); + + const obj1r = try conn2.recv(@TypeOf(obj1)); + defer obj1r.deinit(); + try std.testing.expectEqualDeep(obj1, obj1r.inner.*); + + const obj12r = try conn2.recv(@TypeOf(obj2)); + defer obj12r.deinit(); + try std.testing.expectEqualDeep(obj2, obj12r.inner.*); +} + test "packet checksum" { var packet = Packet.init(); packet.calculateChecksum(); From 9cba337961676796b6e8e84680c9c0a95577da46 Mon Sep 17 00:00:00 2001 From: Mark Bundschuh Date: Mon, 18 Nov 2024 20:57:22 -0500 Subject: [PATCH 08/15] clean up --- shared/layer4.zig | 27 +++++++++++++++++++-------- 1 file changed, 19 insertions(+), 8 deletions(-) diff --git a/shared/layer4.zig b/shared/layer4.zig index e9caaa8..72bb4b8 100644 --- a/shared/layer4.zig +++ b/shared/layer4.zig @@ -6,6 +6,13 @@ const shared = @import("main.zig"); const Owned = shared.Owned; const toOwned = shared.toOwned; +pub const ConnectionError = error{ + Timeout, + UnexpectedPacket, + MaxRetriesExceeded, + StopTheCount, +} || layer3.ChannelError; + const Flags = packed struct { is_ack: bool, _: u7 = 0, @@ -52,6 +59,7 @@ pub const Packet = extern struct { } }; +/// Reliable connection over an unreliable channel. Can send arbitrarily large data over the channel. pub fn Connection(comptime ChannelImpl: type) type { return struct { const Self = @This(); @@ -86,7 +94,9 @@ pub fn Connection(comptime ChannelImpl: type) type { }; } - pub fn send(self: *Self, data: anytype) !void { + /// Blocks until all data is sent and successfully acknowledged by the receiver. + /// Will return early if there is an error or the connection times out. + pub fn send(self: *Self, data: anytype) ConnectionError!void { const T = @TypeOf(data); const bytes = std.mem.asBytes(&data); @@ -176,7 +186,8 @@ pub fn Connection(comptime ChannelImpl: type) type { } } - pub fn recv(self: *Self, comptime T: type) !Owned(*T) { + /// Blocks until all data is received. Will return early if there is an error or the connection times out. + pub fn recv(self: *Self, comptime T: type) ConnectionError!Owned(*T) { const result = try self.allocator.create(T); errdefer self.allocator.destroy(result); const result_bytes = std.mem.asBytes(result); @@ -289,7 +300,7 @@ test "connection" { const thread = try std.Thread.spawn(.{}, struct { fn run(conn: *@TypeOf(conn1), obj: *const @TypeOf(obj1)) void { - conn.send(obj.*) catch @panic("a"); + conn.send(obj.*) catch @panic("failed to send"); } }.run, .{ &conn1, &obj1 }); defer thread.join(); @@ -314,7 +325,7 @@ test "connection over big T" { const thread = try std.Thread.spawn(.{}, struct { fn run(conn: *@TypeOf(conn1), obj: *const @TypeOf(obj1)) void { - conn.send(obj.*) catch @panic("a"); + conn.send(obj.*) catch @panic("failed to send"); } }.run, .{ &conn1, &obj1 }); defer thread.join(); @@ -339,7 +350,7 @@ test "connection over unreliable channel" { const thread = try std.Thread.spawn(.{}, struct { fn run(conn: *@TypeOf(conn1), obj: *const @TypeOf(obj1)) void { - conn.send(obj.*) catch @panic("a"); + conn.send(obj.*) catch @panic("failed to send"); } }.run, .{ &conn1, &obj1 }); defer thread.join(); @@ -366,8 +377,8 @@ test "connection transmitting multiple Ts" { const thread = try std.Thread.spawn(.{}, struct { fn run(conn: *@TypeOf(conn1), a: *const @TypeOf(obj1), b: *const @TypeOf(obj2)) void { - conn.send(a.*) catch @panic("a"); - conn.send(b.*) catch @panic("b"); + conn.send(a.*) catch @panic("failed to send (a)"); + conn.send(b.*) catch @panic("failed to send (b)"); } }.run, .{ &conn1, &obj1, &obj2 }); defer thread.join(); @@ -393,7 +404,7 @@ test "packet checksum" { test "corrupted packet checksum" { var packet = Packet.init(); packet.calculateChecksum(); - packet.checksum += 1; + packet.data[13] += 2; try std.testing.expect(!packet.verifyChecksum()); } From 568bc3e1c39435817698bc12d451d740599fe325 Mon Sep 17 00:00:00 2001 From: Mark Bundschuh Date: Mon, 18 Nov 2024 22:03:44 -0500 Subject: [PATCH 09/15] non overflowing tail and head --- shared/layer4.zig | 31 +++++++++++++++++++------------ 1 file changed, 19 insertions(+), 12 deletions(-) diff --git a/shared/layer4.zig b/shared/layer4.zig index 72bb4b8..3d01dfe 100644 --- a/shared/layer4.zig +++ b/shared/layer4.zig @@ -74,8 +74,8 @@ pub fn Connection(comptime ChannelImpl: type) type { seq_num: u32 = 0, unacked_packets: [num_unacked_packets]UnackedPacket = undefined, - unacked_packet_tail: usize = 0, - unacked_packet_head: usize = 0, + unacked_packets_tail_index: usize = 0, + unacked_packets_head_index: usize = 0, buffer: [@sizeOf(Packet)]u8 = undefined, buffer_size: usize = 0, @@ -113,7 +113,7 @@ pub fn Connection(comptime ChannelImpl: type) type { // if we ware done sending, start a timeout timeout = std.time.milliTimestamp(); } - if (remaining > 0 and self.unacked_packet_head - self.unacked_packet_tail < num_unacked_packets) { + if (remaining > 0 and self.unacked_packets_head_index - self.unacked_packets_tail_index < num_unacked_packets) { remaining -= 1; // only send a chunk if we have room in our unacked packets const end = @min(offset + Packet.data_size, bytes.len); @@ -126,11 +126,11 @@ pub fn Connection(comptime ChannelImpl: type) type { packet.data_len = @intCast(chunk.len); @memcpy(packet.data[0..chunk.len], chunk); - self.unacked_packets[@mod(self.unacked_packet_head, num_unacked_packets)] = .{ + self.unacked_packets[@mod(self.unacked_packets_head_index, num_unacked_packets)] = .{ .packet = packet, .last_sent = std.time.milliTimestamp(), }; - self.unacked_packet_head += 1; + self.unacked_packets_head_index += 1; self.seq_num += 1; index += 1; @@ -146,12 +146,19 @@ pub fn Connection(comptime ChannelImpl: type) type { if (!packet.flags.is_ack) return error.UnexpectedPacket; // find the packet to ack - var pos: usize = self.unacked_packet_tail; - while (pos < self.unacked_packet_head) : (pos += 1) { + var pos: usize = self.unacked_packets_tail_index; + while (pos < self.unacked_packets_head_index) : (pos += 1) { const i = @mod(pos, num_unacked_packets); if (self.unacked_packets[i].packet.seq_num == packet.ack_num) { - self.unacked_packets[i] = self.unacked_packets[@mod(self.unacked_packet_tail, num_unacked_packets)]; - self.unacked_packet_tail += 1; + const last_tail = @mod(self.unacked_packets_tail_index, num_unacked_packets); + if (last_tail != i) self.unacked_packets[i] = self.unacked_packets[last_tail]; + + self.unacked_packets_tail_index += 1; + if (self.unacked_packets_tail_index >= num_unacked_packets) { + self.unacked_packets_tail_index -= num_unacked_packets; + self.unacked_packets_head_index -= num_unacked_packets; + } + timeout = std.time.milliTimestamp(); std.debug.print("ACKN ack_num={}\n", .{packet.ack_num}); break; @@ -165,8 +172,8 @@ pub fn Connection(comptime ChannelImpl: type) type { return; } - var pos: usize = self.unacked_packet_tail; - while (pos < self.unacked_packet_head) : (pos += 1) { + var pos: usize = self.unacked_packets_tail_index; + while (pos < self.unacked_packets_head_index) : (pos += 1) { const i = @mod(pos, num_unacked_packets); const unacked = &self.unacked_packets[i]; if (current_time - unacked.last_sent > 1000) { @@ -180,7 +187,7 @@ pub fn Connection(comptime ChannelImpl: type) type { } } - if (remaining == 0 and self.unacked_packet_head == self.unacked_packet_tail) { + if (remaining == 0 and self.unacked_packets_head_index == self.unacked_packets_tail_index) { return; } } From 7d1d57771f0b24ad746fb5c90de21acd113a4151 Mon Sep 17 00:00:00 2001 From: Mark Bundschuh Date: Mon, 18 Nov 2024 22:51:52 -0500 Subject: [PATCH 10/15] more tests --- shared/layer4.zig | 31 ++++++++++++++++++++++++++++++- 1 file changed, 30 insertions(+), 1 deletion(-) diff --git a/shared/layer4.zig b/shared/layer4.zig index 3d01dfe..d85da25 100644 --- a/shared/layer4.zig +++ b/shared/layer4.zig @@ -64,7 +64,7 @@ pub fn Connection(comptime ChannelImpl: type) type { return struct { const Self = @This(); const num_unacked_packets = 10; - const num_retries = 5; + const num_retries = 3; const global_timeout = 5 * 1000; channel: layer3.ChannelInner(ChannelImpl), @@ -402,6 +402,35 @@ test "connection transmitting multiple Ts" { try std.testing.expectEqualDeep(obj2, obj12r.inner.*); } +test "connection send with no receiver" { + std.debug.print("\nTEST: connection send with no receiver\n", .{}); + const mocks = try layer3.MockChannel.init(std.testing.allocator, 80, false); + defer mocks.deinit(); + + const addr = layer3.Address.from(11); + + const channel = layer3.Channel(mocks.to); + var conn = Connection(@TypeOf(channel)).init(std.testing.allocator, channel, addr); + + const obj = initBytes(12); + const result = conn.send(obj); + try std.testing.expectError(error.MaxRetriesExceeded, result); +} + +test "connection receive with no sender" { + std.debug.print("\nTEST: connection receive with no sender\n", .{}); + const mocks = try layer3.MockChannel.init(std.testing.allocator, 80, false); + defer mocks.deinit(); + + const addr = layer3.Address.from(11); + + const channel = layer3.Channel(mocks.from); + var conn = Connection(@TypeOf(channel)).init(std.testing.allocator, channel, addr); + + const result = conn.recv(u32); + try std.testing.expectError(error.Timeout, result); +} + test "packet checksum" { var packet = Packet.init(); packet.calculateChecksum(); From 0c17667b5465036f0fb64cbd95be41bcbe997e38 Mon Sep 17 00:00:00 2001 From: Mark Bundschuh Date: Mon, 18 Nov 2024 23:01:00 -0500 Subject: [PATCH 11/15] remove layer 3 error --- shared/layer3.zig | 4 ---- 1 file changed, 4 deletions(-) diff --git a/shared/layer3.zig b/shared/layer3.zig index b0a0ff2..2102523 100644 --- a/shared/layer3.zig +++ b/shared/layer3.zig @@ -17,7 +17,6 @@ pub const Address = enum(u10) { pub const ChannelError = error{ SendFailed, - // SendTooBig, RecvFailed, } || std.mem.Allocator.Error; @@ -146,9 +145,6 @@ const MockChannelSimplex = struct { self.mutex.lock(); defer self.mutex.unlock(); - // if (data.len > self.recv_buffer_size) { - // return ChannelError.SendTooBig; - // } if (self.sendBuffer.getPtr(to)) |buffer| { try buffer.appendSlice(data); } else { From e2ec5f5599d6c1b35417498ce71605db75264a19 Mon Sep 17 00:00:00 2001 From: Mark Bundschuh Date: Mon, 18 Nov 2024 23:17:59 -0500 Subject: [PATCH 12/15] stuff --- .vscode/settings.json | 3 ++- shared/layer4.zig | 49 +++++++++++++------------------------------ 2 files changed, 17 insertions(+), 35 deletions(-) diff --git a/.vscode/settings.json b/.vscode/settings.json index 3426c1c..8436bc0 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -2,5 +2,6 @@ "zig.path": "zig", "zig.zls.path": "zls", "zig.zls.enableBuildOnSave": true, - "zig.zls.enableAutofix": true + "zig.zls.enableAutofix": true, + "zig.zls.buildOnSaveStep": "test" } diff --git a/shared/layer4.zig b/shared/layer4.zig index d85da25..dbd497d 100644 --- a/shared/layer4.zig +++ b/shared/layer4.zig @@ -21,25 +21,13 @@ const Flags = packed struct { // Simple packet structure with checksum pub const Packet = extern struct { const data_size = 110; - seq_num: u32 align(1), - ack_num: u32 align(1), - index: u32 align(1), - flags: Flags align(1), - checksum: u32 align(1), - data_len: u8 align(1), - data: [data_size]u8 align(1), - - pub fn init() Packet { - return .{ - .seq_num = 0, - .ack_num = 0, - .index = 0, - .flags = .{ .is_ack = false }, - .checksum = 0, - .data = [_]u8{0} ** data_size, - .data_len = 0, - }; - } + seq_num: u32 align(1) = 0, + ack_num: u32 align(1) = 0, + index: u32 align(1) = 0, + flags: Flags align(1) = .{ .is_ack = false }, + checksum: u32 align(1) = 0, + data_len: u8 align(1) = 0, + data: [data_size]u8 align(1) = [_]u8{0} ** data_size, pub fn calculateChecksum(self: *Packet) void { self.checksum = 0; @@ -120,10 +108,11 @@ pub fn Connection(comptime ChannelImpl: type) type { const chunk = bytes[offset..end]; offset += chunk.len; - var packet = Packet.init(); - packet.seq_num = self.seq_num; - packet.index = index; - packet.data_len = @intCast(chunk.len); + var packet = Packet{ + .seq_num = self.seq_num, + .index = index, + .data_len = @intCast(chunk.len), + }; @memcpy(packet.data[0..chunk.len], chunk); self.unacked_packets[@mod(self.unacked_packets_head_index, num_unacked_packets)] = .{ @@ -213,9 +202,7 @@ pub fn Connection(comptime ChannelImpl: type) type { std.debug.print("RECV len={}, seq_num={}, index={}\n", .{ packet.data_len, packet.seq_num, packet.index }); - var ack_packet = Packet.init(); - ack_packet.flags.is_ack = true; - ack_packet.ack_num = packet.seq_num; + var ack_packet = Packet{ .ack_num = packet.seq_num, .flags = .{ .is_ack = true } }; try self.sendPacket(&ack_packet); const offset = packet.index * Packet.data_size; @@ -247,12 +234,6 @@ pub fn Connection(comptime ChannelImpl: type) type { packet.calculateChecksum(); const bytes = std.mem.asBytes(packet); try self.channel.send(bytes, self.address); - - // std.debug.print("Sending packet: seq={}, ack={}, is_ack={}\n", .{ - // packet.seq_num, - // packet.ack_num, - // packet.flags.is_ack, - // }); } fn recvPacket(self: *Self) !?Packet { @@ -432,13 +413,13 @@ test "connection receive with no sender" { } test "packet checksum" { - var packet = Packet.init(); + var packet = Packet{}; packet.calculateChecksum(); try std.testing.expect(packet.verifyChecksum()); } test "corrupted packet checksum" { - var packet = Packet.init(); + var packet = Packet{}; packet.calculateChecksum(); packet.data[13] += 2; try std.testing.expect(!packet.verifyChecksum()); From e348e74bcdd72f0abfe4eac013b9688cc243598d Mon Sep 17 00:00:00 2001 From: Mark Bundschuh Date: Mon, 18 Nov 2024 23:51:15 -0500 Subject: [PATCH 13/15] add comments --- shared/layer4.zig | 23 +++++++++++++++++++++-- 1 file changed, 21 insertions(+), 2 deletions(-) diff --git a/shared/layer4.zig b/shared/layer4.zig index dbd497d..f1668cf 100644 --- a/shared/layer4.zig +++ b/shared/layer4.zig @@ -14,6 +14,7 @@ pub const ConnectionError = error{ } || layer3.ChannelError; const Flags = packed struct { + /// Whether the packet is an acknowledgment is_ack: bool, _: u7 = 0, }; @@ -21,14 +22,24 @@ const Flags = packed struct { // Simple packet structure with checksum pub const Packet = extern struct { const data_size = 110; + + /// Sequence number of the packet seq_num: u32 align(1) = 0, + + /// Sequence number of the packet being acknowledged ack_num: u32 align(1) = 0, + + /// Index of the data chunk in the message index: u32 align(1) = 0, - flags: Flags align(1) = .{ .is_ack = false }, + + /// CRC 32 checksum of the whole packet checksum: u32 align(1) = 0, + + flags: Flags align(1) = .{ .is_ack = false }, data_len: u8 align(1) = 0, data: [data_size]u8 align(1) = [_]u8{0} ** data_size, + /// Calculate and modify the checksum of the packet pub fn calculateChecksum(self: *Packet) void { self.checksum = 0; const bytes = std.mem.asBytes(self); @@ -37,6 +48,7 @@ pub const Packet = extern struct { self.checksum = hasher.final(); } + /// Check that the checksum of the packet is valid pub fn verifyChecksum(self: *const Packet) bool { var temp_packet = self.*; temp_packet.checksum = 0; @@ -48,10 +60,11 @@ pub const Packet = extern struct { }; /// Reliable connection over an unreliable channel. Can send arbitrarily large data over the channel. +/// `ChannelImpl` must implement the `Channel` interface. pub fn Connection(comptime ChannelImpl: type) type { return struct { const Self = @This(); - const num_unacked_packets = 10; + const num_unacked_packets = 5; const num_retries = 3; const global_timeout = 5 * 1000; @@ -139,9 +152,12 @@ pub fn Connection(comptime ChannelImpl: type) type { while (pos < self.unacked_packets_head_index) : (pos += 1) { const i = @mod(pos, num_unacked_packets); if (self.unacked_packets[i].packet.seq_num == packet.ack_num) { + // swap the acked packet with the last unacked packet, which will be removed const last_tail = @mod(self.unacked_packets_tail_index, num_unacked_packets); if (last_tail != i) self.unacked_packets[i] = self.unacked_packets[last_tail]; + // move the tail up, removing the acked packet from the unacked list. If the tail gets + // too high, move the tail and head down (to avoid overflow when it gets to @sizeOf(usize)) self.unacked_packets_tail_index += 1; if (self.unacked_packets_tail_index >= num_unacked_packets) { self.unacked_packets_tail_index -= num_unacked_packets; @@ -230,12 +246,14 @@ pub fn Connection(comptime ChannelImpl: type) type { return toOwned(result, self.allocator); } + /// Calculate the checksum of the packet and send it over the underlying channel fn sendPacket(self: *Self, packet: *Packet) !void { packet.calculateChecksum(); const bytes = std.mem.asBytes(packet); try self.channel.send(bytes, self.address); } + /// Poll receiving a `Packet` from the underlying channel fn recvPacket(self: *Self) !?Packet { if (try self.channel.recv(self.address)) |data| { std.debug.assert(data.inner.len <= @sizeOf(Packet)); @@ -262,6 +280,7 @@ pub fn Connection(comptime ChannelImpl: type) type { return packet; } + return null; } }; From e52a78417d2ea5a01d89ba6bca69a82c2b253e5b Mon Sep 17 00:00:00 2001 From: Mark Bundschuh Date: Tue, 19 Nov 2024 00:56:41 -0500 Subject: [PATCH 14/15] fix docs --- build.zig | 12 +++++++++--- shared/layer4.zig | 9 ++++++--- 2 files changed, 15 insertions(+), 6 deletions(-) diff --git a/build.zig b/build.zig index ef09fa8..db78de6 100644 --- a/build.zig +++ b/build.zig @@ -146,17 +146,23 @@ pub fn build(b: *std.Build) !void { apStep.dependOn(&b.addInstallArtifact(apExe, .{ .dest_dir = .{ .override = .{ .custom = "../application_processor/c/src" } } }).step); compStep.dependOn(&b.addInstallArtifact(compExe, .{ .dest_dir = .{ .override = .{ .custom = "../component/c/src" } } }).step); - const test_step = b.step("test", "Run unit tests"); const unit_tests = b.addTest(.{ .root_source_file = b.path("shared/main.zig"), .target = b.resolveTargetQuery(.{}), }); const run_unit_tests = b.addRunArtifact(unit_tests); + const test_step = b.step("test", "Run unit tests"); test_step.dependOn(&run_unit_tests.step); + const docs = b.addObject(.{ + .name = "main", + .root_source_file = b.path("shared/main.zig"), + .target = target, + .optimize = .Debug, + }); const install_docs = b.addInstallDirectory(.{ - .source_dir = unit_tests.getEmittedDocs(), - .install_dir = .{ .custom = ".." }, + .source_dir = docs.getEmittedDocs(), + .install_dir = .prefix, .install_subdir = "docs", }); const docs_step = b.step("docs", "Generate documentation"); diff --git a/shared/layer4.zig b/shared/layer4.zig index f1668cf..14db251 100644 --- a/shared/layer4.zig +++ b/shared/layer4.zig @@ -8,8 +8,11 @@ const toOwned = shared.toOwned; pub const ConnectionError = error{ Timeout, - UnexpectedPacket, MaxRetriesExceeded, + UnexpectedPacket, + + /// An extremely significant error has occurred that can only happen if + /// someone has interfered with the data. StopTheCount, } || layer3.ChannelError; @@ -19,7 +22,7 @@ const Flags = packed struct { _: u7 = 0, }; -// Simple packet structure with checksum +/// Simple packet structure with checksum pub const Packet = extern struct { const data_size = 110; @@ -66,7 +69,7 @@ pub fn Connection(comptime ChannelImpl: type) type { const Self = @This(); const num_unacked_packets = 5; const num_retries = 3; - const global_timeout = 5 * 1000; + const global_timeout = 10 * 1000; channel: layer3.ChannelInner(ChannelImpl), address: layer3.Address, From 1582fb8dce6aed6aa37fb781eb15ea35711fb334 Mon Sep 17 00:00:00 2001 From: Mark Bundschuh Date: Tue, 19 Nov 2024 01:29:03 -0500 Subject: [PATCH 15/15] fix docs install dir --- build.zig | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build.zig b/build.zig index db78de6..a7a378d 100644 --- a/build.zig +++ b/build.zig @@ -162,7 +162,7 @@ pub fn build(b: *std.Build) !void { }); const install_docs = b.addInstallDirectory(.{ .source_dir = docs.getEmittedDocs(), - .install_dir = .prefix, + .install_dir = .{ .custom = ".." }, .install_subdir = "docs", }); const docs_step = b.step("docs", "Generate documentation");