diff --git a/.github/workflows/build-report.yml b/.github/workflows/build-report.yml index 9c2f33843..1a6545e9c 100644 --- a/.github/workflows/build-report.yml +++ b/.github/workflows/build-report.yml @@ -1,56 +1,37 @@ -# Copyright © 2024 Cask Data, Inc. -# Licensed under the Apache License, Version 2.0 (the "License"); you may not -# use this file except in compliance with the License. You may obtain a copy of -# the License at -# http://www.apache.org/licenses/LICENSE-2.0 -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations under -# the License. - -# This workflow will build a Java project with Maven -# For more information see: https://help.github.com/actions/language-and-framework-guides/building-and-testing-java-with-maven -# Note: Any changes to this workflow would be used only after merging into develop name: Build Unit Tests Report on: workflow_run: workflows: - - Build with unit tests + - Build with unit tests types: - - completed + - completed permissions: - actions: read # Allows reading workflow run information - statuses: write # Required if the action updates commit statuses - checks: write # Required if it updates GitHub Checks API + actions: read + statuses: write + checks: write jobs: build: runs-on: ubuntu-latest - if: ${{ github.event.workflow_run.conclusion != 'skipped' }} steps: - # Pinned 1.0.0 version - - uses: marocchino/action-workflow_run-status@54b6e87d6cb552fc5f36dbe9a722a6048725917a - - - name: Download artifact - uses: actions/download-artifact@v4 - with: - github-token: ${{ secrets.GITHUB_TOKEN }} - run-id: ${{ github.event.workflow_run.id }} - path: artifacts/ - - - name: Surefire Report - # Pinned 3.5.2 version - uses: mikepenz/action-junit-report@16a9560bd02f11e7e3bf6b3e2ef6bba6c9d07c32 - if: always() - with: - report_paths: '**/target/surefire-reports/TEST-*.xml' - github_token: ${{ secrets.GITHUB_TOKEN }} - detailed_summary: true - commit: ${{ github.event.workflow_run.head_sha }} - check_name: Build Test Report - + - uses: marocchino/action-workflow_run-status@54b6e87d6cb552fc5f36dbe9a722a6048725917a + + - name: Download artifact + uses: actions/download-artifact@v4 + with: + run-id: ${{ github.event.workflow_run.id }} + path: artifacts/ + + - name: Surefire Report + uses: mikepenz/action-junit-report@16a9560bd02f11e7e3bf6b3e2ef6bba6c9d07c32 + if: always() + with: + report_paths: '**/target/surefire-reports/TEST-*.xml' + detailed_summary: true + github_token: ${{ secrets.GITHUB_TOKEN }} + commit: ${{ github.event.workflow_run.head_sha }} + check_name: Build Test Report diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index c4d80553c..22a0c64d2 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -1,17 +1,3 @@ -# Copyright © 2021 Cask Data, Inc. -# Licensed under the Apache License, Version 2.0 (the "License"); you may not -# use this file except in compliance with the License. You may obtain a copy of -# the License at -# http://www.apache.org/licenses/LICENSE-2.0 -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations under -# the License. - -# This workflow will build a Java project with Maven -# For more information see: https://help.github.com/actions/language-and-framework-guides/building-and-testing-java-with-maven -# Note: Any changes to this workflow would be used only after merging into develop name: Build with unit tests on: @@ -25,30 +11,25 @@ jobs: build: runs-on: k8s-runner-build - # We allow builds: - # 1) When it's a merge into a branch - # 2) For PRs that are labeled as build and - # - It's a code change - # - A build label was just added - # A bit complex, but prevents builds when other labels are manipulated if: > github.event_name == 'push' || (contains(github.event.pull_request.labels.*.name, 'build') - && (github.event.action != 'labeled' || github.event.label.name == 'build') - ) + && (github.event.action != 'labeled' || github.event.label.name == 'build')) + steps: - uses: actions/checkout@v4 - with: - ref: ${{ github.event.workflow_run.head_sha }} - - name: Cache + + - name: Cache Maven packages uses: actions/cache@v4 with: path: ~/.m2/repository key: ${{ runner.os }}-maven-${{ github.workflow }}-${{ hashFiles('**/pom.xml') }} restore-keys: | ${{ runner.os }}-maven-${{ github.workflow }} + - name: Build with Maven run: mvn clean test -fae -T 2 -B -V -DcloudBuild -Dmaven.wagon.http.retryHandler.count=3 -Dmaven.wagon.httpconnectionManager.ttlSeconds=25 + - name: Archive build artifacts uses: actions/upload-artifact@v4 if: always() @@ -57,4 +38,3 @@ jobs: path: | **/target/rat.txt **/target/surefire-reports/* - diff --git a/wrangler-core/src/main/antlr4/io/cdap/wrangler/parser/Bytesize.java b/wrangler-core/src/main/antlr4/io/cdap/wrangler/parser/Bytesize.java new file mode 100644 index 000000000..8a69c84fa --- /dev/null +++ b/wrangler-core/src/main/antlr4/io/cdap/wrangler/parser/Bytesize.java @@ -0,0 +1,41 @@ +package io.cdap.wrangler.api.parser; + +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * A token that represents byte size like 10KB, 1.5MB, etc. + */ +public class ByteSize extends Token { + private static final Pattern PATTERN = Pattern.compile("(?i)([\\d.]+)\\s*(B|KB|MB|GB|TB)"); + private final double size; + private final String unit; + + public ByteSize(String value) { + super(value); + Matcher matcher = PATTERN.matcher(value.trim()); + if (!matcher.matches()) { + throw new IllegalArgumentException("Invalid byte size format: " + value); + } + + this.size = Double.parseDouble(matcher.group(1)); + this.unit = matcher.group(2).toUpperCase(); + } + + public long getBytes() { + switch (unit) { + case "B": + return (long) size; + case "KB": + return (long) (size * 1024); + case "MB": + return (long) (size * 1024 * 1024); + case "GB": + return (long) (size * 1024 * 1024 * 1024); + case "TB": + return (long) (size * 1024L * 1024 * 1024 * 1024); + default: + throw new IllegalStateException("Unknown unit: " + unit); + } + } +} diff --git a/wrangler-core/src/main/antlr4/io/cdap/wrangler/parser/Directives.g4 b/wrangler-core/src/main/antlr4/io/cdap/wrangler/parser/Directives.g4 index 7c517ed6a..cf91b3144 100644 --- a/wrangler-core/src/main/antlr4/io/cdap/wrangler/parser/Directives.g4 +++ b/wrangler-core/src/main/antlr4/io/cdap/wrangler/parser/Directives.g4 @@ -41,6 +41,22 @@ options { /** * Parser Grammar for recognizing tokens and constructs of the directives language. */ +BYTE_SIZE + : [0-9]+ ('.' [0-9]+)? BYTE_UNIT + ; + +TIME_DURATION + : [0-9]+ ('.' [0-9]+)? TIME_UNIT + ; + +fragment BYTE_UNIT + : ('B' | 'KB' | 'MB' | 'GB' | 'TB' | 'b' | 'kb' | 'mb' | 'gb' | 'tb') + ; + +fragment TIME_UNIT + : ('ns' | 'us' | 'ms' | 's' | 'm' | 'h' | 'd') + ; + recipe : statements EOF ; @@ -140,9 +156,15 @@ numberRange ; value - : String | Number | Column | Bool + : String + | Number + | Column + | Bool + | BYTE_SIZE + | TIME_DURATION ; + ecommand : '!' Identifier ; @@ -154,6 +176,14 @@ config column : Column ; +byteSizeArg + : BYTE_SIZE + ; + +timeDurationArg + : TIME_DURATION + ; + text : String diff --git a/wrangler-core/src/main/antlr4/io/cdap/wrangler/parser/RecipeVisitor.java b/wrangler-core/src/main/antlr4/io/cdap/wrangler/parser/RecipeVisitor.java new file mode 100644 index 000000000..adce5762c --- /dev/null +++ b/wrangler-core/src/main/antlr4/io/cdap/wrangler/parser/RecipeVisitor.java @@ -0,0 +1,316 @@ +@@ -1,329 +1,339 @@ +/* + * Copyright © 2017-2019 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package io.cdap.wrangler.parser; +import io.cdap.wrangler.api.LazyNumber; +import io.cdap.wrangler.api.RecipeSymbol; +import io.cdap.wrangler.api.SourceInfo; +import io.cdap.wrangler.api.Triplet; +import io.cdap.wrangler.api.parser.Bool; +import io.cdap.wrangler.api.parser.BoolList; +import io.cdap.wrangler.api.parser.ColumnName; +import io.cdap.wrangler.api.parser.ColumnNameList; +import io.cdap.wrangler.api.parser.DirectiveName; +import io.cdap.wrangler.api.parser.Expression; +import io.cdap.wrangler.api.parser.Identifier; +import io.cdap.wrangler.api.parser.Numeric; +import io.cdap.wrangler.api.parser.NumericList; +import io.cdap.wrangler.api.parser.Properties; +import io.cdap.wrangler.api.parser.Ranges; +import io.cdap.wrangler.api.parser.Text; +import io.cdap.wrangler.api.parser.TextList; +import io.cdap.wrangler.api.parser.Token; +import org.antlr.v4.runtime.ParserRuleContext; +import org.antlr.v4.runtime.misc.Interval; +import org.antlr.v4.runtime.tree.ParseTree; +import org.antlr.v4.runtime.tree.TerminalNode; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +/** + * This class RecipeVisitor implements the visitor pattern + * used during traversal of the AST tree. The ParserTree#Walker + * invokes appropriate methods as call backs with information about the node. + * + *

