@@ -2325,6 +2325,115 @@ public void test_EventClient_vector_decimal_1() throws IOException, Interrupted
2325
2325
Assert .assertEquals (1 ,bt2 .rows ());
2326
2326
checkData (bt1 ,bt2 );
2327
2327
}
2328
+ public static EventMessageHandler handler_string = new EventMessageHandler () {
2329
+ @ Override
2330
+ public void doEvent (String eventType , List <Entity > attribute ) {
2331
+ System .out .println ("eventType: " + eventType );
2332
+ System .out .println (attribute .toString ());
2333
+ System .out .println (eventType .equals ("event_string" ));
2334
+ try {
2335
+ conn .run ("tableInsert{outputTable}" , attribute );
2336
+ } catch (IOException e ) {
2337
+ throw new RuntimeException (e );
2338
+ }
2339
+ }
2340
+ };
2341
+ @ Test
2342
+ public void test_EventClient_vector_string () throws IOException , InterruptedException {
2343
+ String script = "share streamTable(1000000:0, `eventType`event, [STRING,BLOB]) as inputTable;\n " +
2344
+ "share table(1:0,[\" col1\" ],[STRING]) as outputTable;\n " ;
2345
+ conn .run (script );
2346
+ String script1 ="class event_string{\n " +
2347
+ "\t stringv :: STRING VECTOR\n " +
2348
+ " def event_string(string){\n " +
2349
+ "\t stringv = string\n " +
2350
+ " \t }\n " +
2351
+ "} \n " +
2352
+ "schemaTable = table(array(STRING, 0) as eventType, array(STRING, 0) as eventKeys, array(INT[], ) as type, array(INT[], 0) as form)\n " +
2353
+ "eventType = 'event_string'\n " +
2354
+ "eventKeys = 'stringv';\n " +
2355
+ "typeV = [ STRING];\n " +
2356
+ "formV = [ VECTOR];\n " +
2357
+ "insert into schemaTable values([eventType], [eventKeys], [typeV],[formV]);\n " +
2358
+ "share streamTable( array(STRING, 0) as eventType, array(BLOB, 0) as blobs) as intput1;\n " +
2359
+ "try{\n dropStreamEngine(`serInput)\n }catch(ex){\n }\n " +
2360
+ "inputSerializer = streamEventSerializer(name=`serInput, eventSchema=schemaTable, outputTable=intput1);" ;
2361
+ conn .run (script1 );
2362
+ EventSchema scheme = new EventSchema ();
2363
+ scheme .setEventType ("event_string" );
2364
+ scheme .setFieldNames (Arrays .asList ("stringv" ));
2365
+ scheme .setFieldTypes (Arrays .asList ( DT_STRING ));
2366
+ scheme .setFieldForms (Arrays .asList ( DF_VECTOR ));
2367
+ scheme .setFieldExtraParams (Arrays .asList ( 2 ));
2368
+
2369
+ List <EventSchema > eventSchemes = Collections .singletonList (scheme );
2370
+ List <String > eventTimeKeys = new ArrayList <>();
2371
+ List <String > commonKeys = new ArrayList <>();
2372
+ EventSender sender = new EventSender (conn , "inputTable" ,eventSchemes , eventTimeKeys , commonKeys );
2373
+ EventClient client = new EventClient (eventSchemes , eventTimeKeys , commonKeys );
2374
+ client .subscribe (HOST , PORT , "intput1" , "test1" , handler_string , -1 , true , "admin" , "123456" );
2375
+
2376
+ String script2 = "\t event_string1=event_string( [\" 111\" ,\" 222\" ,\" \" ,NULL])\n " +
2377
+ "\t appendEvent(inputSerializer, event_string1)\n " ;
2378
+ conn .run (script2 );
2379
+ List <Entity > attributes = new ArrayList <>();
2380
+ attributes .add (new BasicStringVector (new String []{"111" ,"222" ,"" ,"" }));
2381
+ sender .sendEvent ("event_string" ,attributes );
2382
+ BasicTable bt1 = (BasicTable )conn .run ("select * from inputTable;" );
2383
+ Assert .assertEquals (1 ,bt1 .rows ());
2384
+ Thread .sleep (2000 );
2385
+ BasicTable bt2 = (BasicTable )conn .run ("select * from intput1;" );
2386
+ Assert .assertEquals (1 ,bt2 .rows ());
2387
+ checkData (bt1 ,bt2 );
2388
+ }
2389
+ @ Test
2390
+ public void test_EventClient_vector_symbol () throws IOException , InterruptedException {
2391
+ String script = "share streamTable(1000000:0, `eventType`event, [STRING,BLOB]) as inputTable;\n " +
2392
+ "share table(1:0,[\" col1\" ],[STRING]) as outputTable;\n " ;
2393
+ conn .run (script );
2394
+ String script1 ="class event_symbol{\n " +
2395
+ "\t symbolv :: SYMBOL VECTOR\n " +
2396
+ " def event_symbol(symbol){\n " +
2397
+ "\t symbolv = symbol\n " +
2398
+ " \t }\n " +
2399
+ "} \n " +
2400
+ "schemaTable = table(array(STRING, 0) as eventType, array(STRING, 0) as eventKeys, array(INT[], ) as type, array(INT[], 0) as form)\n " +
2401
+ "eventType = 'event_symbol'\n " +
2402
+ "eventKeys = 'symbolv';\n " +
2403
+ "typeV = [ SYMBOL];\n " +
2404
+ "formV = [ VECTOR];\n " +
2405
+ "insert into schemaTable values([eventType], [eventKeys], [typeV],[formV]);\n " +
2406
+ "share streamTable( array(STRING, 0) as eventType, array(BLOB, 0) as blobs) as intput1;\n " +
2407
+ "try{\n dropStreamEngine(`serInput)\n }catch(ex){\n }\n " +
2408
+ "inputSerializer = streamEventSerializer(name=`serInput, eventSchema=schemaTable, outputTable=intput1);" ;
2409
+ conn .run (script1 );
2410
+ EventSchema scheme = new EventSchema ();
2411
+ scheme .setEventType ("event_symbol" );
2412
+ scheme .setFieldNames (Arrays .asList ("symbolv" ));
2413
+ scheme .setFieldTypes (Arrays .asList ( DT_SYMBOL ));
2414
+ scheme .setFieldForms (Arrays .asList ( DF_VECTOR ));
2415
+ scheme .setFieldExtraParams (Arrays .asList ( 2 ));
2416
+
2417
+ List <EventSchema > eventSchemes = Collections .singletonList (scheme );
2418
+ List <String > eventTimeKeys = new ArrayList <>();
2419
+ List <String > commonKeys = new ArrayList <>();
2420
+ EventSender sender = new EventSender (conn , "inputTable" ,eventSchemes , eventTimeKeys , commonKeys );
2421
+ EventClient client = new EventClient (eventSchemes , eventTimeKeys , commonKeys );
2422
+ client .subscribe (HOST , PORT , "intput1" , "test1" , handler_string , -1 , true , "admin" , "123456" );
2423
+
2424
+ String script2 = "\t event_symbol1=event_symbol( symbol([\" 111\" ,\" 222\" ,\" \" ,NULL]))\n " +
2425
+ "\t appendEvent(inputSerializer, event_symbol1)\n " ;
2426
+ conn .run (script2 );
2427
+ List <Entity > attributes = new ArrayList <>();
2428
+ attributes .add (new BasicSymbolVector (Arrays .asList (new String []{"111" , "222" , "" , "" })));
2429
+ sender .sendEvent ("event_symbol" ,attributes );
2430
+ BasicTable bt1 = (BasicTable )conn .run ("select * from inputTable;" );
2431
+ Assert .assertEquals (1 ,bt1 .rows ());
2432
+ Thread .sleep (2000 );
2433
+ BasicTable bt2 = (BasicTable )conn .run ("select * from intput1;" );
2434
+ Assert .assertEquals (1 ,bt2 .rows ());
2435
+ checkData (bt1 ,bt2 );
2436
+ }
2328
2437
@ Test
2329
2438
public void test_EventSender_all_dateType_array () throws IOException {
2330
2439
EventSchema scheme = new EventSchema ();
0 commit comments