Skip to content

Commit 6744928

Browse files
committed
update
1 parent d4112dc commit 6744928

File tree

8 files changed

+76
-3
lines changed

8 files changed

+76
-3
lines changed

.gitignore

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,3 +34,8 @@ dolphindb_csharpapi/obj/
3434
*.cache
3535
*.cache
3636
dolphindb_csharpapi_test/obj/Release/CoreCompileInputs.cache
37+
dolphindb_csharpapi/bin/Release/dolphindb_csharpapi.instr.pdb
38+
dolphindb_csharpapi/bin/Release/dolphindb_csharpapi.pdb
39+
dolphindb_csharpapi/dolphindb_csharpapi.csproj.user
40+
dolphindb_csharpapi/dolphindb_csharpapi.csproj.user
41+
*.user
Binary file not shown.

dolphindb_csharpapi/dolphindb_csharpapi.csproj

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,8 +67,12 @@
6767
<Compile Include="data\BasicFloatMatrix.cs" />
6868
<Compile Include="data\BasicFloatVector.cs" />
6969
<Compile Include="data\BasicInt.cs" />
70+
<Compile Include="data\BasicInt128.cs" />
71+
<Compile Include="data\BasicInt128Vector.cs" />
7072
<Compile Include="data\BasicIntMatrix.cs" />
7173
<Compile Include="data\BasicIntVector.cs" />
74+
<Compile Include="data\BasicIPAddr.cs" />
75+
<Compile Include="data\BasicIPAddrVector.cs" />
7276
<Compile Include="data\BasicLong.cs" />
7377
<Compile Include="data\BasicLongMatrix.cs" />
7478
<Compile Include="data\BasicLongVector.cs" />
@@ -102,6 +106,8 @@
102106
<Compile Include="data\BasicTimestampMatrix.cs" />
103107
<Compile Include="data\BasicTimestampVector.cs" />
104108
<Compile Include="data\BasicTimeVector.cs" />
109+
<Compile Include="data\BasicUuid.cs" />
110+
<Compile Include="data\BasicUuidVector.cs" />
105111
<Compile Include="data\IDictionary.cs" />
106112
<Compile Include="data\IEntity.cs" />
107113
<Compile Include="data\IEntityFactory.cs" />
@@ -123,6 +129,7 @@
123129
<Compile Include="io\ExtendedDataOutput.cs" />
124130
<Compile Include="io\LittleEndianDataInputStream.cs" />
125131
<Compile Include="io\LittleEndianDataOutputStream.cs" />
132+
<Compile Include="io\Long2.cs" />
126133
<Compile Include="io\ProgressListener.cs" />
127134
<Compile Include="Properties\AssemblyInfo.cs" />
128135
<Compile Include="RSAUtils.cs" />
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
<?xml version="1.0" encoding="utf-8"?>
22
<Project ToolsVersion="15.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
33
<PropertyGroup>
4-
<ProjectView>ProjectFiles</ProjectView>
4+
<ProjectView>ShowAllFiles</ProjectView>
55
</PropertyGroup>
66
</Project>