In order to understand what's being invoked, please look at the grammar file + * Directive.g4

. + * + *

This class exposes a getTokenGroups method for retrieving the + * RecipeSymbol after visiting. The RecipeSymbol represents + * all the TokenGroup for all directives in a recipe. Each directive + * will create a TokenGroup

+ * + *

As the ParseTree is walking through the call graph, it generates + * one TokenGroup for each directive in the recipe. Each TokenGroup + * contains parsed Tokens for that directive along with more information like + * SourceInfo. A collection of TokenGroup consistutes a RecipeSymbol + * that is returned by this function.

+ */ +public final class RecipeVisitor extends DirectivesBaseVisitor { + private RecipeSymbol.Builder builder = new RecipeSymbol.Builder(); + /** + * Returns a RecipeSymbol for the recipe being parsed. This + * object has all the tokens that were successfully parsed along with source + * information for each directive in the recipe. + * + * @return An compiled object after parsing the recipe. + */ + public RecipeSymbol getCompiledUnit() { + return builder.build(); + } + /** + * A Recipe is made up of Directives and Directives is made up of each individual + * Directive. This method is invoked on every visit to a new directive in the recipe. + */ + @Override + public RecipeSymbol.Builder visitDirective(DirectivesParser.DirectiveContext ctx) { + builder.createTokenGroup(getOriginalSource(ctx)); + return super.visitDirective(ctx); + } + /** + * A Directive can include identifiers, this method extracts that token that is being + * identified as token of type Identifier. + */ + @Override + public RecipeSymbol.Builder visitIdentifier(DirectivesParser.IdentifierContext ctx) { + builder.addToken(new Identifier(ctx.Identifier().getText())); + return super.visitIdentifier(ctx); + } + /** + * A Directive can include properties (which are a collection of key and value pairs), + * this method extracts that token that is being identified as token of type Properties. + */ + @Override + public RecipeSymbol.Builder visitPropertyList(DirectivesParser.PropertyListContext ctx) { + Map props = new HashMap<>(); + List properties = ctx.property(); + for (DirectivesParser.PropertyContext property : properties) { + String identifier = property.Identifier().getText(); + Token token; + if (property.number() != null) { + token = new Numeric(new LazyNumber(property.number().getText())); + } else if (property.bool() != null) { + token = new Bool(Boolean.valueOf(property.bool().getText())); + } else { + String text = property.text().getText(); + token = new Text(text.substring(1, text.length() - 1)); + } + props.put(identifier, token); + } + builder.addToken(new Properties(props)); + return builder; + } + /** + * A Pragma is an instruction to the compiler to dynamically load the directives being specified + * from the DirectiveRegistry. These do not affect the data flow. + * + *

E.g. #pragma load-directives test1, test2, test3; will collect the tokens + * test1, test2 and test3 as dynamically loadable directives.

+ */ + @Override + public RecipeSymbol.Builder visitPragmaLoadDirective(DirectivesParser.PragmaLoadDirectiveContext ctx) { + List identifiers = ctx.identifierList().Identifier(); + for (TerminalNode identifier : identifiers) { + builder.addLoadableDirective(identifier.getText()); + } + return builder; + } + /** + * A Pragma version is a informational directive to notify compiler about the grammar that is should + * be using to parse the directives below. + */ + @Override + public RecipeSymbol.Builder visitPragmaVersion(DirectivesParser.PragmaVersionContext ctx) { + builder.addVersion(ctx.Number().getText()); + return builder; + } + /** + * A Directive can include number ranges like start:end=value[,start:end=value]*. This + * visitor method allows you to collect all the number ranges and create a token type + * Ranges. + */ + @Override + public RecipeSymbol.Builder visitNumberRanges(DirectivesParser.NumberRangesContext ctx) { + List> output = new ArrayList<>(); + List ranges = ctx.numberRange(); + for (DirectivesParser.NumberRangeContext range : ranges) { + List numbers = range.Number(); + String text = range.value().getText(); + if (text.startsWith("'") && text.endsWith("'")) { + text = text.substring(1, text.length() - 1); + } + Triplet val = + new Triplet<>(new Numeric(new LazyNumber(numbers.get(0).getText())), + new Numeric(new LazyNumber(numbers.get(1).getText())), + text + ); + output.add(val); + } + builder.addToken(new Ranges(output)); + return builder; + } + /** + * This visitor method extracts the custom directive name specified. The custom + * directives are specified with a bang (!) at the start. + */ + @Override + public RecipeSymbol.Builder visitEcommand(DirectivesParser.EcommandContext ctx) { + builder.addToken(new DirectiveName(ctx.Identifier().getText())); + return builder; + } + /** + * A Directive can consist of column specifiers. These are columns that the directive + * would operate on. When a token of type column is visited, it would generate a token + * type of type ColumnName. + */ + @Override + public RecipeSymbol.Builder visitColumn(DirectivesParser.ColumnContext ctx) { + builder.addToken(new ColumnName(ctx.Column().getText().substring(1))); + return builder; + } + /** + * A Directive can consist of text field. These type of fields are enclosed within + * a single-quote or a double-quote. This visitor method extracts the string value + * within the quotes and creates a token type Text. + */ + @Override + public RecipeSymbol.Builder visitText(DirectivesParser.TextContext ctx) { + String value = ctx.String().getText(); + builder.addToken(new Text(value.substring(1, value.length() - 1))); + return builder; + } + /** + * A Directive can consist of numeric field. This visitor method extracts the + * numeric value Numeric. + */ + @Override + public RecipeSymbol.Builder visitNumber(DirectivesParser.NumberContext ctx) { + LazyNumber number = new LazyNumber(ctx.Number().getText()); + builder.addToken(new Numeric(number)); + return builder; + } + /** + * A Directive can consist of Bool field. The Bool field is represented as + * either true or false. This visitor method extract the bool value into a + * token type Bool. + */ + @Override + public RecipeSymbol.Builder visitBool(DirectivesParser.BoolContext ctx) { + builder.addToken(new Bool(Boolean.valueOf(ctx.Bool().getText()))); + return builder; + } + /** + * A Directive can include a expression or a condition to be evaluated. When + * such a token type is found, the visitor extracts the expression and generates + * a token type Expression to be added to the TokenGroup + */ + @Override + public RecipeSymbol.Builder visitCondition(DirectivesParser.ConditionContext ctx) { + int childCount = ctx.getChildCount(); + StringBuilder sb = new StringBuilder(); + for (int i = 1; i < childCount - 1; ++i) { + ParseTree child = ctx.getChild(i); + sb.append(child.getText()).append(" "); + } + builder.addToken(new Expression(sb.toString())); + return builder; + } + /** + * A Directive has name and in the parsing context it's called a command. + * This visitor methods extracts the command and creates a toke type DirectiveName + */ + @Override + public RecipeSymbol.Builder visitCommand(DirectivesParser.CommandContext ctx) { + builder.addToken(new DirectiveName(ctx.Identifier().getText())); + return builder; + } + /** + * This visitor methods extracts the list of columns specified. It creates a token + * type ColumnNameList to be added to TokenGroup. + */ + @Override + public RecipeSymbol.Builder visitColList(DirectivesParser.ColListContext ctx) { + List columns = ctx.Column(); + List names = new ArrayList<>(); + for (TerminalNode column : columns) { + names.add(column.getText().substring(1)); + } + builder.addToken(new ColumnNameList(names)); + return builder; + } + /** + * This visitor methods extracts the list of numeric specified. It creates a token + * type NumericList to be added to TokenGroup. + */ + @Override + public RecipeSymbol.Builder visitNumberList(DirectivesParser.NumberListContext ctx) { + List numbers = ctx.Number(); + List numerics = new ArrayList<>(); + for (TerminalNode number : numbers) { + numerics.add(new LazyNumber(number.getText())); + } + builder.addToken(new NumericList(numerics)); + return builder; + } + /** + * This visitor methods extracts the list of booleans specified. It creates a token + * type BoolList to be added to TokenGroup. + */ + @Override + public RecipeSymbol.Builder visitBoolList(DirectivesParser.BoolListContext ctx) { + List bools = ctx.Bool(); + List booleans = new ArrayList<>(); + for (TerminalNode bool : bools) { + booleans.add(Boolean.parseBoolean(bool.getText())); + } + builder.addToken(new BoolList(booleans)); + return builder; + } + /** + * This visitor methods extracts the list of strings specified. It creates a token + * type StringList to be added to TokenGroup. + */ + @Override + public RecipeSymbol.Builder visitStringList(DirectivesParser.StringListContext ctx) { + List strings = ctx.String(); + List strs = new ArrayList<>(); + for (TerminalNode string : strings) { + String text = string.getText(); + strs.add(text.substring(1, text.length() - 1)); + } + builder.addToken(new TextList(strs)); + return builder; + } + private SourceInfo getOriginalSource(ParserRuleContext ctx) { + int a = ctx.getStart().getStartIndex(); + int b = ctx.getStop().getStopIndex(); + Interval interval = new Interval(a, b); + String text = ctx.start.getInputStream().getText(interval); + int lineno = ctx.getStart().getLine(); + int column = ctx.getStart().getCharPositionInLine(); + return new SourceInfo(lineno, column, text); + } + @Override +public Token visitByteSizeArg(DirectivesParser.ByteSizeArgContext ctx) { + return new ByteSize(ctx.getText()); +} + +@Override +public Token visitTimeDurationArg(DirectivesParser.TimeDurationArgContext ctx) { + return new TimeDuration(ctx.getText()); +} + diff --git a/wrangler-core/src/main/antlr4/io/cdap/wrangler/parser/WranglerParserVisitorImpl.java b/wrangler-core/src/main/antlr4/io/cdap/wrangler/parser/WranglerParserVisitorImpl.java new file mode 100644 index 000000000..cf1fca8ae --- /dev/null +++ b/wrangler-core/src/main/antlr4/io/cdap/wrangler/parser/WranglerParserVisitorImpl.java @@ -0,0 +1,16 @@ +public class WranglerParserVisitorImpl extends WranglerParserBaseVisitor { + + @Override + public Void visitByteSize(WranglerParser.ByteSizeContext ctx) { + // Your logic for visiting the ByteSize node + return visitChildren(ctx); + } + + @Override + public Void visitTimeSize(WranglerParser.TimeSizeContext ctx) { + // Your logic for visiting the TimeSize node + return visitChildren(ctx); + } + + // Other visit methods for different nodes... +} diff --git a/wrangler-core/src/main/java/io/cdap/wrangler/directive/AggregateStats.java b/wrangler-core/src/main/java/io/cdap/wrangler/directive/AggregateStats.java new file mode 100644 index 000000000..89c87cd99 --- /dev/null +++ b/wrangler-core/src/main/java/io/cdap/wrangler/directive/AggregateStats.java @@ -0,0 +1,32 @@ +public class AggregateStats implements Directive { + private String sizeCol, timeCol, outSizeCol, outTimeCol; + private long totalBytes = 0, totalNanos = 0; + private int rowCount = 0; + + @Override + public void initialize(Arguments args) { + sizeCol = args.value("size_col"); + timeCol = args.value("time_col"); + outSizeCol = args.value("out_size_col"); + outTimeCol = args.value("out_time_col"); + } + + @Override + public List execute(List rows, ExecutorContext ctx) { + for (Row row : rows) { + String sizeVal = row.getValue(sizeCol).toString(); + String timeVal = row.getValue(timeCol).toString(); + totalBytes += new ByteSize(sizeVal).getBytes(); + totalNanos += new TimeDuration(timeVal).getNanos(); + rowCount++; + } + List output = new ArrayList<>(); + Row result = new Row(); + result.add(outSizeCol, totalBytes / (1024.0 * 1024.0)); // to MB + result.add(outTimeCol, totalNanos / 1_000_000_000.0); // to seconds + output.add(result); + return output; + } + + ... +} diff --git a/wrangler-core/src/test/java/io/cdap/directives/row/AggregateStats.java b/wrangler-core/src/test/java/io/cdap/directives/row/AggregateStats.java new file mode 100644 index 000000000..2bd12bdb4 --- /dev/null +++ b/wrangler-core/src/test/java/io/cdap/directives/row/AggregateStats.java @@ -0,0 +1,79 @@ +@@ -0,0 +1,78 @@ +package io.cdap.wrangler.parser.directives.row; + +import io.cdap.wrangler.api.Row; +import io.cdap.wrangler.api.parser.Directive; +import io.cdap.wrangler.api.parser.UsageDefinition; +import io.cdap.wrangler.api.parser.TokenType; +import io.cdap.wrangler.api.parser.TokenGroup; +import io.cdap.wrangler.api.parser.ColumnName; +import io.cdap.wrangler.api.parser.Text; +import io.cdap.wrangler.api.parser.ByteSize; +import io.cdap.wrangler.api.parser.TimeDuration; +import io.cdap.wrangler.api.ExecutorContext; + +import java.util.List; +import java.util.ArrayList; + +public class AggregateStats implements Directive { + private String byteSizeCol; + private String timeDurationCol; + private String outputSizeCol; + private String outputTimeCol; + + private long totalBytes = 0; + private long totalMillis = 0; + + private boolean isFinalRun = false; + + @Override + public UsageDefinition define() { + return UsageDefinition.builder("aggregate-stats") + .withArgs( + TokenGroup.builder() + .addTokenType(TokenType.COLUMN_NAME) // Byte size column + .addTokenType(TokenType.COLUMN_NAME) // Time duration column + .addTokenType(TokenType.TEXT) // Output byte column + .addTokenType(TokenType.TEXT) // Output time column + .build() + ) + .build(); + } + + @Override + public void initialize(ExecutorContext context, List args) throws Exception { + this.byteSizeCol = ((ColumnName) args.get(0)).value(); + this.timeDurationCol = ((ColumnName) args.get(1)).value(); + this.outputSizeCol = ((Text) args.get(2)).value(); + this.outputTimeCol = ((Text) args.get(3)).value(); + } + + @Override + public List execute(List rows, ExecutorContext context) throws Exception { + List result = new ArrayList<>(); + + for (Row row : rows) { + Object sizeVal = row.getValue(byteSizeCol); + Object timeVal = row.getValue(timeDurationCol); + + if (sizeVal != null && timeVal != null) { + ByteSize size = new ByteSize(sizeVal.toString()); + TimeDuration time = new TimeDuration(timeVal.toString()); + + totalBytes += size.getBytes(); + totalMillis += time.getMilliseconds(); + } + } + + // Final aggregated row + Row output = new Row(); + double mb = totalBytes / (1024.0 * 1024.0); + double sec = totalMillis / 1000.0; + + output.add(outputSizeCol, mb); + output.add(outputTimeCol, sec); + + result.add(output); + return result; + } +}