Skip to content

Implemented byte size and time duration parsers with aggregate-stats … #985

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -215,4 +215,20 @@ and limitations under the License.
Cask is a trademark of Cask Data, Inc. All rights reserved.

Apache, Apache HBase, and HBase are trademarks of The Apache Software Foundation. Used with
permission. No endorsement by The Apache Software Foundation is implied by the use of these marks.
permission. No endorsement by The Apache Software Foundation is implied by the use of these marks..

## Byte Size and Time Duration Parsers

The Wrangler now supports parsing byte sizes and time durations with units:

### Byte Size Format
- Supported units: B, KB, MB, GB, TB (decimal), KiB, MiB, GiB, TiB (binary)
- Examples: "10KB", "1.5MB", "2GiB"

### Time Duration Format
- Supported units: ns, ms, s, m, h, d
- Examples: "100ms", "1.5s", "2h"

### Aggregate Stats Directive
Aggregates byte size and time duration columns:

Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package io.cdap.wrangler.api.parser;

import io.cdap.wrangler.api.annotations.PublicEvolving;

import java.util.regex.Matcher;
import java.util.regex.Pattern;

import com.google.gson.JsonElement;


@PublicEvolving
public class ByteSize implements Token {
private static final Pattern BYTE_PATTERN = Pattern.compile("([0-9]+(\\.[0-9]+)?)([A-Za-z]+)");
private final long bytes;

public ByteSize(String value) {
super();
Matcher matcher = BYTE_PATTERN.matcher(value);
if (!matcher.matches()) {
throw new IllegalArgumentException("Invalid byte size format: " + value);
}

double size = Double.parseDouble(matcher.group(1));
String unit = matcher.group(3).toUpperCase();

switch (unit) {
case "B": bytes = (long) size; break;
case "KB": bytes = (long) (size * 1000); break;
case "MB": bytes = (long) (size * 1000 * 1000); break;
case "GB": bytes = (long) (size * 1000 * 1000 * 1000); break;
case "TB": bytes = (long) (size * 1000 * 1000 * 1000 * 1000); break;
case "KIB": bytes = (long) (size * 1024); break;
case "MIB": bytes = (long) (size * 1024 * 1024); break;
case "GIB": bytes = (long) (size * 1024 * 1024 * 1024); break;
case "TIB": bytes = (long) (size * 1024 * 1024 * 1024 * 1024); break;
default: throw new IllegalArgumentException("Unknown byte unit: " + unit);
}
}

public long getBytes() {
return bytes;
}

@Override
public Object value() {
// TODO Auto-generated method stub
throw new UnsupportedOperationException("Unimplemented method 'value'");
}

@Override
public TokenType type() {
// TODO Auto-generated method stub
throw new UnsupportedOperationException("Unimplemented method 'type'");
}

@Override
public JsonElement toJson() {
// TODO Auto-generated method stub
throw new UnsupportedOperationException("Unimplemented method 'toJson'");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package io.cdap.wrangler.api.parser;

import io.cdap.wrangler.api.annotations.PublicEvolving;

import java.util.regex.Matcher;
import java.util.regex.Pattern;

import com.google.gson.JsonElement;

@PublicEvolving
public class TimeDuration implements Token {
private static final Pattern TIME_PATTERN = Pattern.compile("([0-9]+(\\.[0-9]+)?)([A-Za-z]+)");
private final long nanoseconds;

public TimeDuration(String value) {
super();
Matcher matcher = TIME_PATTERN.matcher(value);
if (!matcher.matches()) {
throw new IllegalArgumentException("Invalid time duration format: " + value);
}

double time = Double.parseDouble(matcher.group(1));
String unit = matcher.group(3).toUpperCase();

switch (unit) {
case "NS": nanoseconds = (long) time; break;
case "MS": nanoseconds = (long) (time * 1_000_000); break;
case "S": nanoseconds = (long) (time * 1_000_000_000); break;
case "M": nanoseconds = (long) (time * 60 * 1_000_000_000); break;
case "H": nanoseconds = (long) (time * 60 * 60 * 1_000_000_000); break;
case "D": nanoseconds = (long) (time * 24 * 60 * 60 * 1_000_000_000); break;
default: throw new IllegalArgumentException("Unknown time unit: " + unit);
}
}

public long getNanoseconds() {
return nanoseconds;
}

@Override
public Object value() {
// TODO Auto-generated method stub
throw new UnsupportedOperationException("Unimplemented method 'value'");
}

@Override
public TokenType type() {
// TODO Auto-generated method stub
throw new UnsupportedOperationException("Unimplemented method 'type'");
}

@Override
public JsonElement toJson() {
// TODO Auto-generated method stub
throw new UnsupportedOperationException("Unimplemented method 'toJson'");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -152,5 +152,5 @@ public enum TokenType implements Serializable {
* Represents the enumerated type for the object of type {@code String} with restrictions
* on characters that can be present in a string.
*/
IDENTIFIER
IDENTIFIER, BYTE_SIZE, TIME_DURATION
}
12 changes: 12 additions & 0 deletions wrangler-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,18 @@
<artifactId>cdap-system-app-api</artifactId>
<version>${cdap.version}</version>
</dependency>

<dependency>
<groupId>io.cdap.wrangler</groupId>
<artifactId>wrangler-core</artifactId>
<version>4.11.0</version>
</dependency>

<dependency>
<groupId>io.cdap.wrangler</groupId>
<artifactId>wrangler-core</artifactId>
<version>4.11.0</version>
</dependency>
</dependencies>


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,13 @@ numberRange
;

value
: String | Number | Column | Bool
;
: STRING #stringValue
| NUMBER #numberValue
| BOOLEAN #booleanValue
| NULL #nullValue
| BYTE_SIZE #byteSizeValue
| TIME_DURATION #timeDurationValue
;

ecommand
: '!' Identifier
Expand Down Expand Up @@ -195,6 +200,14 @@ identifierList
: Identifier (',' Identifier)*
;

BYTE_SIZE
: NUMBER BYTE_UNIT
;

TIME_DURATION
: NUMBER TIME_UNIT
;


/*
* Following are the Lexer Rules used for tokenizing the recipe.
Expand Down Expand Up @@ -311,3 +324,10 @@ fragment Int
fragment Digit
: [0-9]
;
fragment BYTE_UNIT
: ('B'|'KB'|'MB'|'GB'|'TB'|'KIB'|'MIB'|'GIB'|'TIB')
;

fragment TIME_UNIT
: ('NS'|'MS'|'S'|'M'|'H'|'D')
;
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package io.cdap.directives.aggregates;

import io.cdap.cdap.api.annotation.Description;
import io.cdap.cdap.api.annotation.Name;
import io.cdap.cdap.api.annotation.Plugin;
import io.cdap.wrangler.api.*;
import io.cdap.wrangler.api.parser.*;
import io.cdap.wrangler.expression.EL;
import io.cdap.wrangler.expression.ELContext;
import io.cdap.wrangler.expression.ELException;
import io.cdap.wrangler.expression.ELResult;

import java.util.ArrayList;
import java.util.List;

@Plugin(type = Directive.TYPE)
@Name("aggregate-stats")
@Description("Aggregates byte size and time duration columns")
public class AggregateStats implements Directive, AggregateInterpreter {
private String sizeColumn;
private String timeColumn;
private String outputSizeColumn;
private String outputTimeColumn;
private long totalBytes = 0;
private long totalNanos = 0;
private int rowCount = 0;

@Override
public UsageDefinition define() {
UsageDefinition.Builder builder = UsageDefinition.builder("aggregate-stats");
builder.define("size-column", TokenType.COLUMN_NAME);
builder.define("time-column", TokenType.COLUMN_NAME);
builder.define("output-size-column", TokenType.COLUMN_NAME);
builder.define("output-time-column", TokenType.COLUMN_NAME);
return builder.build();
}

@Override
public void initialize(Arguments args) throws DirectiveParseException {
this.sizeColumn = ((ColumnName) args.value("size-column")).value();
this.timeColumn = ((ColumnName) args.value("time-column")).value();
this.outputSizeColumn = ((ColumnName) args.value("output-size-column")).value();
this.outputTimeColumn = ((ColumnName) args.value("output-time-column")).value();
}

@Override
public List<Row> execute(List<Row> rows, ExecutorContext context) throws DirectiveExecutionException {
for (Row row : rows) {
try {
// Process byte size
Object sizeObj = row.getValue(sizeColumn);
if (sizeObj != null) {
ByteSize byteSize = new ByteSize(sizeObj.toString());
totalBytes += byteSize.getBytes();
}

// Process time duration
Object timeObj = row.getValue(timeColumn);
if (timeObj != null) {
TimeDuration timeDuration = new TimeDuration(timeObj.toString());
totalNanos += timeDuration.getNanoseconds();
}

rowCount++;
} catch (Exception e) {
throw new DirectiveExecutionException(e.getMessage(), e);
}
}
return rows;
}

@Override
public List<Row> finalize() throws DirectiveExecutionException {
Row row = new Row();
row.add(outputSizeColumn, (double) totalBytes / (1024 * 1024)); // Convert to MB
row.add(outputTimeColumn, (double) totalNanos / 1_000_000_000); // Convert to seconds
List<Row> result = new ArrayList<>();
result.add(row);
return result;
}

@Override
public void destroy() {
// Clean up if needed
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.cdap.wrangler.api.Triplet;
import io.cdap.wrangler.api.parser.Bool;
import io.cdap.wrangler.api.parser.BoolList;
import io.cdap.wrangler.api.parser.ByteSize;
import io.cdap.wrangler.api.parser.ColumnName;
import io.cdap.wrangler.api.parser.ColumnNameList;
import io.cdap.wrangler.api.parser.DirectiveName;
Expand All @@ -33,6 +34,7 @@
import io.cdap.wrangler.api.parser.Ranges;
import io.cdap.wrangler.api.parser.Text;
import io.cdap.wrangler.api.parser.TextList;
import io.cdap.wrangler.api.parser.TimeDuration;
import io.cdap.wrangler.api.parser.Token;
import org.antlr.v4.runtime.ParserRuleContext;
import org.antlr.v4.runtime.misc.Interval;
Expand Down Expand Up @@ -87,6 +89,17 @@ public RecipeSymbol.Builder visitDirective(DirectivesParser.DirectiveContext ctx
return super.visitDirective(ctx);
}

@Override
public Token visitByteSizeArg(DirectivesParser.ByteSizeArgContext ctx) {
return new ByteSize(ctx.getText());
}

@Override
public Token visitTimeDurationArg(DirectivesParser.TimeDurationArgContext ctx) {
return new TimeDuration(ctx.getText());
}


/**
* A Directive can include identifiers, this method extracts that token that is being
* identified as token of type <code>Identifier</code>.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package io.cdap.wrangler.executor;

import io.cdap.wrangler.api.Row;
// import java.io.cdap.wrangler.directives.aggregates.AggregateStats;
// "C:\zeotap\wrangler\wrangler-core\src\main\java\io\cdap\directives\aggregates\AggregateStats.java"
import org.junit.Assert;
import org.junit.Test;

import java.util.Arrays;
import java.util.List;

public class AggregateStatsTest {
@Test
public void testAggregation() throws Exception {
List<Row> rows = Arrays.asList(
new Row("data_transfer_size", "1MB").add("response_time", "100ms"),
new Row("data_transfer_size", "2MB").add("response_time", "200ms"),
new Row("data_transfer_size", "0.5MB").add("response_time", "50ms")
);

AggregateStats directive = new AggregateStats();
directive.initialize(new TestArguments(
"size-column", "data_transfer_size",
"time-column", "response_time",
"output-size-column", "total_size_mb",
"output-time-column", "total_time_sec"
));

directive.execute(rows, null);
List<Row> results = directive.finalize();

Assert.assertEquals(1, results.size());
Assert.assertEquals(3.5, (Double) results.get(0).getValue("total_size_mb"), 0.001);
Assert.assertEquals(0.350, (Double) results.get(0).getValue("total_time_sec"), 0.001);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@

package io.cdap.wrangler.parser;

import io.cdap.wrangler.api.parser.ByteSize;
import org.junit.Test;

import static org.junit.Assert.assertEquals;

public class ByteSizeTest {
@Test
public void testByteSizeParsing() {
assertEquals(1024, new ByteSize("1KB").getBytes());
assertEquals(1024 * 1024, new ByteSize("1MB").getBytes());
assertEquals(1000, new ByteSize("1KB").getBytes());
assertEquals(1.5 * 1024 * 1024, new ByteSize("1.5MB").getBytes(), 0.001);
}
}
Loading