Window functions
Window functions compute a value for each row using a window of related rows (rows in the same partition, or rows within a range of the current row). They are the same functions that back SQL’s OVER (PARTITION BY ... ORDER BY ...) clause.
    import { Window, col, row_number, rank, dense_rank, percent_rank, cume_dist, lag, lead, ntile, sum, avg } from "@spark-connect-js/node";

Building a window
Window is a factory that returns a WindowSpec. A spec has a partition, an ordering, and an optional frame.

    const w = Window.partitionBy("department").orderBy(col("salary").desc());

partitionBy restricts the window to rows sharing the same partition key (analogous to groupBy for aggregates). orderBy defines the sequence inside each partition. Both are optional (you can build a global window, an unordered partitioned window, and so on), but the ranking functions and lag/lead require an ordering; plain aggregates such as sum do not.
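To make the roles of the partition and the ordering concrete, here is a minimal in-memory sketch in plain TypeScript. It does not use Spark or this library; `partitionAndOrder`, `Employee`, and the sample rows are hypothetical names and data chosen for illustration only.

```typescript
// Plain TypeScript model of partitionBy + orderBy (illustrative, not the library's code).
type Employee = { name: string; department: string; salary: number };

function partitionAndOrder<T>(
  rows: T[],
  partitionKey: (row: T) => string,
  compare: (a: T, b: T) => number,
): Record<string, T[]> {
  const partitions: Record<string, T[]> = {};
  // partitionBy: rows with the same key end up in the same bucket.
  for (const row of rows) {
    const key = partitionKey(row);
    if (!(key in partitions)) partitions[key] = [];
    partitions[key].push(row);
  }
  // orderBy: each bucket is sorted independently of the others.
  for (const key of Object.keys(partitions)) {
    partitions[key].sort(compare);
  }
  return partitions;
}

const sample: Employee[] = [
  { name: "Carol", department: "Engineering", salary: 85000 },
  { name: "Eve", department: "Marketing", salary: 78000 },
  { name: "Alice", department: "Engineering", salary: 95000 },
  { name: "Frank", department: "Marketing", salary: 72000 },
];

// Partition by department, order by salary descending inside each partition.
const partitions = partitionAndOrder(
  sample,
  (e) => e.department,
  (a, b) => b.salary - a.salary,
);
```

In this model, an unordered window corresponds to skipping the sort, and a global window corresponds to a partition key that is the same for every row.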
Frames
A frame is a range of rows relative to the current row. Frames are only meaningful when the window is ordered.
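The difference between a row-based and a range-based frame is easiest to see on a small ordered partition. The following is a rough sketch in plain TypeScript, assuming a single numeric ordering column sorted ascending; `rowsFrame` and `rangeFrame` are hypothetical helpers, not part of the library, and `Infinity` is used here only as a stand-in for an unbounded edge.

```typescript
// Plain TypeScript model of frame selection (illustrative, not the library's code).
// `ordered` holds one partition's ordering values, already sorted ascending.

// Row-based frame: bounds are positional offsets from the current row's index.
function rowsFrame(ordered: number[], current: number, start: number, end: number): number[] {
  const lo = Math.max(0, current + start);
  const hi = Math.min(ordered.length - 1, current + end);
  return ordered.slice(lo, hi + 1);
}

// Range-based frame: bounds are offsets added to the current row's ordering value.
function rangeFrame(ordered: number[], current: number, start: number, end: number): number[] {
  const value = ordered[current];
  return ordered.filter((v) => v >= value + start && v <= value + end);
}

const salaries = [70000, 70500, 72000, 78000];

// Current row is index 1 (70500).
const byRows = rowsFrame(salaries, 1, -1, 1);          // one row before, one row after
const byRange = rangeFrame(salaries, 1, -1000, 1000);  // values within 1000 of 70500

// A running frame: everything from the start of the partition up to the current row.
const running = rowsFrame(salaries, 2, -Infinity, 0);
```

With this data the row-based frame contains three rows regardless of their values, while the range-based frame contains only the rows whose salary lies within 1000 of the current one.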
    // Rows: physical offsets from the current row.
    Window.partitionBy("dept").orderBy("ts").rowsBetween(
      Window.unboundedPreceding,
      Window.currentRow,
    );

    // Range: logical offsets based on the ordering value.
    Window.partitionBy("dept").orderBy("salary").rangeBetween(-1000, 1000);

    // Symbolic bounds:
    Window.unboundedPreceding
    Window.unboundedFollowing
    Window.currentRow

Applying a window
Call .over(window) on any window-capable column expression.
    import { Window, col, row_number, rank, sum } from "@spark-connect-js/node";

    // `employees` is an existing DataFrame with name, department, and salary columns.
    const w = Window.partitionBy("department").orderBy(col("salary").desc());

    const ranked = employees.select(
      col("name"),
      col("department"),
      col("salary"),
      row_number().over(w).alias("rank"),
      rank().over(w).alias("rank_with_ties"),
      sum("salary").over(Window.partitionBy("department")).alias("dept_total"),
    );

Common window functions
Ranking
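The ranking functions differ mainly in how they treat ties. As a minimal sketch, the plain TypeScript below models row_number, rank, dense_rank, and percent_rank for a single, already-ordered partition. It does not call Spark or this library; `rankPartition` and the `Ranks` type are hypothetical names used for illustration.

```typescript
// Plain TypeScript model of the ranking functions (illustrative, not the library's code).
// `ordered` holds one partition's ordering values, already sorted.
type Ranks = { rowNumber: number; rank: number; denseRank: number; percentRank: number };

function rankPartition(ordered: number[]): Ranks[] {
  const out: Ranks[] = [];
  let rank = 0;
  let denseRank = 0;
  for (let i = 0; i < ordered.length; i++) {
    const isTie = i > 0 && ordered[i] === ordered[i - 1];
    if (!isTie) {
      rank = i + 1;   // jumps to the row position, which leaves a gap after ties
      denseRank += 1; // advances by exactly one per distinct value
    }
    out.push({
      rowNumber: i + 1, // always unique, even among ties
      rank,
      denseRank,
      // (rank - 1) / (count - 1); a single-row partition is treated as 0 here.
      percentRank: ordered.length > 1 ? (rank - 1) / (ordered.length - 1) : 0,
    });
  }
  return out;
}

// Salaries ordered descending, with a tie in second place.
const ranks = rankPartition([95000, 90000, 90000, 85000]);
```

For the tied rows, which one receives row number 2 and which receives 3 depends on how the ties happen to be ordered, so add a tie-breaking column to the ordering when a stable result matters.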
    row_number().over(w)   // 1, 2, 3, 4: always unique
    rank().over(w)         // 1, 2, 2, 4: ties share a rank, gaps after
    dense_rank().over(w)   // 1, 2, 2, 3: ties share, no gaps
    percent_rank().over(w) // (rank - 1) / (count - 1)
    ntile(4).over(w)       // bucket 1..4 based on position

Lag and lead
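lag and lead read a value from a row at a fixed offset before or after the current row in the window's ordering, and fall back to a default (null when none is given) at the partition edges. The sketch below models that behaviour in plain TypeScript for one ordered partition; `lagValues` and `leadValues` are hypothetical helpers, not library functions, and they assume a non-negative offset.

```typescript
// Plain TypeScript model of lag and lead (illustrative, not the library's code).
// `ordered` holds one partition's values in window order.

function lagValues<T>(ordered: T[], offset: number, defaultValue: T | null = null): (T | null)[] {
  // Look `offset` rows back; use the default when that row does not exist.
  return ordered.map((_, i) => (i - offset >= 0 ? ordered[i - offset] : defaultValue));
}

function leadValues<T>(ordered: T[], offset: number, defaultValue: T | null = null): (T | null)[] {
  // Look `offset` rows ahead; use the default when that row does not exist.
  return ordered.map((_, i) => (i + offset < ordered.length ? ordered[i + offset] : defaultValue));
}

const values = [10, 12, 15];

const previous = lagValues(values, 1);          // no default: first row has no predecessor
const previousOrZero = lagValues(values, 1, 0); // default fills the missing predecessor
const next = leadValues(values, 1);             // last row has no successor

// A typical use: the change from the previous row, with 0 for the first row.
const deltas = values.map((v, i) => (previous[i] === null ? 0 : v - (previous[i] as number)));
```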
    lag(col("value"), 1).over(w)    // previous row's value
    lag(col("value"), 1, 0).over(w) // with default when there is no previous row
    lead(col("value"), 1).over(w)   // next row's value

Moving aggregates
Aggregate functions become windowed when used with .over(...):
    const w = Window.partitionBy("series")
      .orderBy("ts")
      .rowsBetween(-6, 0); // 7-point trailing window

    df.withColumn("moving_avg", avg("value").over(w));
    df.withColumn("running_total", sum("value").over(
      Window.partitionBy("account").orderBy("ts")
        .rowsBetween(Window.unboundedPreceding, Window.currentRow),
    ));

Cumulative distribution
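cume_dist gives, for each row, the fraction of rows in the partition that sort at or before it, with tied rows counted together. A minimal plain TypeScript sketch of that definition follows, assuming one numeric ordering column sorted ascending; `cumeDist` is a hypothetical helper and not part of the library.

```typescript
// Plain TypeScript model of cume_dist (illustrative, not the library's code).
// `ordered` holds one partition's ordering values, sorted ascending.
function cumeDist(ordered: number[]): number[] {
  return ordered.map(
    // Count the rows at or before the current value, then divide by the partition size.
    (v) => ordered.filter((other) => other <= v).length / ordered.length,
  );
}

// The two tied rows share the same cumulative distribution value.
const distribution = cumeDist([70, 80, 80, 90]);
```

Unlike percent_rank, the result is never 0 for a non-empty partition, because the current row always counts toward its own fraction.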
    cume_dist().over(w)

A realistic example
Rank employees within each department, then keep only the top three per department:
    import { connect, col, row_number, Window } from "@spark-connect-js/node";

    const spark = connect("sc://localhost:15002");

    const employees = spark.sql(`
      SELECT * FROM VALUES
        ('Alice', 'Engineering', 95000),
        ('Bob', 'Engineering', 90000),
        ('Carol', 'Engineering', 85000),
        ('Dave', 'Engineering', 80000),
        ('Eve', 'Marketing', 78000),
        ('Frank', 'Marketing', 72000),
        ('Grace', 'Marketing', 70000)
      AS e(name, department, salary)
    `);

    const w = Window.partitionBy("department").orderBy(col("salary").desc());

    const topThree = employees
      .withColumn("rank", row_number().over(w))
      .filter(col("rank").lte(3))
      .drop("rank");

    console.table(await topThree.collect());
    await spark.stop();

Limitations
rangeBetween with non-numeric ordering expressions is rejected by Spark’s analyzer (the WindowSpec rule, not a client limitation). Use rowsBetween for arbitrary orderings.