Window functions

Window functions compute a value for each row using a window of related rows (rows in the same partition, or rows within a range of the current row). They are the same functions that back SQL’s OVER (PARTITION BY ... ORDER BY ...) clause.

import { Window, col, row_number, rank, dense_rank, lag, lead, ntile, sum, avg } from "@spark-connect-js/node";

Window is a factory that returns a WindowSpec. A spec has a partition, an ordering, and an optional frame.

const w = Window.partitionBy("department").orderBy(col("salary").desc());

partitionBy restricts the window to rows sharing the same partition key (analogous to groupBy for aggregates). orderBy defines the sequence inside each partition. Both are optional (you can build a global window, an unordered partitioned window, etc.), but ranking and offset functions such as row_number, rank, lag, and lead require an ordering.
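
To make the partition and ordering semantics concrete, here is a minimal plain-TypeScript model of what a partitioned, ordered window sees. It is only a sketch of the semantics, not how Spark or this client implements them; `partitionAndOrder`, `Employee`, and the sample rows are hypothetical names introduced for illustration.

```typescript
// Plain-TypeScript model of partitionBy + orderBy (illustration only, no Spark involved).
type Employee = { name: string; department: string; salary: number };

// Group rows by a partition key, then sort each group with the given comparator.
function partitionAndOrder<T>(
  rows: T[],
  key: (row: T) => string,
  compare: (a: T, b: T) => number,
): Record<string, T[]> {
  const partitions: Record<string, T[]> = {};
  for (const row of rows) {
    const k = key(row);
    if (!(k in partitions)) partitions[k] = [];
    partitions[k].push(row);
  }
  for (const k of Object.keys(partitions)) {
    partitions[k].sort(compare);
  }
  return partitions;
}

const sample: Employee[] = [
  { name: "Bob", department: "Engineering", salary: 90000 },
  { name: "Eve", department: "Marketing", salary: 78000 },
  { name: "Alice", department: "Engineering", salary: 95000 },
];

// Models Window.partitionBy("department").orderBy(col("salary").desc()).
const byDept = partitionAndOrder(
  sample,
  (e) => e.department,
  (a, b) => b.salary - a.salary,
);
// byDept.Engineering holds Alice then Bob; byDept.Marketing holds Eve.
```

A window function is then evaluated once per row, looking only at the rows in that row's own partition.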

A frame is a range of rows relative to the current row. Frames are only meaningful when the window is ordered.

// Rows: physical offsets from the current row.
Window.partitionBy("dept").orderBy("ts").rowsBetween(
Window.unboundedPreceding,
Window.currentRow,
);
// Range: logical offsets based on the ordering value.
Window.partitionBy("dept").orderBy("salary").rangeBetween(-1000, 1000);
// Symbolic bounds:
Window.unboundedPreceding
Window.unboundedFollowing
Window.currentRow
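
The difference between row frames and range frames is easiest to see on a small ordered partition. The following plain-TypeScript sketch models both; `rowsFrame` and `rangeFrame` are hypothetical helpers, and representing the unbounded bounds as ±Infinity is an assumption of this sketch, not necessarily how the library encodes them.

```typescript
// Model of frame selection over one ordered partition (illustration only).
// Both helpers return the indices of the rows that fall inside the frame of row i.

// Row frame: bounds are physical offsets from the current position.
function rowsFrame(n: number, i: number, start: number, end: number): number[] {
  const from = Math.max(0, i + start);
  const to = Math.min(n - 1, i + end);
  const out: number[] = [];
  for (let j = from; j <= to; j++) out.push(j);
  return out;
}

// Range frame: bounds are offsets applied to the current row's ordering value.
function rangeFrame(orderValues: number[], i: number, start: number, end: number): number[] {
  const lo = orderValues[i] + start;
  const hi = orderValues[i] + end;
  const out: number[] = [];
  for (let j = 0; j < orderValues.length; j++) {
    if (orderValues[j] >= lo && orderValues[j] <= hi) out.push(j);
  }
  return out;
}

const salaries = [70000, 70500, 72000, 72000, 80000]; // already ordered ascending

rowsFrame(salaries.length, 2, -1, 1);        // [1, 2, 3]: one row on each side
rowsFrame(salaries.length, 2, -Infinity, 0); // [0, 1, 2]: everything up to the current row
rangeFrame(salaries, 2, -1000, 1000);        // [2, 3]: salaries within 1000 of 72000
```

Note that a range frame treats rows with equal ordering values as peers: both 72000 rows are always in each other's frame, whereas a row frame counts positions regardless of value.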

Call .over(window) on any window-capable column expression.

import { Window, col, row_number, rank, sum } from "@spark-connect-js/node";
const w = Window.partitionBy("department").orderBy(col("salary").desc());
const ranked = employees.select(
col("name"),
col("department"),
col("salary"),
row_number().over(w).alias("rank"),
rank().over(w).alias("rank_with_ties"),
sum("salary").over(Window.partitionBy("department")).alias("dept_total"),
);
row_number().over(w) // 1, 2, 3, 4: always unique
rank().over(w) // 1, 2, 2, 4: ties share a rank, gaps after
dense_rank().over(w) // 1, 2, 2, 3: ties share, no gaps
percent_rank().over(w) // (rank - 1) / (count - 1)
ntile(4).over(w) // bucket 1..4 based on position
lag(col("value"), 1).over(w) // previous row's value
lag(col("value"), 1, 0).over(w) // with default when there is no previous row
lead(col("value"), 1).over(w) // next row's value
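
The ranking and offset semantics listed above can be checked by hand with a small plain-TypeScript model over one already-ordered partition. This is a sketch of the standard SQL definitions, not the library's code; `rankRows`, `ntileOf`, `lagOf`, and `leadOf` are hypothetical names.

```typescript
// Model of ranking and offset functions over one ordered partition (illustration only).
type Ranked = { rowNumber: number; rank: number; denseRank: number; percentRank: number };

// ordered: the ordering values of the partition, already sorted.
function rankRows(ordered: number[]): Ranked[] {
  const n = ordered.length;
  let rank = 0;
  let dense = 0;
  return ordered.map((value, i) => {
    const tie = i > 0 && ordered[i - 1] === value;
    if (!tie) {
      rank = i + 1; // rank jumps to the row position, leaving gaps after ties
      dense += 1;   // dense rank advances by exactly one
    }
    return {
      rowNumber: i + 1,
      rank,
      denseRank: dense,
      percentRank: n > 1 ? (rank - 1) / (n - 1) : 0,
    };
  });
}

// Bucket number for each of n rows; earlier buckets absorb the remainder.
function ntileOf(n: number, buckets: number): number[] {
  const base = Math.floor(n / buckets);
  const extra = n % buckets;
  const out: number[] = [];
  for (let b = 1; b <= buckets; b++) {
    const size = base + (b <= extra ? 1 : 0);
    for (let k = 0; k < size; k++) out.push(b);
  }
  return out;
}

// Value `offset` rows back (lag) or ahead (lead), or `fallback` at the partition edge.
function lagOf<T>(values: T[], offset: number, fallback: T | null = null): (T | null)[] {
  return values.map((_, i) => (i - offset >= 0 ? values[i - offset] : fallback));
}
function leadOf<T>(values: T[], offset: number, fallback: T | null = null): (T | null)[] {
  return values.map((_, i) => (i + offset < values.length ? values[i + offset] : fallback));
}

const ranks = rankRows([100, 90, 90, 80]);
// rowNumber: 1, 2, 3, 4   rank: 1, 2, 2, 4   denseRank: 1, 2, 2, 3
ntileOf(5, 2);            // [1, 1, 1, 2, 2]
lagOf([10, 20, 30], 1);   // [null, 10, 20]
lagOf([10, 20, 30], 1, 0); // [0, 10, 20]
leadOf([10, 20, 30], 1);  // [20, 30, null]
```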

Aggregate functions become windowed when used with .over(...):

const w = Window.partitionBy("series")
.orderBy("ts")
.rowsBetween(-6, 0); // 7-point trailing window
df.withColumn("moving_avg", avg("value").over(w));
df.withColumn("running_total", sum("value").over(
Window.partitionBy("account").orderBy("ts")
.rowsBetween(Window.unboundedPreceding, Window.currentRow),
));
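
// Cumulative distribution: fraction of partition rows ordered at or before the current row.
// cume_dist defines its own frame, so give it an ordered window without rowsBetween/rangeBetween.
cume_dist().over(Window.partitionBy("series").orderBy("ts"))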
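
As a rough check on what these windowed aggregates return, the sketch below models a trailing moving average, a running total, and a cumulative distribution over a single ordered partition in plain TypeScript. The helper names (`movingAverage`, `runningTotal`, `cumeDist`) are hypothetical, and the code illustrates the semantics only, not the library's implementation.

```typescript
// Model of windowed aggregates over one ordered partition (illustration only).

// Trailing average over the current row and up to windowSize - 1 preceding rows,
// i.e. the shape of rowsBetween(-(windowSize - 1), 0).
function movingAverage(values: number[], windowSize: number): number[] {
  return values.map((_, i) => {
    const from = Math.max(0, i - (windowSize - 1));
    const frame = values.slice(from, i + 1);
    return frame.reduce((a, b) => a + b, 0) / frame.length;
  });
}

// Sum from the start of the partition through the current row,
// i.e. the shape of rowsBetween(unboundedPreceding, currentRow).
function runningTotal(values: number[]): number[] {
  let total = 0;
  return values.map((v) => (total += v));
}

// Fraction of rows whose ordering value is <= the current row's (ascending order assumed).
function cumeDist(ordered: number[]): number[] {
  const n = ordered.length;
  return ordered.map((v) => ordered.filter((other) => other <= v).length / n);
}

movingAverage([2, 4, 6, 8], 3); // [2, 3, 4, 6]
runningTotal([1, 2, 3, 4]);     // [1, 3, 6, 10]
cumeDist([10, 20, 20, 40]);     // [0.25, 0.75, 0.75, 1]
```

The first rows of the moving average are computed over fewer points because the frame is clipped at the partition start; the frame never reaches into another partition.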

Rank employees within each department, then keep only the top three per department:

import { connect, col, row_number, Window } from "@spark-connect-js/node";
const spark = connect("sc://localhost:15002");
const employees = spark.sql(`
SELECT * FROM VALUES
('Alice', 'Engineering', 95000),
('Bob', 'Engineering', 90000),
('Carol', 'Engineering', 85000),
('Dave', 'Engineering', 80000),
('Eve', 'Marketing', 78000),
('Frank', 'Marketing', 72000),
('Grace', 'Marketing', 70000)
AS e(name, department, salary)
`);
const w = Window.partitionBy("department").orderBy(col("salary").desc());
const topThree = employees
.withColumn("rank", row_number().over(w))
.filter(col("rank").lte(3))
.drop("rank");
console.table(await topThree.collect());
await spark.stop();

  • rangeBetween with non-numeric ordering expressions is rejected by Spark’s analyzer (the WindowSpec rule, not a client limitation). Use rowsBetween for arbitrary orderings.