Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ConnectX.Client/Network/ZeroTier/Tcp/ZtTcpSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ protected override async Task FillReceivePipeAsync(PipeWriter writer, Cancellati
{
if (!Socket.Poll(10000, SelectMode.SelectRead))
{
await Task.Delay(10, token);
await Task.Delay(1, token);
continue;
}

Expand Down
3 changes: 3 additions & 0 deletions ConnectX.Client/Proxy/GenericProxyAcceptor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ public Task StartAcceptAsync()
while (!_cancellationToken.IsCancellationRequested)
{
var tmp = await _acceptSocket.AcceptAsync(_cancellationToken);

tmp.NoDelay = true;
tmp.LingerState = new LingerOption(true, 3);

if (tmp.RemoteEndPoint is not IPEndPoint remoteEndPoint) continue;

Expand Down
212 changes: 131 additions & 81 deletions ConnectX.Client/Proxy/GenericProxyBase.cs
Original file line number Diff line number Diff line change
@@ -1,23 +1,33 @@
using System.Buffers;
using System.Collections.Concurrent;
using System.Net.Sockets;
using System.Threading.Channels;
using ConnectX.Client.Messages.Proxy;
using Microsoft.Extensions.Logging;

namespace ConnectX.Client.Proxy;

public abstract class GenericProxyBase : IDisposable
{
private const int DefaultReceiveBufferSize = 8192;
private const int DefaultReceiveBufferSize = 20480;
private const int RetryInterval = 500;
private const int TryTime = 20;

private readonly CancellationTokenSource _combinedTokenSource;
private readonly CancellationTokenSource _internalTokenSource;

protected readonly CancellationToken CancellationToken;
protected readonly ConcurrentQueue<ForwardPacketCarrier> InwardBuffersQueue = [];
protected readonly ConcurrentQueue<ForwardPacketCarrier> OutwardBuffersQueue = [];

protected Channel<ForwardPacketCarrier>? InwardBuffersQueue = Channel.CreateUnbounded<ForwardPacketCarrier>(new UnboundedChannelOptions
{
SingleReader = true,
SingleWriter = true
});

protected Channel<ForwardPacketCarrier>? OutwardBuffersQueue = Channel.CreateUnbounded<ForwardPacketCarrier>(new UnboundedChannelOptions
{
SingleReader = true,
SingleWriter = false
});

public readonly List<Func<ForwardPacketCarrier, bool>> OutwardSenders = [];
public readonly TunnelIdentifier TunnelIdentifier;
Expand All @@ -41,14 +51,35 @@ protected GenericProxyBase(
private ushort LocalServerPort => TunnelIdentifier.LocalRealPort;
private ushort RemoteClientPort => TunnelIdentifier.RemoteRealPort;

private void ResetChannels()
{
InwardBuffersQueue?.Writer.Complete();
InwardBuffersQueue = Channel.CreateUnbounded<ForwardPacketCarrier>(new UnboundedChannelOptions
{
SingleReader = true,
SingleWriter = true
});

OutwardBuffersQueue?.Writer.Complete();
OutwardBuffersQueue = Channel.CreateUnbounded<ForwardPacketCarrier>(new UnboundedChannelOptions
{
SingleReader = true,
SingleWriter = false
});
}

public void Dispose()
{
_internalTokenSource.Cancel();
_combinedTokenSource.Dispose();
_internalTokenSource.Dispose();

InwardBuffersQueue.Clear();
OutwardBuffersQueue.Clear();
InwardBuffersQueue?.Writer.Complete();
OutwardBuffersQueue?.Writer.Complete();

InwardBuffersQueue = null;
OutwardBuffersQueue = null;

OutwardSenders.Clear();

_innerSocket?.Shutdown(SocketShutdown.Both);
Expand Down Expand Up @@ -76,49 +107,56 @@ public virtual void Start()

protected async Task OuterSendLoopAsync(CancellationToken cancellationToken)
{
while (!cancellationToken.IsCancellationRequested)
{
if (!OutwardBuffersQueue.TryDequeue(out var packetCarrier))
{
await Task.Delay(1, cancellationToken);
continue;
}

if (Environment.TickCount - packetCarrier.LastTryTime < RetryInterval)
{
OutwardBuffersQueue.Enqueue(packetCarrier);
continue;
}
ArgumentNullException.ThrowIfNull(OutwardBuffersQueue);

var sent = false;
var reader = OutwardBuffersQueue.Reader;
var writer = OutwardBuffersQueue.Writer;

packetCarrier.LastTryTime = Environment.TickCount;

foreach (var sender in OutwardSenders)
{
if (!sender(packetCarrier)) continue;
sent = true;
break;
}

if (sent)
{
packetCarrier.Dispose();
continue;
}

// If all return false, it means that it has not been sent.
// If buffer.TryCount greater tha const value TryTime, drop it.
packetCarrier.TryCount++;

if (packetCarrier.TryCount > TryTime)
while (!cancellationToken.IsCancellationRequested)
{
while (await reader.WaitToReadAsync(cancellationToken))
{
packetCarrier.Dispose();
continue;
while (reader.TryRead(out var packetCarrier))
{
if (Environment.TickCount - packetCarrier.LastTryTime < RetryInterval)
{
await writer.WriteAsync(packetCarrier, cancellationToken);
continue;
}

var sent = false;

packetCarrier.LastTryTime = Environment.TickCount;

foreach (var sender in OutwardSenders)
{
if (!sender(packetCarrier)) continue;
sent = true;
break;
}

if (sent)
{
packetCarrier.Dispose();
continue;
}

// If all return false, it means that it has not been sent.
// If buffer.TryCount greater tha const value TryTime, drop it.
packetCarrier.TryCount++;

if (packetCarrier.TryCount > TryTime)
{
packetCarrier.Dispose();
continue;
}

// Re-enqueue.
await writer.WriteAsync(packetCarrier, cancellationToken);
}
}

// Re-enqueue.
OutwardBuffersQueue.Enqueue(packetCarrier);
break;
}
}

Expand All @@ -130,7 +168,8 @@ public void OnReceiveMcPacketCarrier(ForwardPacketCarrier message)
{
Logger.LogReceivedPacket(GetProxyInfoForLog(), message.Payload.Length, RemoteClientPort);

InwardBuffersQueue.Enqueue(message);
ArgumentNullException.ThrowIfNull(InwardBuffersQueue);
ArgumentOutOfRangeException.ThrowIfEqual(InwardBuffersQueue.Writer.TryWrite(message), false);
}

protected virtual object GetProxyInfoForLog()
Expand All @@ -144,40 +183,44 @@ protected virtual object GetProxyInfoForLog()

protected virtual async Task InnerSendLoopAsync(CancellationToken cancellationToken)
{
ArgumentNullException.ThrowIfNull(InwardBuffersQueue);

var reader = InwardBuffersQueue.Reader;

while (!cancellationToken.IsCancellationRequested)
{
if (!CheckSocketValid()) continue;
if (!InwardBuffersQueue.TryDequeue(out var packet))
{
await Task.Delay(1, cancellationToken);
continue;
}

try
while (await reader.WaitToReadAsync(cancellationToken))
{
Logger.LogCurrentlyRemainPacket(GetProxyInfoForLog(), InwardBuffersQueue.Count);

var totalLen = packet.Payload.Length;
var sentLen = 0;
var buffer = packet.Payload;

while (sentLen < totalLen)
sentLen += await _innerSocket!.SendAsync(
buffer[sentLen..],
SocketFlags.None,
CancellationToken);

packet.Dispose();

Logger.LogSentPacket(GetProxyInfoForLog(), totalLen, LocalServerPort);
}
catch (SocketException ex)
{
Logger.LogFailedToSendPacket(ex, GetProxyInfoForLog(), LocalServerPort);
}
catch (ObjectDisposedException ex)
{
Logger.LogFailedToSendPacket(ex, GetProxyInfoForLog(), LocalServerPort);
while (reader.TryRead(out var packetCarrier))
{
try
{
var totalLen = packetCarrier.Payload.Length;
var sentLen = 0;
var buffer = packetCarrier.Payload;

while (sentLen < totalLen)
sentLen += await _innerSocket!.SendAsync(
buffer[sentLen..],
SocketFlags.None,
CancellationToken);

packetCarrier.Dispose();

Logger.LogSentPacket(GetProxyInfoForLog(), totalLen, LocalServerPort);
}
catch (SocketException ex)
{
Logger.LogFailedToSendPacket(ex, GetProxyInfoForLog(), LocalServerPort);
}
catch (ObjectDisposedException ex)
{
_innerSocket = null;
Logger.LogFailedToSendPacket(ex, GetProxyInfoForLog(), LocalServerPort);
}
}
}
}
}
Expand All @@ -194,14 +237,14 @@ private bool CheckSocketValid()
}
catch (SocketException e) //无法初始化,清除队列
{
InwardBuffersQueue.Clear();
ResetChannels();
Logger.LogFailedToInitConnectionSocket(e, GetProxyInfoForLog(), e.SocketErrorCode);

return false;
}
catch (ObjectDisposedException)
{
InwardBuffersQueue.Clear();
ResetChannels();
return false;
}
}
Expand All @@ -210,7 +253,10 @@ private bool CheckSocketValid()
private void InitConnectionSocket()
{
if (_innerSocket is { Connected: true })
{
_innerSocket.Shutdown(SocketShutdown.Both);
_innerSocket.Close();
}

_innerSocket?.Dispose();
_innerSocket = CreateSocket();
Expand All @@ -220,12 +266,16 @@ private void InitConnectionSocket()

protected virtual async Task InnerReceiveLoopAsync(CancellationToken cancellationToken)
{
ArgumentNullException.ThrowIfNull(OutwardBuffersQueue);

var writer = OutwardBuffersQueue.Writer;

while (!cancellationToken.IsCancellationRequested)
{
if (_innerSocket is not { Connected: true })
{
if (!CheckSocketValid()) continue;
await Task.Delay(1, cancellationToken);
await Task.Yield();
continue;
}

Expand All @@ -240,8 +290,11 @@ protected virtual async Task InnerReceiveLoopAsync(CancellationToken cancellatio
Logger.LogServerDisconnected(GetProxyInfoForLog(), LocalServerPort);

bufferOwner.Dispose();

_innerSocket?.Shutdown(SocketShutdown.Both);
_innerSocket?.Dispose();
_innerSocket = null;

InvokeRealServerDisconnected();
break;
}
Expand All @@ -258,7 +311,7 @@ protected virtual async Task InnerReceiveLoopAsync(CancellationToken cancellatio
TargetRealPort = RemoteClientPort
};

OutwardBuffersQueue.Enqueue(carrier);
await writer.WriteAsync(carrier, cancellationToken);
}
}

Expand All @@ -285,9 +338,6 @@ internal static partial class GenericProxyBaseLoggers
public static partial void LogReceivedPacket(this ILogger logger, object proxyInfo, int length,
ushort remoteClientPort);

[LoggerMessage(LogLevel.Trace, "[{ProxyInfo}] Currently remain {PacketLength} packet")]
public static partial void LogCurrentlyRemainPacket(this ILogger logger, object proxyInfo, int packetLength);

[LoggerMessage(LogLevel.Trace, "[{ProxyInfo}] Sent {PacketLength} bytes to {LocalRealMcPort}")]
public static partial void LogSentPacket(this ILogger logger, object proxyInfo, int packetLength,
ushort localRealMcPort);
Expand Down
6 changes: 4 additions & 2 deletions ConnectX.Client/Proxy/GenericProxyClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,10 @@ protected override Socket CreateSocket()
{
var socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);

socket.Connect(new IPEndPoint(IPAddress.Loopback,
LocalServerPort));
socket.Connect(new IPEndPoint(IPAddress.Loopback, LocalServerPort));

socket.NoDelay = true;
socket.LingerState = new LingerOption(true, 3);

Logger.LogConnectedToMc(LocalServerPort, GetProxyInfoForLog());

Expand Down
2 changes: 1 addition & 1 deletion ConnectX.ClientConsole/appsettings.json
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,6 @@
},
"Server": {
"ListenPort": 3535,
"ListenAddress": "127.0.0.1"
"ListenAddress": "192.168.8.74"
}
}
2 changes: 2 additions & 0 deletions ConnectX.Shared/Messages/Relay/Datagram/RelayDatagram.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,7 @@ public readonly partial struct RelayDatagram(Guid from, Guid to, ReadOnlyMemory<
{
public readonly Guid From = from;
public readonly Guid To = to;

[BrotliFormatter<ReadOnlyMemory<byte>>]
public readonly ReadOnlyMemory<byte> Payload = payload;
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,7 @@ namespace ConnectX.Shared.Messages.Relay.Datagram;
public readonly partial struct UnwrappedRelayDatagram(Guid from, ReadOnlyMemory<byte> payload)
{
public readonly Guid From = from;

[BrotliFormatter<ReadOnlyMemory<byte>>]
public readonly ReadOnlyMemory<byte> Payload = payload;
}
Loading