diff --git a/README.md b/README.md index 4aa6eeb3a..55f203f15 100644 --- a/README.md +++ b/README.md @@ -215,4 +215,20 @@ and limitations under the License. Cask is a trademark of Cask Data, Inc. All rights reserved. Apache, Apache HBase, and HBase are trademarks of The Apache Software Foundation. Used with -permission. No endorsement by The Apache Software Foundation is implied by the use of these marks. +permission. No endorsement by The Apache Software Foundation is implied by the use of these marks.. + +## Byte Size and Time Duration Parsers + +The Wrangler now supports parsing byte sizes and time durations with units: + +### Byte Size Format +- Supported units: B, KB, MB, GB, TB (decimal), KiB, MiB, GiB, TiB (binary) +- Examples: "10KB", "1.5MB", "2GiB" + +### Time Duration Format +- Supported units: ns, ms, s, m, h, d +- Examples: "100ms", "1.5s", "2h" + +### Aggregate Stats Directive +Aggregates byte size and time duration columns: + diff --git a/wrangler-api/src/main/java/io/cdap/wrangler/api/parser/ByteSize.java b/wrangler-api/src/main/java/io/cdap/wrangler/api/parser/ByteSize.java new file mode 100644 index 000000000..35bb8c359 --- /dev/null +++ b/wrangler-api/src/main/java/io/cdap/wrangler/api/parser/ByteSize.java @@ -0,0 +1,61 @@ +package io.cdap.wrangler.api.parser; + +import io.cdap.wrangler.api.annotations.PublicEvolving; + +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import com.google.gson.JsonElement; + + +@PublicEvolving +public class ByteSize implements Token { + private static final Pattern BYTE_PATTERN = Pattern.compile("([0-9]+(\\.[0-9]+)?)([A-Za-z]+)"); + private final long bytes; + + public ByteSize(String value) { + super(); + Matcher matcher = BYTE_PATTERN.matcher(value); + if (!matcher.matches()) { + throw new IllegalArgumentException("Invalid byte size format: " + value); + } + + double size = Double.parseDouble(matcher.group(1)); + String unit = matcher.group(3).toUpperCase(); + + switch (unit) { + case "B": bytes = (long) size; break; + case "KB": bytes = (long) (size * 1000); break; + case "MB": bytes = (long) (size * 1000 * 1000); break; + case "GB": bytes = (long) (size * 1000 * 1000 * 1000); break; + case "TB": bytes = (long) (size * 1000 * 1000 * 1000 * 1000); break; + case "KIB": bytes = (long) (size * 1024); break; + case "MIB": bytes = (long) (size * 1024 * 1024); break; + case "GIB": bytes = (long) (size * 1024 * 1024 * 1024); break; + case "TIB": bytes = (long) (size * 1024 * 1024 * 1024 * 1024); break; + default: throw new IllegalArgumentException("Unknown byte unit: " + unit); + } + } + + public long getBytes() { + return bytes; + } + + @Override + public Object value() { + // TODO Auto-generated method stub + throw new UnsupportedOperationException("Unimplemented method 'value'"); + } + + @Override + public TokenType type() { + // TODO Auto-generated method stub + throw new UnsupportedOperationException("Unimplemented method 'type'"); + } + + @Override + public JsonElement toJson() { + // TODO Auto-generated method stub + throw new UnsupportedOperationException("Unimplemented method 'toJson'"); + } +} \ No newline at end of file diff --git a/wrangler-api/src/main/java/io/cdap/wrangler/api/parser/TimeDuration.java b/wrangler-api/src/main/java/io/cdap/wrangler/api/parser/TimeDuration.java new file mode 100644 index 000000000..eae95a78b --- /dev/null +++ b/wrangler-api/src/main/java/io/cdap/wrangler/api/parser/TimeDuration.java @@ -0,0 +1,57 @@ +package io.cdap.wrangler.api.parser; + +import io.cdap.wrangler.api.annotations.PublicEvolving; + +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import com.google.gson.JsonElement; + +@PublicEvolving +public class TimeDuration implements Token { + private static final Pattern TIME_PATTERN = Pattern.compile("([0-9]+(\\.[0-9]+)?)([A-Za-z]+)"); + private final long nanoseconds; + + public TimeDuration(String value) { + super(); + Matcher matcher = TIME_PATTERN.matcher(value); + if (!matcher.matches()) { + throw new IllegalArgumentException("Invalid time duration format: " + value); + } + + double time = Double.parseDouble(matcher.group(1)); + String unit = matcher.group(3).toUpperCase(); + + switch (unit) { + case "NS": nanoseconds = (long) time; break; + case "MS": nanoseconds = (long) (time * 1_000_000); break; + case "S": nanoseconds = (long) (time * 1_000_000_000); break; + case "M": nanoseconds = (long) (time * 60 * 1_000_000_000); break; + case "H": nanoseconds = (long) (time * 60 * 60 * 1_000_000_000); break; + case "D": nanoseconds = (long) (time * 24 * 60 * 60 * 1_000_000_000); break; + default: throw new IllegalArgumentException("Unknown time unit: " + unit); + } + } + + public long getNanoseconds() { + return nanoseconds; + } + + @Override + public Object value() { + // TODO Auto-generated method stub + throw new UnsupportedOperationException("Unimplemented method 'value'"); + } + + @Override + public TokenType type() { + // TODO Auto-generated method stub + throw new UnsupportedOperationException("Unimplemented method 'type'"); + } + + @Override + public JsonElement toJson() { + // TODO Auto-generated method stub + throw new UnsupportedOperationException("Unimplemented method 'toJson'"); + } +} \ No newline at end of file diff --git a/wrangler-api/src/main/java/io/cdap/wrangler/api/parser/TokenType.java b/wrangler-api/src/main/java/io/cdap/wrangler/api/parser/TokenType.java index 8c93b0e6a..21fb98f2e 100644 --- a/wrangler-api/src/main/java/io/cdap/wrangler/api/parser/TokenType.java +++ b/wrangler-api/src/main/java/io/cdap/wrangler/api/parser/TokenType.java @@ -152,5 +152,5 @@ public enum TokenType implements Serializable { * Represents the enumerated type for the object of type {@code String} with restrictions * on characters that can be present in a string. */ - IDENTIFIER + IDENTIFIER, BYTE_SIZE, TIME_DURATION } diff --git a/wrangler-core/pom.xml b/wrangler-core/pom.xml index e2dcb3c2b..60eab78ff 100644 --- a/wrangler-core/pom.xml +++ b/wrangler-core/pom.xml @@ -309,6 +309,18 @@ cdap-system-app-api ${cdap.version} + + + io.cdap.wrangler + wrangler-core + 4.11.0 + + + + io.cdap.wrangler + wrangler-core + 4.11.0 + diff --git a/wrangler-core/src/main/antlr4/io/cdap/wrangler/parser/Directives.g4 b/wrangler-core/src/main/antlr4/io/cdap/wrangler/parser/Directives.g4 index 7c517ed6a..419889128 100644 --- a/wrangler-core/src/main/antlr4/io/cdap/wrangler/parser/Directives.g4 +++ b/wrangler-core/src/main/antlr4/io/cdap/wrangler/parser/Directives.g4 @@ -140,8 +140,13 @@ numberRange ; value - : String | Number | Column | Bool - ; + : STRING #stringValue + | NUMBER #numberValue + | BOOLEAN #booleanValue + | NULL #nullValue + | BYTE_SIZE #byteSizeValue + | TIME_DURATION #timeDurationValue + ; ecommand : '!' Identifier @@ -195,6 +200,14 @@ identifierList : Identifier (',' Identifier)* ; +BYTE_SIZE + : NUMBER BYTE_UNIT + ; + +TIME_DURATION + : NUMBER TIME_UNIT + ; + /* * Following are the Lexer Rules used for tokenizing the recipe. @@ -311,3 +324,10 @@ fragment Int fragment Digit : [0-9] ; +fragment BYTE_UNIT + : ('B'|'KB'|'MB'|'GB'|'TB'|'KIB'|'MIB'|'GIB'|'TIB') + ; + +fragment TIME_UNIT + : ('NS'|'MS'|'S'|'M'|'H'|'D') + ; diff --git a/wrangler-core/src/main/java/io/cdap/directives/aggregates/AggregateStats.java b/wrangler-core/src/main/java/io/cdap/directives/aggregates/AggregateStats.java new file mode 100644 index 000000000..9de722c77 --- /dev/null +++ b/wrangler-core/src/main/java/io/cdap/directives/aggregates/AggregateStats.java @@ -0,0 +1,87 @@ +package io.cdap.directives.aggregates; + +import io.cdap.cdap.api.annotation.Description; +import io.cdap.cdap.api.annotation.Name; +import io.cdap.cdap.api.annotation.Plugin; +import io.cdap.wrangler.api.*; +import io.cdap.wrangler.api.parser.*; +import io.cdap.wrangler.expression.EL; +import io.cdap.wrangler.expression.ELContext; +import io.cdap.wrangler.expression.ELException; +import io.cdap.wrangler.expression.ELResult; + +import java.util.ArrayList; +import java.util.List; + +@Plugin(type = Directive.TYPE) +@Name("aggregate-stats") +@Description("Aggregates byte size and time duration columns") +public class AggregateStats implements Directive, AggregateInterpreter { + private String sizeColumn; + private String timeColumn; + private String outputSizeColumn; + private String outputTimeColumn; + private long totalBytes = 0; + private long totalNanos = 0; + private int rowCount = 0; + + @Override + public UsageDefinition define() { + UsageDefinition.Builder builder = UsageDefinition.builder("aggregate-stats"); + builder.define("size-column", TokenType.COLUMN_NAME); + builder.define("time-column", TokenType.COLUMN_NAME); + builder.define("output-size-column", TokenType.COLUMN_NAME); + builder.define("output-time-column", TokenType.COLUMN_NAME); + return builder.build(); + } + + @Override + public void initialize(Arguments args) throws DirectiveParseException { + this.sizeColumn = ((ColumnName) args.value("size-column")).value(); + this.timeColumn = ((ColumnName) args.value("time-column")).value(); + this.outputSizeColumn = ((ColumnName) args.value("output-size-column")).value(); + this.outputTimeColumn = ((ColumnName) args.value("output-time-column")).value(); + } + + @Override + public List execute(List rows, ExecutorContext context) throws DirectiveExecutionException { + for (Row row : rows) { + try { + // Process byte size + Object sizeObj = row.getValue(sizeColumn); + if (sizeObj != null) { + ByteSize byteSize = new ByteSize(sizeObj.toString()); + totalBytes += byteSize.getBytes(); + } + + // Process time duration + Object timeObj = row.getValue(timeColumn); + if (timeObj != null) { + TimeDuration timeDuration = new TimeDuration(timeObj.toString()); + totalNanos += timeDuration.getNanoseconds(); + } + + rowCount++; + } catch (Exception e) { + throw new DirectiveExecutionException(e.getMessage(), e); + } + } + return rows; + } + + @Override + public List finalize() throws DirectiveExecutionException { + Row row = new Row(); + row.add(outputSizeColumn, (double) totalBytes / (1024 * 1024)); // Convert to MB + row.add(outputTimeColumn, (double) totalNanos / 1_000_000_000); // Convert to seconds + List result = new ArrayList<>(); + result.add(row); + return result; + } + + @Override + public void destroy() { + // Clean up if needed + } +} + diff --git a/wrangler-core/src/main/java/io/cdap/wrangler/parser/RecipeVisitor.java b/wrangler-core/src/main/java/io/cdap/wrangler/parser/RecipeVisitor.java index ac35e7a5e..825ec4bd0 100644 --- a/wrangler-core/src/main/java/io/cdap/wrangler/parser/RecipeVisitor.java +++ b/wrangler-core/src/main/java/io/cdap/wrangler/parser/RecipeVisitor.java @@ -22,6 +22,7 @@ import io.cdap.wrangler.api.Triplet; import io.cdap.wrangler.api.parser.Bool; import io.cdap.wrangler.api.parser.BoolList; +import io.cdap.wrangler.api.parser.ByteSize; import io.cdap.wrangler.api.parser.ColumnName; import io.cdap.wrangler.api.parser.ColumnNameList; import io.cdap.wrangler.api.parser.DirectiveName; @@ -33,6 +34,7 @@ import io.cdap.wrangler.api.parser.Ranges; import io.cdap.wrangler.api.parser.Text; import io.cdap.wrangler.api.parser.TextList; +import io.cdap.wrangler.api.parser.TimeDuration; import io.cdap.wrangler.api.parser.Token; import org.antlr.v4.runtime.ParserRuleContext; import org.antlr.v4.runtime.misc.Interval; @@ -87,6 +89,17 @@ public RecipeSymbol.Builder visitDirective(DirectivesParser.DirectiveContext ctx return super.visitDirective(ctx); } + @Override + public Token visitByteSizeArg(DirectivesParser.ByteSizeArgContext ctx) { + return new ByteSize(ctx.getText()); + } + + @Override + public Token visitTimeDurationArg(DirectivesParser.TimeDurationArgContext ctx) { + return new TimeDuration(ctx.getText()); + } + + /** * A Directive can include identifiers, this method extracts that token that is being * identified as token of type Identifier. diff --git a/wrangler-core/src/test/java/io/cdap/wrangler/executor/AggregateStatsTest.java b/wrangler-core/src/test/java/io/cdap/wrangler/executor/AggregateStatsTest.java new file mode 100644 index 000000000..4ea619294 --- /dev/null +++ b/wrangler-core/src/test/java/io/cdap/wrangler/executor/AggregateStatsTest.java @@ -0,0 +1,36 @@ +package io.cdap.wrangler.executor; + +import io.cdap.wrangler.api.Row; +// import java.io.cdap.wrangler.directives.aggregates.AggregateStats; +// "C:\zeotap\wrangler\wrangler-core\src\main\java\io\cdap\directives\aggregates\AggregateStats.java" +import org.junit.Assert; +import org.junit.Test; + +import java.util.Arrays; +import java.util.List; + +public class AggregateStatsTest { + @Test + public void testAggregation() throws Exception { + List rows = Arrays.asList( + new Row("data_transfer_size", "1MB").add("response_time", "100ms"), + new Row("data_transfer_size", "2MB").add("response_time", "200ms"), + new Row("data_transfer_size", "0.5MB").add("response_time", "50ms") + ); + + AggregateStats directive = new AggregateStats(); + directive.initialize(new TestArguments( + "size-column", "data_transfer_size", + "time-column", "response_time", + "output-size-column", "total_size_mb", + "output-time-column", "total_time_sec" + )); + + directive.execute(rows, null); + List results = directive.finalize(); + + Assert.assertEquals(1, results.size()); + Assert.assertEquals(3.5, (Double) results.get(0).getValue("total_size_mb"), 0.001); + Assert.assertEquals(0.350, (Double) results.get(0).getValue("total_time_sec"), 0.001); + } +} \ No newline at end of file diff --git a/wrangler-core/src/test/java/io/cdap/wrangler/parser/ByteSizeTest.java b/wrangler-core/src/test/java/io/cdap/wrangler/parser/ByteSizeTest.java new file mode 100644 index 000000000..39186ce98 --- /dev/null +++ b/wrangler-core/src/test/java/io/cdap/wrangler/parser/ByteSizeTest.java @@ -0,0 +1,17 @@ + +package io.cdap.wrangler.parser; + +import io.cdap.wrangler.api.parser.ByteSize; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class ByteSizeTest { + @Test + public void testByteSizeParsing() { + assertEquals(1024, new ByteSize("1KB").getBytes()); + assertEquals(1024 * 1024, new ByteSize("1MB").getBytes()); + assertEquals(1000, new ByteSize("1KB").getBytes()); + assertEquals(1.5 * 1024 * 1024, new ByteSize("1.5MB").getBytes(), 0.001); + } +} diff --git a/wrangler-core/src/test/java/io/cdap/wrangler/parser/TimeDurationTest.java b/wrangler-core/src/test/java/io/cdap/wrangler/parser/TimeDurationTest.java new file mode 100644 index 000000000..92b597ddc --- /dev/null +++ b/wrangler-core/src/test/java/io/cdap/wrangler/parser/TimeDurationTest.java @@ -0,0 +1,16 @@ +package io.cdap.wrangler.parser; + +import io.cdap.wrangler.api.parser.TimeDuration; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class TimeDurationTest { + @Test + public void testTimeDurationParsing() { + assertEquals(1_000_000, new TimeDuration("1ms").getNanoseconds()); + assertEquals(1_000_000_000, new TimeDuration("1s").getNanoseconds()); + assertEquals(60_000_000_000L, new TimeDuration("1m").getNanoseconds()); + assertEquals(2.5 * 1_000_000_000, new TimeDuration("2.5s").getNanoseconds(), 0.001); + } +}