dolphindb_csharpapi/streaming/AbstractClient.cs

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ public abstract class AbstractClient : MessageDispatcher
1313
protected static readonly int DEFAULT_PORT = 8849;
1414
protected static readonly string DEFAULT_HOST = "localhost";
1515
protected static readonly string DEFAULT_ACTION_NAME = "csharpStreamingApi";
16+
protected string listeningHost;
1617
protected int listeningPort;
1718
protected QueueManager queueManager = new QueueManager();
1819
protected Dictionary<string, List<IMessage>> messageCache = new Dictionary<string, List<IMessage>>();
@@ -85,7 +86,9 @@ private void activeCloseConnection(Site site)
8586
conn.connect(site.host, site.port);
8687
try
8788
{
88-
string localIP = conn.LocalAddress;
89+
string localIP = listeningHost;
90+
if (localIP == null || localIP.Equals(String.Empty))
91+
localIP = conn.LocalAddress;
8992
List<IEntity> @params = new List<IEntity>
9093
{
9194
new BasicString(localIP),
@@ -121,6 +124,14 @@ public AbstractClient(int subscribePort)
121124
pThread.Start();
122125
}
123126

127+
public AbstractClient(string subscribeHost, int subscribePort)
128+
{
129+
listeningHost = subscribeHost;
130+
listeningPort = subscribePort;
131+
Deamon deamon = new Deamon(subscribePort, this);
132+
pThread = new Thread(new ThreadStart(deamon.run));
133+
pThread.Start();
134+
}
124135
private void addMessageToCache(IMessage msg)
125136
{
126137
string topic = msg.getTopic();
@@ -201,7 +212,9 @@ protected BlockingCollection<List<IMessage>> subscribeInternal(string host, int
201212
dbConn.connect(host, port);
202213
try
203214
{
204-
string localIP = dbConn.LocalAddress;
215+
string localIP = listeningHost;
216+
if (localIP == null || localIP.Equals(String.Empty))
217+
localIP = dbConn.LocalAddress;
205218

206219
if (!hostEndian.ContainsKey(host))
207220
hostEndian.Add(host, dbConn.RemoteLittleEndian);

dolphindb_csharpapi/streaming/PollingClient.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ public class PollingClient : AbstractClient
1414

1515
public PollingClient(int subscribePort) : base(subscribePort) { }
1616

17+
public PollingClient(string subscribeHost, int subscribePort) : base(subscribeHost, subscribePort) { }
18+
1719
override protected void doReconnect(Site site)
1820
{
1921
while (true)

dolphindb_csharpapi/streaming/ThreadPooledClient.cs

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,49 @@ public TaskPool(int threadsMaxCount)
4343

4444
public ThreadPooledClient() : this(DEFAULT_PORT) { }
4545

46+
public ThreadPooledClient(string subscribeHost, int subscribePort) : base(subscribeHost,subscribePort) {
47+
Queue<IMessage> backlog = new Queue<IMessage>();
48+
49+
bool fillBacklog()
50+
{
51+
bool filled = false;
52+
lock (queueHandlers)
53+
{
54+
foreach (QueueHandlerBinder binder in queueHandlers.Values)
55+
{
56+
if (binder.Item1.TryTake(out List<IMessage> messages))
57+
{
58+
messages.ForEach(backlog.Enqueue);
59+
filled = true;
60+
}
61+
}
62+
}
63+
return filled;
64+
}
65+
66+
void run()
67+
{
68+
while (true)
69+
{
70+
while (backlog.Count > 0)
71+
{
72+
IMessage msg = backlog.Dequeue();
73+
QueueHandlerBinder binder;
74+
lock (queueHandlers)
75+
{
76+
binder = queueHandlers[msg.getTopic()];
77+
}
78+
HandlerRunner handlerRunner = new HandlerRunner(binder.Item2, msg);
79+
ThreadPool.QueueUserWorkItem(new WaitCallback(handlerRunner.run));
80+
}
81+
fillBacklog();
82+
}
83+
}
84+
85+
thread = new Thread(new ThreadStart(run));
86+
thread.Start();
87+
}
88+
4689
public ThreadPooledClient(int subscribePort) : base(subscribePort)
4790
{
4891
Queue<IMessage> backlog = new Queue<IMessage>();

dolphindb_csharpapi/streaming/ThreadedClient.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,9 @@ public ThreadedClient() : this(DEFAULT_PORT) { }
1515

1616
public ThreadedClient(int subscribePort) : base(subscribePort) { }
1717

18+
public ThreadedClient(string subscribeHost, int subscribePort) : base(subscribeHost, subscribePort) { }
19+
20+
1821
class HandlerLooper
1922
{
2023
BlockingCollection<List<IMessage>> queue;

0 commit comments

Comments
 (0)