
Commit 624439f (1 parent: ef3bb9b)

Fix/temporaljoin (#1241)

* add test case for temporal join against kafka
* reduce time delay
* update test runner to add a configurable delay; update temporal join test to use it
* set config parameters before compilation so they become part of the compiled plan
* update documentation

File tree: 23 files changed (+557 −134 lines)


documentation/docs/configuration.md (13 additions, 26 deletions)

@@ -37,6 +37,10 @@ Connectors that connect flink to other engines and external systems can be confi
 
 Environment variables that start with the `sqrl` prefix are templated variables that the DataSQRL compiler instantiates. For example: `${sqrl:table-name}` provides the table name for a connector that writes to a table.
 
+Flink configuration can be specified in the `config` section. See [Flink configuration](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/table/config/) for details.
+These configuration options are applied when compiling the Flink plan and when executing Flink while running or testing via the [DataSQRL command](compiler).
+
+
 ```json
 {
   "engines" : {
@@ -60,25 +64,15 @@ Environment variables that start with the `sqrl` prefix are templated variables
         "properties.auto.offset.reset" : "earliest",
         "topic" : "${sqrl:original-table-name}"
       }
+    },
+    "config" : {
+      "table.exec.source.idle-timeout": "5000 ms"
     }
   }
 }
 ```
 
-Flink runtime configuration can be specified in the [`values` configuration](#values) section:
-
-```json
-{
-  "values" : {
-    "flink-config": {
-      "table.exec.source.idle-timeout": "100 ms"
-    }
-  }
-}
-```
-The configuration options are set on the Flink runtime when running or testing via the [DataSQRL command](compiler).
-
 ### Postgres
 
 Postgres is the default database engine.
@@ -256,36 +250,29 @@ Testing related configuration is found in the `test-runner` section.
 ```json
 {
   "test-runner": {
-    "delay-sec": 30
+    "delay-sec": 30,
+    "mutation-delay": 1
   }
 }
 ```
 
-* `delay-sec`: The number of seconds to wait between starting the processing of data and snapshotting the data.
-
+* `delay-sec`: The number of seconds to wait between starting the processing of data and snapshotting the data. Defaults to 30.
+* `required-checkpoints`: The number of checkpoints that need to be completed before the test queries are executed. Defaults to 0. If this is configured to a positive value, `delay-sec` must be set to -1.
+* `mutation-delay`: The number of seconds to wait between executing mutation queries. Defaults to 0.
 
 
 ## Values
 
-The `values` section of the configuration allows you to specify configuration values that are passed through to engines they pertain to.
-
-The default deployment profiles supports a `flink-config` section to allow injecting additional flink runtime configuration. You can use this section of the configuration to specify any [Flink configuration option](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/table/config/).
+The `values` section of the configuration allows you to specify configuration values that are applied at runtime when running or testing SQRL scripts with the [DataSQRL command](compiler).
 
 ```json
 {
   "values" : {
-    "flink-config" : {
-      "taskmanager.memory.network.max": "800m",
-      "execution.checkpointing.mode" : "EXACTLY_ONCE",
-      "execution.checkpointing.interval" : "1000ms"
-    },
     "create-topics": ["mytopic"]
   }
 }
 ```
 
-For Flink, the `values` configuration settings take precedence over identical configuration settings in the compiled physical plans.
-
 For the log engine, the `create-topics` option allows you to specify topics to create in the cluster prior to starting the pipeline. This is useful for testing.
 
 ## Environmental Variables
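Taken together, the documentation changes above describe one combined layout: engine-level Flink settings move into the `config` section of the `flink` engine, while test timing lives in `test-runner`. A sketch of a `package.json` using both (connector details elided; the values shown are illustrative, not defaults):

```json
{
  "engines" : {
    "flink" : {
      "config" : {
        "table.exec.source.idle-timeout" : "5000 ms"
      }
    }
  },
  "test-runner" : {
    "delay-sec" : 30,
    "mutation-delay" : 1
  }
}
```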

sqrl-planner/src/main/java/com/datasqrl/config/PackageJson.java (7 additions, 2 deletions)

@@ -39,8 +39,6 @@ public interface PackageJson {
 
   ScriptConfig getScriptConfig();
 
-  Map<String, Object> toMap();
-
   CompilerConfig getCompilerConfig();
 
   PackageConfiguration getPackageConfig();
@@ -117,6 +115,8 @@ interface EngineConfig {
     String getSetting(String key, Optional<String> defaultValue);
 
     ConnectorsConfig getConnectors();
+
+    Map<String, Object> getConfig();
   }
 
   @AllArgsConstructor
@@ -136,6 +136,11 @@ public String getSetting(String key, Optional<String> defaultValue) {
     public ConnectorsConfig getConnectors() {
      return null;
    }
+
+    @Override
+    public Map<String, Object> getConfig() {
+      return Map.of();
+    }
  }
 
  interface DependenciesConfig {
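The empty fallback returning `Map.of()` follows the null-object pattern the file already uses for `EmptyEngineConfig`: callers get an immutable empty map rather than `null`, so downstream consumers of the new `getConfig()` never need a null check. A self-contained sketch of that shape (the interface here is trimmed to the one new method; names mirror the diff but the class is hypothetical):

```java
import java.util.Map;

public class EngineConfigSketch {

  // Trimmed view of the change: EngineConfig gains getConfig(), and the
  // empty fallback returns an immutable empty map rather than null.
  interface EngineConfig {
    Map<String, Object> getConfig();
  }

  static class EmptyEngineConfig implements EngineConfig {
    @Override
    public Map<String, Object> getConfig() {
      return Map.of(); // safe default: no engine-level settings configured
    }
  }

  public static void main(String[] args) {
    EngineConfig cfg = new EmptyEngineConfig();
    // Iterating an empty map is a no-op, so callers can always iterate safely.
    cfg.getConfig().forEach((k, v) -> System.out.println(k + "=" + v));
    System.out.println(cfg.getConfig().isEmpty()); // prints "true"
  }
}
```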

sqrl-planner/src/main/java/com/datasqrl/engine/stream/flink/FlinkStreamEngine.java (11 additions, 3 deletions)

@@ -28,7 +28,9 @@
 import com.google.inject.Inject;
 import java.io.IOException;
 import java.util.EnumSet;
+import java.util.HashMap;
 import java.util.Locale;
+import java.util.Map;
 import java.util.Optional;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
@@ -40,12 +42,12 @@ public class FlinkStreamEngine extends ExecutionEngine.Base implements StreamEng
 
   public static final EnumSet<EngineFeature> FLINK_CAPABILITIES = STANDARD_STREAM;
 
-  @Getter private final EngineConfig config;
+  @Getter private final EngineConfig engineConfig;
 
   @Inject
   public FlinkStreamEngine(PackageJson json) {
     super(FlinkEngineFactory.ENGINE_NAME, EngineType.STREAMS, FLINK_CAPABILITIES);
-    this.config =
+    this.engineConfig =
         json.getEngines()
             .getEngineConfig(FlinkEngineFactory.ENGINE_NAME)
             .orElseGet(() -> new EmptyEngineConfig(FlinkEngineFactory.ENGINE_NAME));
@@ -57,8 +59,14 @@ public void close() throws IOException {}
   @Override
   public ExecutionMode getExecutionMode() {
     return ExecutionMode.valueOf(
-        config
+        engineConfig
            .getSetting(MODE_KEY, Optional.of(ExecutionMode.STREAMING.name()))
            .toUpperCase(Locale.ENGLISH));
  }
+
+  public Map<String, String> getBaseConfiguration() {
+    Map<String, String> configMap = new HashMap<>();
+    engineConfig.getConfig().forEach((key, value) -> configMap.put(key, String.valueOf(value)));
+    return configMap;
+  }
 }
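The new `getBaseConfiguration()` method is small but performs one important conversion: the `config` section is parsed from JSON as `Map<String, Object>` (values may be strings, numbers, or booleans), while Flink expects string-valued settings. A standalone sketch of that flattening (the class name and example keys here are illustrative, not part of the repository):

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

public class BaseConfigSketch {

  // Mirrors the conversion in FlinkStreamEngine.getBaseConfiguration():
  // every value, whatever its JSON type, is stringified with String.valueOf.
  static Map<String, String> toBaseConfiguration(Map<String, Object> engineConfig) {
    Map<String, String> configMap = new HashMap<>();
    engineConfig.forEach((key, value) -> configMap.put(key, String.valueOf(value)));
    return configMap;
  }

  public static void main(String[] args) {
    Map<String, Object> raw = new LinkedHashMap<>();
    raw.put("table.exec.source.idle-timeout", "5000 ms"); // strings pass through
    raw.put("table.exec.resource.default-parallelism", 4); // numbers become "4"
    raw.put("table.exec.sink.upsert-materialize", false); // booleans become "false"
    System.out.println(toBaseConfiguration(raw));
  }
}
```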

sqrl-planner/src/main/java/com/datasqrl/planner/Sqrl2FlinkSQLTranslator.java (2 additions, 3 deletions)

@@ -58,7 +58,6 @@
 import java.nio.file.Files;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -182,7 +181,7 @@ public Sqrl2FlinkSQLTranslator(
     // Create a UDF class loader and configure
     ClassLoader udfClassLoader =
         new URLClassLoader(jarUrls.toArray(new URL[0]), getClass().getClassLoader());
-    Map<String, String> config = new HashMap<>();
+    Map<String, String> config = flink.getBaseConfiguration();
     config.put(
         "pipeline.classpaths",
         jarUrls.stream().map(URL::toString).collect(Collectors.joining(",")));
@@ -215,7 +214,7 @@ public Sqrl2FlinkSQLTranslator(
     var calciteConfigBuilder = new CalciteConfigBuilder();
     calciteConfigBuilder.addSqlOperatorTable(sqrlFunctionCatalog.getOperatorTable());
     // setOptimizerRules(calciteConfigBuilder,tEnv.getConfig()); TODO: fix, so we have more
-    // effective subgraph identification
+    // effective subgraph identification by not pushing down projections
 
     this.tEnv.getConfig().setPlannerConfig(calciteConfigBuilder.build());
     this.catalogManager = tEnv.getCatalogManager();
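Behavior-wise, the important detail in this hunk is ordering: the translator now seeds its configuration map from the engine's base configuration instead of an empty `HashMap`, and only then puts the compiler-managed `pipeline.classpaths` entry, so on a key collision the compiler-managed value wins. A minimal sketch of that layering (class and method names are illustrative, and jar URLs are represented as plain strings for brevity):

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class TranslatorConfigSketch {

  // Mirrors the changed initialization: start from the engine's base
  // configuration, then layer compiler-managed entries on top so they
  // take precedence when the same key appears in both.
  static Map<String, String> buildConfig(Map<String, String> base, List<String> jarUrls) {
    Map<String, String> config = new LinkedHashMap<>(base);
    config.put("pipeline.classpaths", String.join(",", jarUrls));
    return config;
  }

  public static void main(String[] args) {
    Map<String, String> base = new LinkedHashMap<>();
    base.put("table.exec.source.idle-timeout", "5000 ms"); // from package.json "config"
    base.put("pipeline.classpaths", "file:/stale.jar"); // overwritten by the put below

    System.out.println(buildConfig(base, List.of("file:/udfs/my-udf.jar")));
  }
}
```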

sqrl-testing/sqrl-integration-tests/src/test/java/com/datasqrl/FullUsecasesIT.java (6 additions, 2 deletions)

@@ -96,7 +96,11 @@ class ScriptCriteria {
           new ScriptCriteria("postgres-log-disabled.sqrl", "test"),
           new ScriptCriteria("postgres-log-disabled.sqrl", "run"),
           new ScriptCriteria("connectors.sqrl", "test"), // should not be executed
-          new ScriptCriteria("flink_kafka.sqrl", "run") // does not expose an API
+          new ScriptCriteria("flink_kafka.sqrl", "run"), // does not expose an API
+          new ScriptCriteria(
+              "temporal-join.sqrl",
+              "run") // TODO: only 'run' when there are no tests (i.e. snapshot dir) - there is no
+          // benefit to also running, it's wasteful
           );
 
   static final Path PROJECT_ROOT = Path.of(System.getProperty("user.dir"));
@@ -386,7 +390,7 @@ public void printUseCaseNumbers(UseCaseTestParameter param) {
   @MethodSource("useCaseProvider")
   @Disabled
   public void runTestCaseByName(UseCaseTestParameter param) {
-    if (param.sqrlFileName.equals("banking-batch.sqrl") && param.goal.equals("test")) {
+    if (param.sqrlFileName.equals("temporal-join.sqrl") && param.goal.equals("test")) {
       testUseCase(param);
     } else {
       assumeFalse(true);
