Skip to content

New Emit Polices #300

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Apr 30, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
186 changes: 76 additions & 110 deletions docs/query-syntax.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,20 +36,73 @@ Timeplus supports some advanced `SETTINGS` to fine tune the streaming query proc

As an advanced feature, Timeplus supports various policies to emit results during streaming query.

The syntax is:
:::info
Please note, we updated the EMIT syntax in [Timeplus Enterprise 2.7.6](/enterprise-v2.7#2_7_6). Please upgrade to the latest version to use those refined emit polices.
:::

For [global aggregations](/stream-query#global-aggregation), the syntax is:

```sql
EMIT
[AFTER WATERMARK [WITH DELAY <interval>]
EMIT [STREAM|CHANGELOG]
[PERIODIC <interval> [REPEAT]]
[ON UPDATE]
- [[ AND ]TIMEOUT <interval>]
- [[ AND ]LAST <interval> [ON PROCTIME]]
[ON UPDATE [WITH BATCH <interval>] ]
```

By default `EMIT STREAM` and `PERIODIC 2s` are applied. Advanced settings:
* `EMIT CHANGELOG` works for [global aggregations](/stream-query#global-aggregation) and [non-aggregation tail/filter](/stream-query#non-aggregation). It will output `+1` or `-1` for `_tp_delta` column.

For [time-window aggregations](/stream-query#window-aggregation), the syntax is:

```sql
EMIT
[AFTER WINDOW CLOSE [WITH DELAY <interval> [AND TIMEOUT <interval>]]]
[PERIODIC <interval> [REPEAT] [WITH DELAY <interval> [AND TIMEOUT <interval>]]]
[ON UPDATE [WITH BATCH <interval>] [WITH DELAY <interval> [AND TIMEOUT <interval>]]]
```
You can only choose one of the emit policies: `AFTER WINDOW CLOSE`, `PERIODIC`, or `ON UPDATE`. If you omit any of them, the default policy is `AFTER WINDOW CLOSE`.

Examples:
```sql
EMIT STREAM AFTER WINDOW CLOSE WITH DELAY 1s AND TIMEOUT 5s
EMIT STREAM PERIODIC 1s REPEAT WITH DELAY 1s AND TIMEOUT 5s
EMIT ON UPDATE WITH DELAY 1s AND TIMEOUT 5s
EMIT ON UPDATE WITH BATCH 1s WITH DELAY 1s AND TIMEOUT 5s
```

### EMIT .. WITH DELAY {#emit_delay}

`WITH DELAY` and `AND TIMEOUT` only can be applied to time-window based aggregations.

By default, the query engine will emit the results immediately when the window is closed or other conditions are met. This behavior can be customized using the `WITH DELAY` clause. It allows you to specify extra time to progress the watermark, which can be useful for handling late data.

For example, if you want to wait for 1 second before emitting the results, you can use the following syntax:

```sql
EMIT AFTER WINDOW CLOSE WITH DELAY 1s
```

### EMIT AFTER WATERMARK {#emit_after_wm}
Please check the interactive demo on [Understanding Watermark](/understanding-watermark).

### EMIT .. WITH DELAY AND TIMEOUT {#emit_timeout}

You can omit `EMIT AFTER WATERMARK`, since this is the default behavior for time window aggregations. For example:
For time window based aggregations, when the window is closed is decided by the watermark. A new event outside the window will progress the watermark and inform the query engine to close the previous window and to emit aggregation results.

Say you only get one event for the time window. Since there is no more event, the watermark cannot be moved so the window won't be closed.

`EMIT .. TIMEOUT` is to force the window close, with a timeout after seeing last event.

Please note, if there no single event in the data stream, or in the time window, Proton won't emit result. For example, in the following SQL, you won't get 0 as the count:

```sql
SELECT window_start, count() as count FROM tumble(stream,2s)
GROUP BY window_start
```

Even you add `EMIT .. TIMEOUT` in the SQL, it won't trigger timeout, because the query engine doesn't see any event in the window. If you need to detect such missing event for certain time window, one workaround is to create a heartbeat stream and use `UNION` to create a subquery to combine both heartbeat stream and target stream, for a time window, if all observed events are from heartbeat stream, this means there is no event in the target stream. Please discuss more with us in community slack.

### EMIT AFTER WINDOW CLOSE {#emit_after}

You can omit `EMIT AFTER WINDOW CLOSE`, since this is the default behavior for time window aggregations. For example:

```sql
SELECT device, max(cpu_usage)
Expand All @@ -59,15 +112,15 @@ GROUP BY device, window_end

The above example SQL continuously aggregates max cpu usage per device per tumble window for the stream `devices_utils`. Every time a window is closed, Timeplus Proton emits the aggregation results. How to determine the window should be closed? This is done by [Watermark](/stream-query#window-watermark), which is an internal timestamp. It is guaranteed to be increased monotonically per stream query.

### EMIT AFTER WATERMARK WITH DELAY {#emit_after_wm_with_delay}
### EMIT AFTER WINDOW CLOSE WITH DELAY {#emit_after_with_delay}

Example:

```sql
SELECT device, max(cpu_usage)
FROM tumble(device_utils, 5s)
GROUP BY device, widnow_end
EMIT AFTER WATERMARK WITH DELAY 2s;
EMIT AFTER WINDOW CLOSE WITH DELAY 2s;
```

The above example SQL continuously aggregates max cpu usage per device per tumble window for the stream `device_utils`. Every time a window is closed, Timeplus Proton waits for another 2 seconds and then emits the aggregation results.
Expand Down Expand Up @@ -110,12 +163,6 @@ EMIT PERIODIC 3s REPEAT

### EMIT ON UPDATE {#emit_on_update}

:::info

This is a new emit policy added in Proton 1.5.

:::

Since Proton 1.5, you can apply `EMIT ON UPDATE` in time windows, such as tumble/hop/session, with `GROUP BY` keys. For example:

```sql
Expand All @@ -132,106 +179,25 @@ EMIT ON UPDATE

During the 5 second tumble window, even the window is not closed, as long as the aggregation value(`cnt`) for the same `cid` is different , the results will be emitted.

### EMIT PERIODIC .. ON UPDATE {#emit_periodic_on_update}

:::info

This is a new emit policy added in Proton 1.5.

:::

You can combine `EMIT PERIODIC` and `EMIT ON UPDATE` together. In this case, even the window is not closed, Proton will check the intermediate aggregation result at the specified interval and emit rows if the result is changed.

### EMIT TIMEOUT{#emit_timeout}

For time window based aggregations, when the window is closed is decided by the watermark. A new event outside the window will progress the watermark and inform the query engine to close the previous window and to emit aggregation results.

Say you only get one event for the time window. Since there is no more event, the watermark cannot be moved so the window won't be closed.

`EMIT TIMEOUT` is to force the window close, with a timeout after seeing last event.

Please note, if there no single event in the data stream, or in the time window, Proton won't emit result. For example, in the following SQL, you won't get 0 as the count:

```sql
SELECT window_start, count() as count FROM tumble(stream,2s)
GROUP BY window_start
```

Even you add `EMIT TIMEOUT` in the SQL, it won't trigger timeout, because the query engine doesn't see any event in the window. If you need to detect such missing event for certain time window, one workaround is to create a heartbeat stream and use `UNION` to create a subquery to combine both heartbeat stream and target stream, for a time window, if all observed events are from heartbeat stream, this means there is no event in the target stream. Please discuss more with us in community slack.

### EMIT LAST

In streaming processing, there is one typical query which is processing the last X seconds / minutes / hours of data. For example, show me the cpu usage per device in the last 1 hour. We call this type of processing `Last X Streaming Processing` in Timeplus and Timeplus provides a specialized SQL extension for ease of use: `EMIT LAST <n><UNIT>`. As in other parts of streaming queries, users can use interval shortcuts here.

:::info

By default, `EMIT LAST` uses the event time. Timeplus Proton will seek both streaming storage and historical to backfill data in last X time range. `EMIT LAST .. ON PROCTIME` uses the wall clock time to do the seek.

:::
### EMIT ON UPDATE WITH DELAY {#emit_on_update_with_delay}

#### EMIT LAST for Streaming Tail

Tailing events whose event timestamps are in the last X range.

Examples

```sql
SELECT *
FROM device_utils
WHERE cpu_usage > 80
EMIT LAST 5m
```

The above example filters events in the `device_utils` stream where `cpu_usage` is greater than 80% and events are appended in the last 5 minutes. Internally, Timeplus seeks streaming storage back to 5 minutes (wall-clock time from now) and tailing the data from there.

#### EMIT LAST for Global Aggregation
Adding the `WITH DELAY` to `EMIT ON UPDATE` will allow late event for the window aggregation.

```sql
SELECT <column_name1>, <column_name2>, <aggr_function>
FROM <stream_name>
[WHERE clause]
GROUP BY ...
EMIT LAST INTERVAL <n> <UNIT>
SETTINGS max_keep_windows=<window_count>
```

**Note** Internally Timeplus chops streaming data into small windows and does the aggregation in each small window and as time goes, it slides out old small windows to keep the overall time window fixed and keep the incremental aggregation efficient. By default, the maximum keeping windows is 100. If the last X interval is very big and the periodic emit interval is small,
then users will need to explicitly set up a bigger max window : `last_x_interval / periodic_emit_interval`.

Examples

```sql
SELECT device, count(*)
FROM device_utils
WHERE cpu_usage > 80
GROUP BY device
EMIT PERIODIC 5s AND LAST 1h
SETTINGS max_keep_windows=720;
```

#### EMIT LAST for Windowed Aggregation

```sql
SELECT <column_name1>, <column_name2>, <aggr_function>
FROM <streaming_window_function>(<stream_name>, [<time_column>], [<window_size>], ...)
[WHERE clause]
GROUP BY ...
EMIT LAST INTERVAL <n> <UNIT>
SETTINGS max_keep_windows=<window_count>
SELECT
window_start, cid, count() AS cnt
FROM
tumble(car_live_data, 5s)
WHERE
cid IN ('c00033', 'c00022')
GROUP BY
window_start, cid
EMIT ON UPDATE WITH DELAY 2s
```

Examples
### EMIT ON UPDATE WITH BATCH {#emit_on_update_with_batch}

```sql
SELECT device, window_end, count(*)
FROM tumble(device_utils, 5s)
WHERE cpu_usage > 80
GROUP BY device, window_end
EMIT LAST 1h
SETTINGS max_keep_windows=720;
```

Similarly, we can apply the last X on hopping window.
You can combine `EMIT PERIODIC` and `EMIT ON UPDATE` together. In this case, even the window is not closed, Proton will check the intermediate aggregation result at the specified interval and emit rows if the result is changed.

## PARTITION BY

Expand Down
16 changes: 16 additions & 0 deletions docs/v2-release-notes.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,22 @@

This page summarizes changes for Timeplus Enterprise and Timeplus Proton, on a biweekly basis, including new features and important bug fixes.

## Apr 28, 2025

### Timeplus Proton v1.6.15
* fixed the issue that meta store raft service is hardcoded to listen on ipv6

### Timeplus Connect v0.8.17
[timeplus-connect](https://github.com/timeplus-io/timeplus-connect) provides Python connector to interact with Timeplus Enterprise or Timeplus Proton. In this release:
* added support for new `json` data type
* other bug fixes and enhancements

### Timeplus Go Driver v2.1.0
The [proton-go-driver](https://github.com/timeplus-io/proton-go-driver) provides Go connector to interact with Timeplus Enterprise or Timeplus Proton. In this release:
* added support for new `json` data type
* updated protocol to support receiving query id from the server
* other refinements and sample code updates

## Apr 14, 2025

### Timeplus Enterprise v2.6.8, v2.7.4, v2.7.5
Expand Down
2 changes: 1 addition & 1 deletion src/components/TimeplusWatermarkVisualization.js
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,7 @@ const TimeplusWatermarkVisualization = () => {
GROUP BY
</span> window_start <br />
<span style={{ color: "#569CD6" }}>
EMIT AFTER WATERMARK
EMIT AFTER WINDOW CLOSE
</span>{" "}
<span style={{ color: "#569CD6" }}>WITH DELAY</span> {lag / 1000}s
</code>
Expand Down