diff --git a/README.md b/README.md
index 4aa6eeb3a..55f203f15 100644
--- a/README.md
+++ b/README.md
@@ -215,4 +215,20 @@ and limitations under the License.
Cask is a trademark of Cask Data, Inc. All rights reserved.
Apache, Apache HBase, and HBase are trademarks of The Apache Software Foundation. Used with
-permission. No endorsement by The Apache Software Foundation is implied by the use of these marks.
+permission. No endorsement by The Apache Software Foundation is implied by the use of these marks.
+
+## Byte Size and Time Duration Parsers
+
+The Wrangler now supports parsing byte sizes and time durations with units:
+
+### Byte Size Format
+- Supported units: B, KB, MB, GB, TB (decimal), KiB, MiB, GiB, TiB (binary)
+- Examples: "10KB", "1.5MB", "2GiB"
+
+### Time Duration Format
+- Supported units: ns, ms, s, m, h, d
+- Examples: "100ms", "1.5s", "2h"
+
+### Aggregate Stats Directive
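+The `aggregate-stats` directive sums a byte-size column and a time-duration column across all rows and writes the two totals to the configured output columns.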
+
diff --git a/wrangler-api/src/main/java/io/cdap/wrangler/api/parser/ByteSize.java b/wrangler-api/src/main/java/io/cdap/wrangler/api/parser/ByteSize.java
new file mode 100644
index 000000000..35bb8c359
--- /dev/null
+++ b/wrangler-api/src/main/java/io/cdap/wrangler/api/parser/ByteSize.java
@@ -0,0 +1,61 @@
+package io.cdap.wrangler.api.parser;
+
+import io.cdap.wrangler.api.annotations.PublicEvolving;
+
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import com.google.gson.JsonElement;
+
+
+/**
+ * Token holding a byte size parsed from text such as {@code "10KB"}, {@code "1.5MB"} or
+ * {@code "2GiB"}.
+ *
+ * <p>Decimal units (KB, MB, GB, TB) are powers of 1000 and binary units (KiB, MiB, GiB, TiB)
+ * are powers of 1024. Units are matched case-insensitively. A fractional result is truncated
+ * toward zero to a whole number of bytes.
+ */
+@PublicEvolving
+public class ByteSize implements Token {
+  private static final Pattern BYTE_PATTERN = Pattern.compile("([0-9]+(\\.[0-9]+)?)([A-Za-z]+)");
+  private final long bytes;
+
+  /**
+   * Parses {@code value} into a number of bytes.
+   *
+   * @param value an unsigned decimal number immediately followed by a unit, e.g. {@code "1.5MB"}
+   * @throws IllegalArgumentException if {@code value} is null, malformed, uses an unknown unit,
+   *                                  or the result does not fit in a {@code long}
+   */
+  public ByteSize(String value) {
+    this.bytes = parseBytes(value);
+  }
+
+  /** Converts the textual form into bytes using exact decimal arithmetic. */
+  private static long parseBytes(String value) {
+    if (value == null) {
+      throw new IllegalArgumentException("Invalid byte size format: " + value);
+    }
+    Matcher matcher = BYTE_PATTERN.matcher(value);
+    if (!matcher.matches()) {
+      throw new IllegalArgumentException("Invalid byte size format: " + value);
+    }
+
+    // Locale.ROOT keeps the upper-casing independent of the JVM's default locale.
+    String unit = matcher.group(3).toUpperCase(java.util.Locale.ROOT);
+    long multiplier = multiplierFor(unit);
+    try {
+      // BigDecimal avoids binary floating-point error (1.005 * 1000 evaluates below 1005 as a
+      // double) and reports overflow instead of silently saturating.
+      return new java.math.BigDecimal(matcher.group(1))
+        .multiply(java.math.BigDecimal.valueOf(multiplier))
+        .setScale(0, java.math.RoundingMode.DOWN)
+        .longValueExact();
+    } catch (ArithmeticException e) {
+      throw new IllegalArgumentException("Byte size out of range: " + value, e);
+    }
+  }
+
+  /** Returns the number of bytes represented by one {@code unit} (already upper-cased). */
+  private static long multiplierFor(String unit) {
+    switch (unit) {
+      case "B": return 1L;
+      case "KB": return 1_000L;
+      case "MB": return 1_000_000L;
+      case "GB": return 1_000_000_000L;
+      case "TB": return 1_000_000_000_000L;
+      case "KIB": return 1L << 10;
+      case "MIB": return 1L << 20;
+      case "GIB": return 1L << 30;
+      case "TIB": return 1L << 40;
+      default: throw new IllegalArgumentException("Unknown byte unit: " + unit);
+    }
+  }
+
+  /**
+   * @return the parsed size in bytes
+   */
+  public long getBytes() {
+    return bytes;
+  }
+
+  /**
+   * @return the parsed size in bytes, boxed as a {@code Long}
+   */
+  @Override
+  public Object value() {
+    return bytes;
+  }
+
+  /**
+   * @return {@link TokenType#BYTE_SIZE}
+   */
+  @Override
+  public TokenType type() {
+    return TokenType.BYTE_SIZE;
+  }
+
+  /**
+   * @return a JSON object carrying the token type name and the size in bytes
+   */
+  @Override
+  public JsonElement toJson() {
+    // NOTE(review): the "type"/"value" layout is assumed to match the other Token
+    // implementations, which are not visible here — confirm.
+    com.google.gson.JsonObject object = new com.google.gson.JsonObject();
+    object.addProperty("type", TokenType.BYTE_SIZE.name());
+    object.addProperty("value", bytes);
+    return object;
+  }
+}
\ No newline at end of file
diff --git a/wrangler-api/src/main/java/io/cdap/wrangler/api/parser/TimeDuration.java b/wrangler-api/src/main/java/io/cdap/wrangler/api/parser/TimeDuration.java
new file mode 100644
index 000000000..eae95a78b
--- /dev/null
+++ b/wrangler-api/src/main/java/io/cdap/wrangler/api/parser/TimeDuration.java
@@ -0,0 +1,57 @@
+package io.cdap.wrangler.api.parser;
+
+import io.cdap.wrangler.api.annotations.PublicEvolving;
+
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import com.google.gson.JsonElement;
+
+/**
+ * Token holding a time duration parsed from text such as {@code "100ms"}, {@code "1.5s"} or
+ * {@code "2h"}.
+ *
+ * <p>Supported units are ns, ms, s, m (minutes), h and d, matched case-insensitively. The
+ * duration is stored in nanoseconds; a fractional result is truncated toward zero.
+ */
+@PublicEvolving
+public class TimeDuration implements Token {
+  private static final Pattern TIME_PATTERN = Pattern.compile("([0-9]+(\\.[0-9]+)?)([A-Za-z]+)");
+  private final long nanoseconds;
+
+  /**
+   * Parses {@code value} into a number of nanoseconds.
+   *
+   * @param value an unsigned decimal number immediately followed by a unit, e.g. {@code "1.5s"}
+   * @throws IllegalArgumentException if {@code value} is null, malformed, uses an unknown unit,
+   *                                  or the result does not fit in a {@code long}
+   */
+  public TimeDuration(String value) {
+    this.nanoseconds = parseNanoseconds(value);
+  }
+
+  /** Converts the textual form into nanoseconds using exact decimal arithmetic. */
+  private static long parseNanoseconds(String value) {
+    if (value == null) {
+      throw new IllegalArgumentException("Invalid time duration format: " + value);
+    }
+    Matcher matcher = TIME_PATTERN.matcher(value);
+    if (!matcher.matches()) {
+      throw new IllegalArgumentException("Invalid time duration format: " + value);
+    }
+
+    // Locale.ROOT keeps the upper-casing independent of the JVM's default locale.
+    String unit = matcher.group(3).toUpperCase(java.util.Locale.ROOT);
+    long multiplier = nanosPer(unit);
+    try {
+      // BigDecimal avoids binary floating-point error and reports overflow instead of
+      // silently saturating at Long.MAX_VALUE.
+      return new java.math.BigDecimal(matcher.group(1))
+        .multiply(java.math.BigDecimal.valueOf(multiplier))
+        .setScale(0, java.math.RoundingMode.DOWN)
+        .longValueExact();
+    } catch (ArithmeticException e) {
+      throw new IllegalArgumentException("Time duration out of range: " + value, e);
+    }
+  }
+
+  /** Returns the number of nanoseconds in one {@code unit} (already upper-cased). */
+  private static long nanosPer(String unit) {
+    switch (unit) {
+      case "NS": return 1L;
+      case "MS": return 1_000_000L;
+      case "S": return 1_000_000_000L;
+      case "M": return 60L * 1_000_000_000L;
+      case "H": return 60L * 60L * 1_000_000_000L;
+      case "D": return 24L * 60L * 60L * 1_000_000_000L;
+      default: throw new IllegalArgumentException("Unknown time unit: " + unit);
+    }
+  }
+
+  /**
+   * @return the parsed duration in nanoseconds
+   */
+  public long getNanoseconds() {
+    return nanoseconds;
+  }
+
+  /**
+   * @return the parsed duration in nanoseconds, boxed as a {@code Long}
+   */
+  @Override
+  public Object value() {
+    return nanoseconds;
+  }
+
+  /**
+   * @return {@link TokenType#TIME_DURATION}
+   */
+  @Override
+  public TokenType type() {
+    return TokenType.TIME_DURATION;
+  }
+
+  /**
+   * @return a JSON object carrying the token type name and the duration in nanoseconds
+   */
+  @Override
+  public JsonElement toJson() {
+    // NOTE(review): the "type"/"value" layout is assumed to match the other Token
+    // implementations, which are not visible here — confirm.
+    com.google.gson.JsonObject object = new com.google.gson.JsonObject();
+    object.addProperty("type", TokenType.TIME_DURATION.name());
+    object.addProperty("value", nanoseconds);
+    return object;
+  }
+}
\ No newline at end of file
diff --git a/wrangler-api/src/main/java/io/cdap/wrangler/api/parser/TokenType.java b/wrangler-api/src/main/java/io/cdap/wrangler/api/parser/TokenType.java
index 8c93b0e6a..21fb98f2e 100644
--- a/wrangler-api/src/main/java/io/cdap/wrangler/api/parser/TokenType.java
+++ b/wrangler-api/src/main/java/io/cdap/wrangler/api/parser/TokenType.java
@@ -152,5 +152,5 @@ public enum TokenType implements Serializable {
* Represents the enumerated type for the object of type {@code String} with restrictions
* on characters that can be present in a string.
*/
- IDENTIFIER
+ IDENTIFIER, BYTE_SIZE, TIME_DURATION
}
diff --git a/wrangler-core/pom.xml b/wrangler-core/pom.xml
index e2dcb3c2b..60eab78ff 100644
--- a/wrangler-core/pom.xml
+++ b/wrangler-core/pom.xml
@@ -309,6 +309,18 @@
cdap-system-app-api
${cdap.version}
+
+
+ io.cdap.wrangler
+ wrangler-core
+ 4.11.0
+
+
+
+ io.cdap.wrangler
+ wrangler-core
+ 4.11.0
+
diff --git a/wrangler-core/src/main/antlr4/io/cdap/wrangler/parser/Directives.g4 b/wrangler-core/src/main/antlr4/io/cdap/wrangler/parser/Directives.g4
index 7c517ed6a..419889128 100644
--- a/wrangler-core/src/main/antlr4/io/cdap/wrangler/parser/Directives.g4
+++ b/wrangler-core/src/main/antlr4/io/cdap/wrangler/parser/Directives.g4
@@ -140,8 +140,13 @@ numberRange
;
value
- : String | Number | Column | Bool
- ;
+ // Alternatives use the grammar's existing token names (String, Number, Column, Bool);
+ // every alternative is labelled because ANTLR requires all-or-none labelling in a rule.
+ : String #stringValue
+ | Number #numberValue
+ | Column #columnValue
+ | Bool #booleanValue
+ | BYTE_SIZE #byteSizeValue
+ | TIME_DURATION #timeDurationValue
+ ;
ecommand
: '!' Identifier
@@ -195,6 +200,14 @@ identifierList
: Identifier (',' Identifier)*
;
+// A size or duration literal is an unsigned decimal number immediately followed by a unit,
+// mirroring the pattern [0-9]+(\.[0-9]+)?[A-Za-z]+ used by the ByteSize/TimeDuration tokens.
+BYTE_SIZE
+ : Digit+ ('.' Digit+)? BYTE_UNIT
+ ;
+
+TIME_DURATION
+ : Digit+ ('.' Digit+)? TIME_UNIT
+ ;
+
/*
* Following are the Lexer Rules used for tokenizing the recipe.
@@ -311,3 +324,10 @@ fragment Int
fragment Digit
: [0-9]
;
+// Unit suffixes are matched case-insensitively so inputs such as "10kb", "2GiB" or "100ms" lex.
+fragment BYTE_UNIT
+ : [kKmMgGtT] [iI]? [bB]
+ | [bB]
+ ;
+
+fragment TIME_UNIT
+ : [nNmM] [sS]
+ | [sSmMhHdD]
+ ;
diff --git a/wrangler-core/src/main/java/io/cdap/directives/aggregates/AggregateStats.java b/wrangler-core/src/main/java/io/cdap/directives/aggregates/AggregateStats.java
new file mode 100644
index 000000000..9de722c77
--- /dev/null
+++ b/wrangler-core/src/main/java/io/cdap/directives/aggregates/AggregateStats.java
@@ -0,0 +1,87 @@
+package io.cdap.directives.aggregates;
+
+import io.cdap.cdap.api.annotation.Description;
+import io.cdap.cdap.api.annotation.Name;
+import io.cdap.cdap.api.annotation.Plugin;
+import io.cdap.wrangler.api.*;
+import io.cdap.wrangler.api.parser.*;
+import io.cdap.wrangler.expression.EL;
+import io.cdap.wrangler.expression.ELContext;
+import io.cdap.wrangler.expression.ELException;
+import io.cdap.wrangler.expression.ELResult;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Directive that totals a byte-size column and a time-duration column.
+ *
+ * <p>Each call to {@link #execute} parses the two configured columns of every row and adds the
+ * results to running totals held by this instance; the rows themselves are returned unchanged.
+ * {@link #finalize()} then emits one row holding the total size in megabytes and the total
+ * time in seconds.
+ */
+@Plugin(type = Directive.TYPE)
+@Name("aggregate-stats")
+@Description("Aggregates byte size and time duration columns")
+public class AggregateStats implements Directive, AggregateInterpreter {
+  /** Bytes per decimal megabyte, matching the "MB" unit that ByteSize parses as 1000 * 1000. */
+  private static final double BYTES_PER_MB = 1_000_000d;
+  private static final double NANOS_PER_SECOND = 1_000_000_000d;
+
+  private String sizeColumn;
+  private String timeColumn;
+  private String outputSizeColumn;
+  private String outputTimeColumn;
+  private long totalBytes = 0;
+  private long totalNanos = 0;
+
+  /**
+   * Declares the four column-name arguments: the source size and time columns followed by the
+   * output size and time columns.
+   */
+  @Override
+  public UsageDefinition define() {
+    UsageDefinition.Builder builder = UsageDefinition.builder("aggregate-stats");
+    builder.define("size-column", TokenType.COLUMN_NAME);
+    builder.define("time-column", TokenType.COLUMN_NAME);
+    builder.define("output-size-column", TokenType.COLUMN_NAME);
+    builder.define("output-time-column", TokenType.COLUMN_NAME);
+    return builder.build();
+  }
+
+  /**
+   * Reads the four column names from the parsed directive arguments.
+   */
+  @Override
+  public void initialize(Arguments args) throws DirectiveParseException {
+    this.sizeColumn = ((ColumnName) args.value("size-column")).value();
+    this.timeColumn = ((ColumnName) args.value("time-column")).value();
+    this.outputSizeColumn = ((ColumnName) args.value("output-size-column")).value();
+    this.outputTimeColumn = ((ColumnName) args.value("output-time-column")).value();
+  }
+
+  /**
+   * Adds the size and duration of every row to the running totals.
+   *
+   * @param rows rows to scan; a null value in either column is skipped
+   * @param context execution context, not used by this directive
+   * @return {@code rows}, unchanged
+   * @throws DirectiveExecutionException if a value cannot be parsed or a total overflows
+   */
+  @Override
+  public List<Row> execute(List<Row> rows, ExecutorContext context) throws DirectiveExecutionException {
+    for (Row row : rows) {
+      try {
+        Object sizeObj = row.getValue(sizeColumn);
+        if (sizeObj != null) {
+          // addExact raises ArithmeticException rather than wrapping around on overflow.
+          totalBytes = Math.addExact(totalBytes, new ByteSize(sizeObj.toString()).getBytes());
+        }
+
+        Object timeObj = row.getValue(timeColumn);
+        if (timeObj != null) {
+          totalNanos = Math.addExact(totalNanos, new TimeDuration(timeObj.toString()).getNanoseconds());
+        }
+      } catch (RuntimeException e) {
+        throw new DirectiveExecutionException(e.getMessage(), e);
+      }
+    }
+    return rows;
+  }
+
+  /**
+   * Builds the single result row from the totals accumulated so far.
+   *
+   * @return a one-element list whose row holds the total size in MB and total time in seconds
+   */
+  // NOTE(review): a no-argument method named finalize with a non-void return type clashes with
+  // Object.finalize(); the name is kept because the AggregateInterpreter contract is not
+  // visible here — consider renaming it together with that interface and its callers.
+  @Override
+  public List<Row> finalize() throws DirectiveExecutionException {
+    Row row = new Row();
+    row.add(outputSizeColumn, totalBytes / BYTES_PER_MB);
+    row.add(outputTimeColumn, totalNanos / NANOS_PER_SECOND);
+    List<Row> result = new ArrayList<>();
+    result.add(row);
+    return result;
+  }
+
+  @Override
+  public void destroy() {
+    // Nothing to release.
+  }
+}
+
diff --git a/wrangler-core/src/main/java/io/cdap/wrangler/parser/RecipeVisitor.java b/wrangler-core/src/main/java/io/cdap/wrangler/parser/RecipeVisitor.java
index ac35e7a5e..825ec4bd0 100644
--- a/wrangler-core/src/main/java/io/cdap/wrangler/parser/RecipeVisitor.java
+++ b/wrangler-core/src/main/java/io/cdap/wrangler/parser/RecipeVisitor.java
@@ -22,6 +22,7 @@
import io.cdap.wrangler.api.Triplet;
import io.cdap.wrangler.api.parser.Bool;
import io.cdap.wrangler.api.parser.BoolList;
+import io.cdap.wrangler.api.parser.ByteSize;
import io.cdap.wrangler.api.parser.ColumnName;
import io.cdap.wrangler.api.parser.ColumnNameList;
import io.cdap.wrangler.api.parser.DirectiveName;
@@ -33,6 +34,7 @@
import io.cdap.wrangler.api.parser.Ranges;
import io.cdap.wrangler.api.parser.Text;
import io.cdap.wrangler.api.parser.TextList;
+import io.cdap.wrangler.api.parser.TimeDuration;
import io.cdap.wrangler.api.parser.Token;
import org.antlr.v4.runtime.ParserRuleContext;
import org.antlr.v4.runtime.misc.Interval;
@@ -87,6 +89,17 @@ public RecipeSymbol.Builder visitDirective(DirectivesParser.DirectiveContext ctx
return super.visitDirective(ctx);
}
+ /**
+  * Wraps the text matched by the parse context in a {@link ByteSize} token.
+  *
+  * NOTE(review): the grammar in this change labels the alternative {@code #byteSizeValue},
+  * which would generate {@code visitByteSizeValue(ByteSizeValueContext)} rather than the
+  * names used here, and {@code visitDirective} in this class returns
+  * {@code RecipeSymbol.Builder}, not {@code Token} — confirm this override compiles.
+  */
+ @Override
+ public Token visitByteSizeArg(DirectivesParser.ByteSizeArgContext ctx) {
+ return new ByteSize(ctx.getText());
+ }
+
+ /**
+  * Wraps the text matched by the parse context in a {@link TimeDuration} token.
+  *
+  * NOTE(review): the grammar in this change labels the alternative {@code #timeDurationValue},
+  * which would generate {@code visitTimeDurationValue(TimeDurationValueContext)} rather than
+  * the names used here, and {@code visitDirective} in this class returns
+  * {@code RecipeSymbol.Builder}, not {@code Token} — confirm this override compiles.
+  */
+ @Override
+ public Token visitTimeDurationArg(DirectivesParser.TimeDurationArgContext ctx) {
+ return new TimeDuration(ctx.getText());
+ }
+
+
/**
* A Directive can include identifiers, this method extracts that token that is being
* identified as token of type Identifier
.
diff --git a/wrangler-core/src/test/java/io/cdap/wrangler/executor/AggregateStatsTest.java b/wrangler-core/src/test/java/io/cdap/wrangler/executor/AggregateStatsTest.java
new file mode 100644
index 000000000..4ea619294
--- /dev/null
+++ b/wrangler-core/src/test/java/io/cdap/wrangler/executor/AggregateStatsTest.java
@@ -0,0 +1,36 @@
+package io.cdap.wrangler.executor;
+
+import io.cdap.directives.aggregates.AggregateStats;
+import io.cdap.wrangler.api.Row;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Tests the aggregate-stats directive's totals over a small set of rows.
+ */
+public class AggregateStatsTest {
+  @Test
+  public void testAggregation() throws Exception {
+    // 1MB + 2MB + 0.5MB = 3.5 decimal MB; 100ms + 200ms + 50ms = 0.350 seconds.
+    List<Row> rows = Arrays.asList(
+      new Row("data_transfer_size", "1MB").add("response_time", "100ms"),
+      new Row("data_transfer_size", "2MB").add("response_time", "200ms"),
+      new Row("data_transfer_size", "0.5MB").add("response_time", "50ms")
+    );
+
+    AggregateStats directive = new AggregateStats();
+    // NOTE(review): TestArguments is not defined in this change — confirm it exists on the
+    // test classpath and accepts alternating name/value pairs.
+    directive.initialize(new TestArguments(
+      "size-column", "data_transfer_size",
+      "time-column", "response_time",
+      "output-size-column", "total_size_mb",
+      "output-time-column", "total_time_sec"
+    ));
+
+    directive.execute(rows, null);
+    List<Row> results = directive.finalize();
+
+    Assert.assertEquals(1, results.size());
+    Assert.assertEquals(3.5, (Double) results.get(0).getValue("total_size_mb"), 0.001);
+    Assert.assertEquals(0.350, (Double) results.get(0).getValue("total_time_sec"), 0.001);
+  }
+}
\ No newline at end of file
diff --git a/wrangler-core/src/test/java/io/cdap/wrangler/parser/ByteSizeTest.java b/wrangler-core/src/test/java/io/cdap/wrangler/parser/ByteSizeTest.java
new file mode 100644
index 000000000..39186ce98
--- /dev/null
+++ b/wrangler-core/src/test/java/io/cdap/wrangler/parser/ByteSizeTest.java
@@ -0,0 +1,17 @@
+
+package io.cdap.wrangler.parser;
+
+import io.cdap.wrangler.api.parser.ByteSize;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests parsing of decimal (powers of 1000) and binary (powers of 1024) byte-size units.
+ */
+public class ByteSizeTest {
+  @Test
+  public void testByteSizeParsing() {
+    // Decimal units: KB and MB are powers of 1000.
+    assertEquals(1000L, new ByteSize("1KB").getBytes());
+    assertEquals(1_000_000L, new ByteSize("1MB").getBytes());
+    assertEquals(1_500_000L, new ByteSize("1.5MB").getBytes());
+
+    // Binary units: KiB and MiB are powers of 1024.
+    assertEquals(1024L, new ByteSize("1KiB").getBytes());
+    assertEquals(1024L * 1024, new ByteSize("1MiB").getBytes());
+    assertEquals(1536L * 1024, new ByteSize("1.5MiB").getBytes());
+  }
+}
diff --git a/wrangler-core/src/test/java/io/cdap/wrangler/parser/TimeDurationTest.java b/wrangler-core/src/test/java/io/cdap/wrangler/parser/TimeDurationTest.java
new file mode 100644
index 000000000..92b597ddc
--- /dev/null
+++ b/wrangler-core/src/test/java/io/cdap/wrangler/parser/TimeDurationTest.java
@@ -0,0 +1,16 @@
+package io.cdap.wrangler.parser;
+
+import io.cdap.wrangler.api.parser.TimeDuration;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests that time-duration literals are converted to the expected number of nanoseconds.
+ */
+public class TimeDurationTest {
+  /** Parses {@code text} and returns the resulting duration in nanoseconds. */
+  private static long nanosOf(String text) {
+    TimeDuration parsed = new TimeDuration(text);
+    return parsed.getNanoseconds();
+  }
+
+  @Test
+  public void testTimeDurationParsing() {
+    long oneMillisecond = nanosOf("1ms");
+    long oneSecond = nanosOf("1s");
+    long oneMinute = nanosOf("1m");
+    long twoAndHalfSeconds = nanosOf("2.5s");
+
+    assertEquals(1_000_000L, oneMillisecond);
+    assertEquals(1_000_000_000L, oneSecond);
+    assertEquals(60_000_000_000L, oneMinute);
+    assertEquals(2_500_000_000d, twoAndHalfSeconds, 0.001);
+  }
+}