@@ -1767,152 +1767,6 @@ public void Test_ThreadPooledClient_subscribe_backupSites() throws IOException,
1767
1767
threadPooledClient .unsubscribe (HOST ,PORT ,"Trades" ,"subTread1" );
1768
1768
}
1769
1769
1770
- @ Test (timeout = 180000 )
1771
- public void Test_ThreadPooledClient_subscribe_backupSites_server_disconnect () throws IOException , InterruptedException {
1772
- DBConnection controller_conn = new DBConnection ();
1773
- controller_conn .connect (controller_host ,controller_port ,"admin" ,"123456" );
1774
- controller_conn .run ("try{startDataNode('" +HOST +":" +port_list [1 ]+"')}catch(ex){}" );
1775
- controller_conn .run ("sleep(1000)" );
1776
- String script1 = "st1 = streamTable(1000000:0,`tag`ts`data,[INT,TIMESTAMP,DOUBLE])\n " +
1777
- "share(st1,`Trades)\t \n "
1778
- + "setStreamTableFilterColumn(objByName(`Trades),`tag)" ;
1779
- conn .run (script1 );
1780
- String script2 = "st2 = streamTable(1000000:0,`tag`ts`data,[INT,TIMESTAMP,DOUBLE])\n " +
1781
- "share(st2, `Receive)\t \n " ;
1782
- conn .run (script2 );
1783
- DBConnection conn1 = new DBConnection ();
1784
- conn1 .connect (HOST ,port_list [1 ],"admin" ,"123456" );
1785
- conn1 .run (script1 );
1786
- conn1 .run (script2 );
1787
-
1788
- Vector filter1 = (Vector ) conn .run ("1..50000" );
1789
- List <String > backupSites = new ArrayList <>(Collections .singleton (HOST +":" +PORT ));
1790
- threadPooledClient .subscribe (HOST ,port_list [1 ],"Trades" ,"subTread1" ,MessageHandler_handler , -1 ,true ,filter1 , (StreamDeserializer ) null ,true ,"admin" ,"123456" ,false ,backupSites ,10 ,true );
1791
- System .out .println ("Successful subscribe" );
1792
- conn .run ("n=5000;t=table(1..n as tag,now()+1..n as ts,rand(100.0,n) as data);" + "Trades.append!(t)" );
1793
- conn1 .run ("n=5000;t=table(1..n as tag,now()+1..n as ts,rand(100.0,n) as data);" + "Trades.append!(t)" );
1794
- Thread .sleep (1000 );
1795
- controller_conn .run ("try{stopDataNode('" +HOST +":" +port_list [1 ]+"')}catch(ex){}" );
1796
- Thread .sleep (8000 );
1797
- conn .run ("t=table(5001..5500 as tag,now()+5001..5500 as ts,rand(100.0,500) as data);" + "Trades.append!(t)" );
1798
- Thread .sleep (1000 );
1799
- controller_conn .run ("try{startDataNode('" +HOST +":" +port_list [1 ]+"')}catch(ex){}" );
1800
- Thread .sleep (5000 );
1801
- BasicTable row_num = (BasicTable )conn .run ("select count(*) from Receive" );
1802
- System .out .println (row_num .getColumn (0 ).get (0 ));
1803
- assertEquals ("5500" ,row_num .getColumn (0 ).get (0 ).getString ());
1804
- threadPooledClient .unsubscribe (HOST ,port_list [1 ],"Trades" ,"subTread1" );
1805
- }
1806
-
1807
- @ Test (timeout = 180000 )
1808
- public void Test_ThreadPooledClient_subscribe_backupSites_server_disconnect_backupSites_disconnect_subOnce_false () throws IOException , InterruptedException {
1809
- DBConnection controller_conn = new DBConnection ();
1810
- controller_conn .connect (controller_host ,controller_port ,"admin" ,"123456" );
1811
- controller_conn .run ("try{startDataNode('" +HOST +":" +port_list [1 ]+"')}catch(ex){}" );
1812
- controller_conn .run ("sleep(1000)" );
1813
- String script1 = "st1 = streamTable(1000000:0,`tag`ts`data,[INT,TIMESTAMP,DOUBLE])\n " +
1814
- "share(st1,`Trades)\t \n "
1815
- + "setStreamTableFilterColumn(objByName(`Trades),`tag)" ;
1816
- conn .run (script1 );
1817
- String script2 = "st2 = streamTable(1000000:0,`tag`ts`data,[INT,TIMESTAMP,DOUBLE])\n " +
1818
- "share(st2, `Receive)\t \n " ;
1819
- conn .run (script2 );
1820
- DBConnection conn1 = new DBConnection ();
1821
- conn1 .connect (HOST ,port_list [1 ],"admin" ,"123456" );
1822
- conn1 .run (script1 );
1823
- conn1 .run (script2 );
1824
-
1825
- DBConnection conn2 = new DBConnection ();
1826
- conn2 .connect (HOST ,port_list [2 ],"admin" ,"123456" );
1827
- conn2 .run (script1 );
1828
- conn2 .run (script2 );
1829
- Vector filter1 = (Vector ) conn .run ("1..100000" );
1830
- List <String > backupSites = new ArrayList <>(Collections .singleton (HOST +":" +port_list [2 ]));
1831
- threadPooledClient .subscribe (HOST ,port_list [1 ],"Trades" ,"subTread1" ,MessageHandler_handler , -1 ,true ,filter1 , (StreamDeserializer ) null ,true ,"admin" ,"123456" ,false ,backupSites ,10 ,false );
1832
- System .out .println ("Successful subscribe" );
1833
- conn1 .run ("n=1000;t=table(1..n as tag,timestamp(1..n) as ts,take(100.0,n) as data);" + "Trades.append!(t)" );
1834
- conn2 .run ("n=1000;t=table(1..n as tag,timestamp(1..n) as ts,take(100.0,n) as data);" + "Trades.append!(t)" );
1835
- Thread .sleep (1000 );
1836
- controller_conn .run ("try{stopDataNode('" +HOST +":" +port_list [1 ]+"')}catch(ex){}" );
1837
- System .out .println (port_list [1 ]+"断掉啦---------------------------------------------------" );
1838
- Thread .sleep (8000 );
1839
- conn2 .run ("n=2000;t=table(1001..n as tag,timestamp(1001..n) as ts,take(100.0,1000) as data);" + "Trades.append!(t)" );
1840
- Thread .sleep (1000 );
1841
- controller_conn .run ("try{startDataNode('" +HOST +":" +port_list [1 ]+"')}catch(ex){}" );
1842
- Thread .sleep (5000 );
1843
- DBConnection conn3 = new DBConnection ();
1844
- conn3 .connect (HOST ,port_list [1 ],"admin" ,"123456" );
1845
- conn3 .run (script1 );
1846
- conn3 .run (script2 );
1847
- controller_conn .run ("try{stopDataNode('" +HOST +":" +port_list [2 ]+"')}catch(ex){}" );
1848
- System .out .println (port_list [2 ]+"节点断掉啦---------------------------------------------------" );
1849
- Thread .sleep (8000 );
1850
- conn3 .run ("n=3000;t=table(1..n as tag,timestamp(1..n) as ts,take(100.0,n) as data);" + "Trades.append!(t)" );
1851
- controller_conn .run ("try{startDataNode('" +HOST +":" +port_list [2 ]+"')}catch(ex){}" );
1852
- Thread .sleep (8000 );
1853
-
1854
- BasicTable row_num = (BasicTable )conn .run ("select count(*) from Receive" );
1855
- System .out .println (row_num .getColumn (0 ).get (0 ));
1856
- assertEquals ("3000" ,row_num .getColumn (0 ).get (0 ).getString ());
1857
- threadPooledClient .unsubscribe (HOST ,port_list [1 ],"Trades" ,"subTread1" );
1858
- }
1859
-
1860
- @ Test (timeout = 180000 )
1861
- public void Test_ThreadPooledClient_subscribe_backupSites_server_disconnect_backupSites_disconnect_subOnce_true () throws IOException , InterruptedException {
1862
- DBConnection controller_conn = new DBConnection ();
1863
- controller_conn .connect (controller_host ,controller_port ,"admin" ,"123456" );
1864
- controller_conn .run ("try{startDataNode('" +HOST +":" +port_list [1 ]+"')}catch(ex){}" );
1865
- controller_conn .run ("sleep(1000)" );
1866
- String script1 = "st1 = streamTable(1000000:0,`tag`ts`data,[INT,TIMESTAMP,DOUBLE])\n " +
1867
- "share(st1,`Trades)\t \n "
1868
- + "setStreamTableFilterColumn(objByName(`Trades),`tag)" ;
1869
- conn .run (script1 );
1870
- String script2 = "st2 = streamTable(1000000:0,`tag`ts`data,[INT,TIMESTAMP,DOUBLE])\n " +
1871
- "share(st2, `Receive)\t \n " ;
1872
- conn .run (script2 );
1873
- DBConnection conn1 = new DBConnection ();
1874
- conn1 .connect (HOST ,port_list [1 ],"admin" ,"123456" );
1875
- conn1 .run (script1 );
1876
- conn1 .run (script2 );
1877
-
1878
- DBConnection conn2 = new DBConnection ();
1879
- conn2 .connect (HOST ,port_list [2 ],"admin" ,"123456" );
1880
- conn2 .run (script1 );
1881
- conn2 .run (script2 );
1882
- Vector filter1 = (Vector ) conn .run ("1..100000" );
1883
- List <String > backupSites = new ArrayList <>(Collections .singleton (HOST +":" +port_list [2 ]));
1884
- threadPooledClient .subscribe (HOST ,port_list [1 ],"Trades" ,"subTread1" ,MessageHandler_handler , -1 ,true ,filter1 , (StreamDeserializer ) null ,true ,"admin" ,"123456" ,false ,backupSites ,10 ,true );
1885
- System .out .println ("Successful subscribe" );
1886
- conn1 .run ("n=1000;t=table(1..n as tag,timestamp(1..n) as ts,take(100.0,n) as data);" + "Trades.append!(t)" );
1887
- conn2 .run ("n=1000;t=table(1..n as tag,timestamp(1..n) as ts,take(100.0,n) as data);" + "Trades.append!(t)" );
1888
- Thread .sleep (1000 );
1889
- controller_conn .run ("try{stopDataNode('" +HOST +":" +port_list [1 ]+"')}catch(ex){}" );
1890
- System .out .println (port_list [1 ]+"断掉啦---------------------------------------------------" );
1891
- Thread .sleep (8000 );
1892
- conn2 .run ("n=2000;t=table(1001..n as tag,timestamp(1001..n) as ts,take(100.0,1000) as data);" + "Trades.append!(t)" );
1893
- Thread .sleep (1000 );
1894
- controller_conn .run ("try{startDataNode('" +HOST +":" +port_list [1 ]+"')}catch(ex){}" );
1895
- Thread .sleep (5000 );
1896
- DBConnection conn3 = new DBConnection ();
1897
- conn3 .connect (HOST ,port_list [1 ],"admin" ,"123456" );
1898
- conn3 .run (script1 );
1899
- conn3 .run (script2 );
1900
- controller_conn .run ("try{stopDataNode('" +HOST +":" +port_list [2 ]+"')}catch(ex){}" );
1901
- System .out .println (port_list [2 ]+"节点断掉啦---------------------------------------------------" );
1902
- Thread .sleep (10000 );
1903
- conn3 .run ("n=3000;t=table(1..n as tag,timestamp(1..n) as ts,take(100.0,n) as data);" + "Trades.append!(t)" );
1904
- controller_conn .run ("try{startDataNode('" +HOST +":" +port_list [2 ]+"')}catch(ex){}" );
1905
- Thread .sleep (5000 );
1906
-
1907
- BasicTable row_num = (BasicTable )conn .run ("select count(*) from Receive" );
1908
- System .out .println (row_num .getColumn (0 ).get (0 ));
1909
- assertEquals ("2000" ,row_num .getColumn (0 ).get (0 ).getString ());
1910
- DBConnection conn4 = new DBConnection ();
1911
- conn4 .connect (HOST ,port_list [2 ],"admin" ,"123456" );
1912
- conn4 .run (script1 );
1913
- conn4 .run (script2 );
1914
- //client.unsubscribe(HOST,port_list[1],"Trades","subTread1");
1915
- }
1916
1770
1917
1771
@ Test (timeout = 180000 )
1918
1772
public void Test_ThreadPooledClient_subscribe_backupSites_unsubscribe () throws IOException , InterruptedException {
@@ -1954,181 +1808,7 @@ public void Test_ThreadPooledClient_subscribe_resubscribeInterval_not_true() thr
1954
1808
assertEquals ("1000" ,row_num .getColumn (0 ).get (0 ).getString ());
1955
1809
threadPooledClient .unsubscribe (HOST ,PORT ,"Trades" ,"subTread1" );
1956
1810
}
1957
- public static MessageHandler MessageHandler_handler1 = new MessageHandler () {
1958
- @ Override
1959
- public void doEvent (IMessage msg ) {
1960
- try {
1961
- String script = String .format ("insert into Receive values(%d,%s,%f,%d)" , Integer .parseInt (msg .getEntity (0 ).getString ()), msg .getEntity (1 ).getString (), Double .valueOf (msg .getEntity (2 ).toString ()),System .currentTimeMillis ());
1962
- conn .run (script );
1963
- System .out .println (msg .getEntity (0 ).getString ());
1964
- } catch (IOException e ) {
1965
- e .printStackTrace ();
1966
- }
1967
- }
1968
- };
1969
-
1970
- @ Test (timeout = 180000 )
1971
- public void Test_ThreadPooledClient_subscribe_backupSites_resubscribeInterval () throws Exception {
1972
- DBConnection controller_conn = new DBConnection ();
1973
- controller_conn .connect (controller_host ,controller_port ,"admin" ,"123456" );
1974
- controller_conn .run ("try{startDataNode('" +HOST +":" +port_list [1 ]+"')}catch(ex){}" );
1975
- controller_conn .run ("sleep(1000)" );
1976
- String script1 = "st1 = streamTable(1000000:0,`tag`ts`data,[INT,TIMESTAMP,DOUBLE])\n " +
1977
- "share(st1,`Trades)\t \n "
1978
- + "setStreamTableFilterColumn(objByName(`Trades),`tag)" ;
1979
- conn .run (script1 );
1980
- String script2 = "st2 = streamTable(1000000:0,`tag`ts`data`now,[INT,TIMESTAMP,DOUBLE,TIMESTAMP])\n " +
1981
- "share(st2, `Receive)\t \n " ;
1982
- conn .run (script2 );
1983
- DBConnection conn1 = new DBConnection ();
1984
- conn1 .connect (HOST ,port_list [1 ],"admin" ,"123456" );
1985
- conn1 .run (script1 );
1986
- conn1 .run (script2 );
1987
- DBConnection conn3 = new DBConnection ();
1988
- conn3 .connect (HOST ,port_list [2 ],"admin" ,"123456" );
1989
- conn3 .run (script1 );
1990
- conn3 .run (script2 );
1991
- Vector filter1 = (Vector ) conn .run ("1..10000000" );
1992
- List <String > backupSites = new ArrayList <>(Collections .singleton (HOST +":" +port_list [2 ]));
1993
- threadPooledClient .subscribe (HOST ,port_list [1 ],"Trades" ,"subTread1" ,MessageHandler_handler1 , -1 ,true ,filter1 , (StreamDeserializer ) null ,true ,"admin" ,"123456" ,false ,backupSites ,1000 ,true );
1994
- System .out .println ("Successful subscribe" );
1995
- class MyThread extends Thread {
1996
- @ Override
1997
- public void run () {
1998
- try {
1999
- conn3 .run ("for(n in 1..1000){\n " +
2000
- " insert into Trades values(n,now()+n,n);\n " +
2001
- " sleep(100);\n " +
2002
- "}" );
2003
- } catch (Exception e ) {
2004
- // 捕获异常并打印错误信息
2005
- System .err .println ( e .getMessage ());
2006
- }
2007
- }
2008
- }
2009
- class MyThread1 extends Thread {
2010
- @ Override
2011
- public void run () {
2012
- try {
2013
- conn1 .run ("for(n in 1..1000){\n " +
2014
- " insert into Trades values(n,now()+n,n);\n " +
2015
- " sleep(100);\n " +
2016
- "}" );
2017
- } catch (Exception e ) {
2018
- // 捕获异常并打印错误信息
2019
- System .err .println ( e .getMessage ());
2020
- }
2021
- }
2022
- }
2023
- class MyThread2 extends Thread {
2024
- @ Override
2025
- public void run () {
2026
- try {
2027
- controller_conn .run ("try{stopDataNode('" +HOST +":" +port_list [1 ]+"')}catch(ex){}" );
2028
- } catch (Exception e ) {
2029
- // 捕获异常并打印错误信息
2030
- System .err .println (e .getMessage ());
2031
- }
2032
- }
2033
- }
2034
- MyThread thread = new MyThread ();
2035
- MyThread1 thread1 = new MyThread1 ();
2036
- MyThread2 thread2 = new MyThread2 ();
2037
- thread .start ();
2038
- thread1 .start ();
2039
- Thread .sleep (2000 );
2040
- thread2 .start ();
2041
- thread .join ();
2042
- Thread .sleep (5000 );
2043
- controller_conn .run ("try{startDataNode('" +HOST +":" +port_list [1 ]+"')}catch(ex){}" );
2044
- Thread .sleep (1000 );
2045
- BasicTable re = (BasicTable )conn .run ("select tag ,now,deltas(now) from Receive order by deltas(now) desc \n " );
2046
- System .out .println (re .getString ());
2047
- Assert .assertEquals (1000 ,re .rows ());
2048
- Assert .assertEquals (true ,Integer .valueOf (re .getColumn (2 ).get (0 ).toString ())>1000 );
2049
- DBConnection conn2 = new DBConnection ();
2050
- conn2 .connect (HOST ,port_list [1 ],"admin" ,"123456" );
2051
- conn2 .run (script1 );
2052
- conn2 .run (script2 );
2053
- threadPooledClient .unsubscribe (HOST ,port_list [1 ],"Trades" ,"subTread1" );
2054
- }
2055
1811
2056
- @ Test (timeout = 180000 )
2057
- public void Test_ThreadPooledClient_subscribe_resubscribeInterval_subOnce_not_set () throws IOException , InterruptedException {
2058
- DBConnection controller_conn = new DBConnection ();
2059
- controller_conn .connect (controller_host ,controller_port ,"admin" ,"123456" );
2060
- controller_conn .run ("try{startDataNode('" +HOST +":" +port_list [1 ]+"')}catch(ex){}" );
2061
- controller_conn .run ("sleep(1000)" );
2062
- String script1 = "st1 = streamTable(1000000:0,`tag`ts`data,[INT,TIMESTAMP,DOUBLE])\n " +
2063
- "share(st1,`Trades)\t \n "
2064
- + "setStreamTableFilterColumn(objByName(`Trades),`tag)" ;
2065
- conn .run (script1 );
2066
- String script2 = "st2 = streamTable(1000000:0,`tag`ts`data,[INT,TIMESTAMP,DOUBLE])\n " +
2067
- "share(st2, `Receive)\t \n " ;
2068
- conn .run (script2 );
2069
- DBConnection conn1 = new DBConnection ();
2070
- conn1 .connect (HOST ,port_list [1 ],"admin" ,"123456" );
2071
- conn1 .run (script1 );
2072
- conn1 .run (script2 );
2073
-
2074
- DBConnection conn2 = new DBConnection ();
2075
- conn2 .connect (HOST ,port_list [2 ],"admin" ,"123456" );
2076
- conn2 .run (script1 );
2077
- conn2 .run (script2 );
2078
- Vector filter1 = (Vector ) conn .run ("1..100000" );
2079
- List <String > backupSites = new ArrayList <>(Collections .singleton (HOST +":" +port_list [2 ]));
2080
- threadPooledClient .subscribe (HOST ,port_list [1 ],"Trades" ,"subTread1" ,MessageHandler_handler , -1 ,true ,filter1 , (StreamDeserializer ) null ,true ,"admin" ,"123456" ,false ,backupSites );
2081
- System .out .println ("Successful subscribe" );
2082
- conn1 .run ("n=1000;t=table(1..n as tag,timestamp(1..n) as ts,take(100.0,n) as data);" + "Trades.append!(t)" );
2083
- conn2 .run ("n=1000;t=table(1..n as tag,timestamp(1..n) as ts,take(100.0,n) as data);" + "Trades.append!(t)" );
2084
- Thread .sleep (1000 );
2085
- controller_conn .run ("try{stopDataNode('" +HOST +":" +port_list [1 ]+"')}catch(ex){}" );
2086
- System .out .println (port_list [1 ]+"断掉啦---------------------------------------------------" );
2087
- Thread .sleep (8000 );
2088
- conn2 .run ("n=2000;t=table(1001..n as tag,timestamp(1001..n) as ts,take(100.0,1000) as data);" + "Trades.append!(t)" );
2089
- Thread .sleep (1000 );
2090
- controller_conn .run ("try{startDataNode('" +HOST +":" +port_list [1 ]+"')}catch(ex){}" );
2091
- Thread .sleep (5000 );
2092
- DBConnection conn3 = new DBConnection ();
2093
- conn3 .connect (HOST ,port_list [1 ],"admin" ,"123456" );
2094
- conn3 .run (script1 );
2095
- conn3 .run (script2 );
2096
- controller_conn .run ("try{stopDataNode('" +HOST +":" +port_list [2 ]+"')}catch(ex){}" );
2097
- System .out .println (port_list [2 ]+"节点断掉啦---------------------------------------------------" );
2098
- Thread .sleep (8000 );
2099
- conn3 .run ("n=3000;t=table(1..n as tag,timestamp(1..n) as ts,take(100.0,n) as data);" + "Trades.append!(t)" );
2100
- controller_conn .run ("try{startDataNode('" +HOST +":" +port_list [2 ]+"')}catch(ex){}" );
2101
- Thread .sleep (5000 );
2102
-
2103
- BasicTable row_num = (BasicTable )conn .run ("select count(*) from Receive" );
2104
- System .out .println (row_num .getColumn (0 ).get (0 ));
2105
- assertEquals ("3000" ,row_num .getColumn (0 ).get (0 ).getString ());
2106
- threadPooledClient .unsubscribe (HOST ,port_list [1 ],"Trades" ,"subTread1" );
2107
- }
2108
- //@Test(timeout = 180000)
2109
- public void Test_ThreadPooledClient_subscribe_backupSites_server_disconnect_1 () throws IOException , InterruptedException {
2110
- DBConnection controller_conn = new DBConnection ();
2111
- controller_conn .connect (controller_host ,controller_port ,"admin" ,"123456" );
2112
- controller_conn .run ("try{startDataNode('" +HOST +":" +port_list [1 ]+"')}catch(ex){}" );
2113
- controller_conn .run ("sleep(1000)" );
2114
- String script1 = "st1 = streamTable(1000000:0,`tag`ts`data,[INT,TIMESTAMP,DOUBLE])\n " +
2115
- "share(st1,`Trades)\t \n "
2116
- + "setStreamTableFilterColumn(objByName(`Trades),`tag)" ;
2117
- conn .run (script1 );
2118
- String script2 = "st2 = streamTable(1000000:0,`tag`ts`data,[INT,TIMESTAMP,DOUBLE])\n " +
2119
- "share(st2, `Receive)\t \n " ;
2120
- conn .run (script2 );
2121
- DBConnection conn1 = new DBConnection ();
2122
- conn1 .connect (HOST ,port_list [1 ],"admin" ,"123456" );
2123
- conn1 .run (script1 );
2124
- conn1 .run (script2 );
2125
-
2126
- Vector filter1 = (Vector ) conn .run ("1..50000" );
2127
- List <String > backupSites = Arrays .asList (new String []{"192.168.0.69:18921" , "192.168.0.69:18922" , "192.168.0.69:18923" });
2128
- threadPooledClient .subscribe (HOST ,port_list [1 ],"Trades" ,"subTread1" ,MessageHandler_handler , -1 ,true ,filter1 , (StreamDeserializer ) null ,true ,"admin" ,"123456" ,false ,backupSites );
2129
- System .out .println ("这里可以手工断掉这个集群下所有可用节点http://192.168.0.69:18920/?view=overview-old" );
2130
- Thread .sleep (1000000 );
2131
- }
2132
1812
public static MessageHandler MessageHandler_handler_getOffset = new MessageHandler () {
2133
1813
@ Override
2134
1814
public void doEvent (IMessage msg ) {
0 commit comments