
Aggregate functions

N rows → 1 value

Folds many rows down into a single value per group.

For example, SELECT g, demo.vgi_sum(v) ... GROUP BY g returns one sum per distinct g. VGI aggregates are built for DuckDB's parallel, partial aggregation model, which is why the interface has four methods rather than one.

The four methods

Implement AggregateFunction<State>:

| Method | Role |
|---|---|
| newState() | create a fresh, empty per-group accumulator |
| update(states, groupIds, batch) | fold a batch of rows into the accumulators |
| combine(target, source) | merge two partial accumulators |
| finalize(result, rowIndex, state) | write a group's accumulator as the output value |

combine() exists because the engine may aggregate in parallel: several threads (or processes) each build partial States over a slice of the data, and the partials are then merged. State implements Serializable so that a partial can cross a process boundary. Keep it small, since each partial may be serialized and copied.
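To make the process-boundary point concrete, here is a minimal stand-alone sketch. It assumes plain java.io serialization and does not touch the VGI API; the class PartialStateSketch and its helpers toBytes and fromBytes are hypothetical names for illustration only. A partial is serialized, read back as if in another process, and merged into a local partial.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical stand-alone sketch; it does not use the VGI API.
final class PartialStateSketch {

    /** Same shape as the example's State: one long, plain data only. */
    static final class State implements Serializable {
        private static final long serialVersionUID = 1L;
        long total;
    }

    /** Serialize a partial to bytes, as if shipping it to another process. */
    static byte[] toBytes(State s) {
        try (ByteArrayOutputStream bytes = new ByteArrayOutputStream();
             ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(s);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Rebuild a partial from bytes on the receiving side. */
    static State fromBytes(byte[] data) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return (State) in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Merge a received partial into the local one, overflow-checked. */
    static void combine(State target, State source) {
        target.total = Math.addExact(target.total, source.total);
    }

    public static void main(String[] args) {
        State local = new State();
        local.total = 10;
        State remote = new State();
        remote.total = 20;

        byte[] wire = toBytes(remote);          // what would travel between processes
        combine(local, fromBytes(wire));
        System.out.println(local.total);        // 30
        System.out.println(wire.length + " bytes on the wire");
    }
}
```

Every extra field in State adds to the bytes copied per group and per partial, which is the practical reason to keep it small.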

java
// VGI-Java example: an aggregate function.
//
// An aggregate collapses many rows into one value per group. VGI aggregates are
// built for DuckDB's *parallel, partial* aggregation model, so you implement
// four pieces:
//
//   newState()  — a fresh, empty accumulator
//   update()    — fold a batch of rows into the per-group accumulators
//   combine()   — merge two partial accumulators (parallel workers / spill)
//   finalize()  — write a group's accumulator out as the result value
//
// The `State` is `Serializable` because partials may cross process boundaries
// when DuckDB parallelizes the aggregation. Keep it small.
//
//   ATTACH 'demo' AS demo (TYPE vgi, LOCATION 'launch:/abs/path/bin/runAggregate');
//   SELECT g, demo.vgi_sum(v) FROM (VALUES (1,10),(1,20),(2,5)) t(g,v) GROUP BY g;
//   -- 1 -> 30, 2 -> 5
package farm.query.vgi.examples;

import farm.query.vgi.Worker;
import farm.query.vgi.aggregate.AggregateFunction;
import farm.query.vgi.function.FunctionSpec;
import farm.query.vgi.types.Schemas;
import org.apache.arrow.vector.BigIntVector;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.types.pojo.Schema;

import java.io.Serializable;
import java.util.List;
import java.util.Map;

/** {@code vgi_sum(value BIGINT) -> BIGINT}: sum per group, overflow-checked. */
public final class AggregateExample implements AggregateFunction<AggregateExample.State> {

    /** Per-group accumulator. Serializable: partials may be merged across workers. */
    public static final class State implements Serializable {
        private static final long serialVersionUID = 1L;
        long total;
    }

    private static final Schema OUTPUT_SCHEMA =
            new Schema(List.of(Schemas.nullable("result", Schemas.INT64)));

    private static final FunctionSpec SPEC = FunctionSpec.builder("vgi_sum")
            .description("Sum integer values")
            .arg("value", Schemas.INT64)
            .build();

    @Override public FunctionSpec spec() { return SPEC; }
    @Override public Schema outputSchema() { return OUTPUT_SCHEMA; }
    @Override public State newState() { return new State(); }

    // Fold one input batch into the accumulators. `groupIds[i]` is the group of
    // row i; states.computeIfAbsent mints an accumulator the first time a group
    // is seen in this partition.
    @Override
    public void update(Map<Long, State> states, long[] groupIds, VectorSchemaRoot input) {
        FieldVector v = input.getFieldVectors().get(0);
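        // Fail loudly on an unexpected column type instead of silently skipping the batch.
        if (!(v instanceof BigIntVector b)) {
            throw new IllegalArgumentException("vgi_sum: expected a BIGINT argument");
        }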
        int rows = input.getRowCount();
        try {
            for (int i = 0; i < rows; i++) {
                if (b.isNull(i)) continue;
                State s = states.computeIfAbsent(groupIds[i], k -> new State());
                s.total = Math.addExact(s.total, b.get(i));
            }
        } catch (ArithmeticException e) {
            throw new IllegalArgumentException("vgi_sum: int64 overflow", e);
        }
    }

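    // Merge a partial (`source`) produced by another worker into `target`.
    // Overflow is reported the same way as in update().
    @Override
    public void combine(State target, State source) {
        try {
            target.total = Math.addExact(target.total, source.total);
        } catch (ArithmeticException e) {
            throw new IllegalArgumentException("vgi_sum: int64 overflow", e);
        }
    }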

    // Write one group's final value into the output column at `rowIndex`.
    @Override
    public void finalize(FieldVector result, int rowIndex, State state) {
        ((BigIntVector) result).setSafe(rowIndex, state.total);
    }

    public static void main(String[] args) {
        Worker.builder()
                .catalogName("demo")
                .registerAggregate(new AggregateExample())
                .runFromArgs(args);
    }
}

sql
SELECT g, demo.vgi_sum(v)
  FROM (VALUES (1,10),(1,20),(2,5)) t(g,v) GROUP BY g ORDER BY g;   -- 1->30, 2->5

How the pieces fit

Several threads each fold rows into a partial State via update(); combine() merges them into one State; finalize() writes the result value.

  • update is the hot loop. groupIds[i] is row i's group; mint an accumulator with states.computeIfAbsent(gid, k -> new State()). Read the input columns from batch.getFieldVectors() (the example names this parameter input).
  • combine must be associative and commutative — it's called in an unspecified order across partials.
  • finalize writes one output row. The output column is the one declared in outputSchema() (here a single BIGINT column named result).
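The three phases above can be simulated without the engine. The sketch below is a stand-alone illustration, not VGI code: the class AggregateLifecycleSketch and its run() driver are hypothetical and stand in for what DuckDB would do, and plain long arrays replace the Arrow batch. It splits the rows (1,10), (1,20), (2,5) across two workers, merges the partials, and emits one value per group.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-alone simulation; run() stands in for the engine.
final class AggregateLifecycleSketch {

    /** Per-group accumulator, same shape as the example's State. */
    static final class State {
        long total;
    }

    /** update(): fold one slice of rows into that worker's per-group partials. */
    static void update(Map<Long, State> states, long[] groupIds, long[] values) {
        for (int i = 0; i < values.length; i++) {
            State s = states.computeIfAbsent(groupIds[i], k -> new State());
            s.total = Math.addExact(s.total, values[i]);
        }
    }

    /** combine(): merge one partial into another. */
    static void combine(State target, State source) {
        target.total = Math.addExact(target.total, source.total);
    }

    /** Drive the three phases over two slices, as two workers would. */
    static Map<Long, Long> run() {
        // Rows (g, v): (1,10), (1,20), (2,5), split across two workers.
        Map<Long, State> workerA = new HashMap<>();
        Map<Long, State> workerB = new HashMap<>();
        update(workerA, new long[] {1, 2}, new long[] {10, 5});
        update(workerB, new long[] {1}, new long[] {20});

        // Merge B's partials into A's, group by group.
        for (Map.Entry<Long, State> e : workerB.entrySet()) {
            State target = workerA.computeIfAbsent(e.getKey(), k -> new State());
            combine(target, e.getValue());
        }

        // finalize: one output value per group, sorted by group id for printing.
        Map<Long, Long> out = new TreeMap<>();
        workerA.forEach((g, s) -> out.put(g, s.total));
        return out;
    }

    public static void main(String[] args) {
        System.out.println(run()); // {1=30, 2=5}
    }
}
```

Group 1 appears in both slices, so its final value only comes out right because combine() adds the two partial totals.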

Arguments and output

The FunctionSpec declares the input arguments; outputSchema() declares the result column:

java
private static final FunctionSpec SPEC = FunctionSpec.builder("vgi_sum")
        .description("Sum integer values")
        .arg("value", Schemas.INT64)        // one input column
        .build();

A nullary aggregate (like vgi_count()) declares no arg and counts rows. Override finalizeEmpty(result, rowIndex) to control the empty-group result — COUNT returns 0 there, while SUM returns NULL (the default):

java
@Override public void finalizeEmpty(FieldVector result, int rowIndex) {
    ((BigIntVector) result).setSafe(rowIndex, 0L);   // count of nothing is 0
}

Correctness notes

  • Guard overflow. Math.addExact / Math.multiplyExact throw ArithmeticException instead of wrapping silently; the example catches it and rethrows a clear vgi_sum: int64 overflow error.
  • Don't stash Arrow vectors in State. The input batch is reused; copy out the scalar values you need. (State is serialized — it must be plain data.)
  • combine may run before or after any update. Never assume an order.
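The overflow guard from the first note can be seen in isolation. This is a small stand-alone sketch using only java.lang.Math; the class OverflowGuardSketch and its two helper methods are hypothetical names. It contrasts a plain addition that wraps with the checked form that fails with the message the example uses.

```java
// Hypothetical stand-alone sketch of the overflow guard; no VGI types involved.
final class OverflowGuardSketch {

    /** Unchecked addition: wraps around silently on overflow. */
    static long wrappingAdd(long a, long b) {
        return a + b;
    }

    /** Checked addition, rethrown with the aggregate's name as in the example. */
    static long checkedAdd(long a, long b) {
        try {
            return Math.addExact(a, b);
        } catch (ArithmeticException e) {
            throw new IllegalArgumentException("vgi_sum: int64 overflow", e);
        }
    }

    public static void main(String[] args) {
        // The plain sum wraps to the most negative long without any error.
        System.out.println(wrappingAdd(Long.MAX_VALUE, 1) == Long.MIN_VALUE); // true
        try {
            checkedAdd(Long.MAX_VALUE, 1);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage()); // vgi_sum: int64 overflow
        }
    }
}
```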

Going further

vgi-example-worker/src/main/java/farm/query/vgi/example/aggregate/ has richer aggregates: Avg (a two-field running state), ListAgg (a growing list state), and Count. They all follow the same four-method shape.
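As a rough illustration of a two-field running state, and of why combine() has to be order-independent, here is a stand-alone sketch. It is not the shipped Avg example: the class MergeOrderSketch and all of its methods are hypothetical. Keeping sum and count, rather than a running mean, lets partials merge exactly in any order, even when an update arrives after a combine.

```java
// Hypothetical stand-alone sketch of an average-style accumulator.
final class MergeOrderSketch {

    /** Two-field running state: sum and count merge exactly, a stored mean would not. */
    static final class State {
        long sum;
        long count;
    }

    /** Fold one value into a state (the per-row step of an update). */
    static void add(State s, long value) {
        s.sum = Math.addExact(s.sum, value);
        s.count++;
    }

    /** Merge two partials field by field; addition makes this associative and commutative. */
    static void combine(State target, State source) {
        target.sum = Math.addExact(target.sum, source.sum);
        target.count = Math.addExact(target.count, source.count);
    }

    static double result(State s) {
        return (double) s.sum / s.count;
    }

    /** Build a partial from a slice of values. */
    static State of(long... values) {
        State s = new State();
        for (long v : values) add(s, v);
        return s;
    }

    /** Merge order 1: (a + b) + c. */
    static double leftToRight() {
        State a = of(1, 2), b = of(3), c = of(4, 5, 6);
        combine(a, b);
        combine(a, c);
        return result(a);
    }

    /** Merge order 2: c + (b + a), with one more update after the combines. */
    static double shuffled() {
        State a = of(1, 2), b = of(3), c = of(4, 5);
        combine(b, a);
        combine(c, b);
        add(c, 6); // an update arriving after a combine
        return result(c);
    }

    public static void main(String[] args) {
        System.out.println(leftToRight() + " " + shuffled()); // 3.5 3.5
    }
}
```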
