
SQL and DataFrame guide

Spark has two ways to express a query: SQL strings and the DataFrame API. Both compile to the same logical plan in Catalyst, so mixing them in one pipeline is fine.

A DataFrame comes from a SQL query, a table name, a numeric range, or a file read. All of these are lazy: nothing is sent to the server until an action runs.

const a = spark.sql("SELECT * FROM my_table WHERE x > 10");
const b = spark.table("my_table");
const c = spark.range(0, 1_000_000);
const d = spark.read.parquet("s3://bucket/events/");

DataFrames are immutable. Each transformation returns a new DataFrame with an extended plan:

const young = employees.filter(col("age").lt(lit(30)));
const named = young.select("name", "age");
// Neither `young` nor `named` has touched the server yet.
const rows = await named.collect();
// This is the call that sends the plan and returns rows.

The server receives the full plan on each action. Catalyst’s analyzer and optimizer run there, and results come back as Arrow IPC batches. The optimizer pushes predicates down, so placing filter before or after select usually produces the same physical plan.
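
The immutability and deferred execution described above can be pictured with a small toy model. This is a sketch only: `LazyFrame`, `PlanStep`, and the in-memory `source` array are hypothetical names invented for illustration, not the client's implementation, and the toy `collect` is synchronous where the real action returns a Promise.

```typescript
// Toy model only: LazyFrame, PlanStep, and Row are hypothetical names,
// not part of any Spark client library.
type Row = Record<string, unknown>;

type PlanStep =
  | { op: "filter"; predicate: (row: Row) => boolean }
  | { op: "select"; columns: string[] };

class LazyFrame {
  // Counts how many times any frame has actually executed its plan.
  static executions = 0;

  readonly source: readonly Row[];
  readonly plan: readonly PlanStep[];

  constructor(source: readonly Row[], plan: readonly PlanStep[] = []) {
    this.source = source;
    this.plan = plan;
  }

  // Transformations copy the plan and append one step; `this` is unchanged.
  filter(predicate: (row: Row) => boolean): LazyFrame {
    return new LazyFrame(this.source, [...this.plan, { op: "filter", predicate }]);
  }

  select(...columns: string[]): LazyFrame {
    return new LazyFrame(this.source, [...this.plan, { op: "select", columns }]);
  }

  // The action: only here is the accumulated plan walked and applied.
  collect(): Row[] {
    LazyFrame.executions += 1;
    let rows: Row[] = [...this.source];
    for (const step of this.plan) {
      if (step.op === "filter") {
        rows = rows.filter(step.predicate);
      } else {
        const columns = step.columns;
        rows = rows.map((row) => {
          const out: Row = {};
          for (const c of columns) out[c] = row[c];
          return out;
        });
      }
    }
    return rows;
  }
}

const employees = new LazyFrame([
  { name: "Ada", age: 28, dept: "eng" },
  { name: "Bob", age: 41, dept: "ops" },
]);
const young = employees.filter((row) => (row.age as number) < 30);
const named = young.select("name", "age");
```

In this model `employees`, `young`, and `named` each hold their own plan (zero, one, and two steps), and no work happens until `named.collect()` is called.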

df.select("name", "age"); // by name
df.select(col("name"), col("age")); // by Column
df.selectExpr("name", "age * 12 AS age_in_months");
df.withColumn("full_name", concat(col("first"), lit(" "), col("last")));
df.withColumns({ a: col("x"), b: col("y") });
df.drop("internal_id");
df.withColumnRenamed("age", "years");
df.toDF("a", "b", "c"); // positional rename of all columns

filter and where are aliases.

df.filter(col("age").gt(lit(18)));
df.filter(col("name").rlike("^A"));
df.where(col("age").between(lit(18), lit(65)));
df.filter(col("country").isin("US", "CA", "UK"));

import { count, sum, avg, max } from "@spark-connect-js/node";
df.groupBy("department")
.agg(
count("*").alias("headcount"),
avg("salary").alias("avg_salary"),
max("salary").alias("max_salary"),
);
// Shorthand methods on GroupedData:
df.groupBy("department").count();
df.groupBy("department").sum("salary");
// Multi-dimensional aggregation:
df.cube("department", "role").count();
df.rollup("year", "month").agg(sum("revenue"));

df.sort(col("salary").desc());
df.orderBy(col("dept").asc(), col("salary").desc_nulls_last());
df.limit(100);
df.offset(200).limit(50); // pagination
df.sortWithinPartitions("id");

df1.join(df2, col("df1.id").eq(col("df2.id"))); // arbitrary condition; the "df1."/"df2." prefixes assume each DataFrame is aliased
df1.join(df2, condition, "left_outer"); // inner | left_outer | right_outer | full_outer | left_semi | left_anti | cross
df1.join(df2, undefined, "cross");
df1.crossJoin(df2);

join takes a Column condition, not a column-name string or an array of names. Build equi-joins with col(...).eq(col(...)).
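
Of the join types listed above, left_semi and left_anti are the ones most often misread. The sketch below mimics what they return using plain arrays. `semiJoin`, `antiJoin`, and the sample data are hypothetical and exist only for illustration; they are not client API.

```typescript
// Plain-array illustration; semiJoin and antiJoin are hypothetical helpers
// that mimic the result shape of left_semi and left_anti joins.
interface User {
  id: number;
  name: string;
}
interface Order {
  userId: number;
  total: number;
}

// left_semi: left rows that have at least one match on the right.
// Only left columns are returned, and a left row is never duplicated.
function semiJoin(left: User[], right: Order[]): User[] {
  const keys = new Set(right.map((o) => o.userId));
  return left.filter((u) => keys.has(u.id));
}

// left_anti: left rows that have no match on the right.
function antiJoin(left: User[], right: Order[]): User[] {
  const keys = new Set(right.map((o) => o.userId));
  return left.filter((u) => !keys.has(u.id));
}

const users: User[] = [
  { id: 1, name: "Ada" },
  { id: 2, name: "Bob" },
  { id: 3, name: "Cy" },
];
const orders: Order[] = [
  { userId: 1, total: 20 },
  { userId: 1, total: 35 },
  { userId: 3, total: 5 },
];

const withOrders = semiJoin(users, orders).map((u) => u.name);
const withoutOrders = antiJoin(users, orders).map((u) => u.name);
```

Ada has two orders but appears once in `withOrders`, which is the practical difference between a semi join and an inner join followed by a projection.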

df1.union(df2);
df1.unionByName(df2, true); // allowMissingColumns
df1.intersect(df2);
df1.except(df2); // rows in df1 not in df2
df.distinct();
df.dropDuplicates("user_id", "event_date");
df.sample(0.1); // 10% without replacement
df.sample(0.1, false, 42); // with seed
df.randomSplit([0.8, 0.2], 42); // returns [DataFrame, DataFrame]

df.dropna(); // drop rows with any null
df.dropna("all"); // drop rows where every column is null
df.dropna("any", ["name", "email"]); // scoped to specific columns
df.fillna(0); // fill nulls with 0 across all numeric columns
df.fillna("unknown", ["name"]); // scoped to specific columns
df.replace({ "N/A": null, "": null }); // value replacement (map form)
df.replace({ "N/A": null }, ["country"]); // scoped to specific columns

fillna takes a single scalar and an optional column subset. replace takes a Record<string, scalar | null> mapping old values to new.
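
To make the "any" versus "all" distinction and the map form of replace concrete, here is a minimal plain-array sketch. `dropNulls` and `replaceValues` are hypothetical stand-ins written for this example, not the client's methods, and the sketch handles only null (it ignores NaN).

```typescript
// Hypothetical stand-ins for the null-handling calls, operating on plain rows.
// Simplification: only null counts as missing here; NaN is not considered.
type Cell = string | number | null;
type Row = Record<string, Cell>;

// how = "any": drop a row if any inspected column is null.
// how = "all": drop a row only if every inspected column is null.
function dropNulls(rows: Row[], how: "any" | "all" = "any", subset?: string[]): Row[] {
  return rows.filter((row) => {
    const cols = subset ?? Object.keys(row);
    const nulls = cols.filter((c) => row[c] === null).length;
    return how === "any" ? nulls === 0 : nulls < cols.length;
  });
}

// Map form of replace: each key is an old value, each value is its replacement.
// Rows are copied, so the input array is left unchanged.
function replaceValues(rows: Row[], mapping: Record<string, Cell>, subset?: string[]): Row[] {
  return rows.map((row) => {
    const out: Row = { ...row };
    for (const c of subset ?? Object.keys(row)) {
      const v = out[c];
      if (typeof v === "string" && Object.prototype.hasOwnProperty.call(mapping, v)) {
        out[c] = mapping[v];
      }
    }
    return out;
  });
}

const people: Row[] = [
  { name: "Ada", email: "ada@example.com", country: "US" },
  { name: null, email: null, country: null },
  { name: "Bob", email: null, country: "N/A" },
];

const complete = dropNulls(people); // only Ada has no nulls
const notEmpty = dropNulls(people, "all"); // Ada and Bob
const cleaned = replaceValues(people, { "N/A": null }, ["country"]);
```

Replacing sentinel strings with null first and dropping afterwards is a common ordering, because it lets a single dropna-style call catch both real nulls and placeholders.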

df.groupBy("year").pivot("quarter").sum("revenue");
df.groupBy("year").pivot("quarter", ["Q1", "Q2", "Q3", "Q4"]).sum("revenue");
df.unpivot(
["id"], // keep
["jan", "feb", "mar"], // unpivot
"month",
"value",
);
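
The four arguments to unpivot are easier to read next to the shape they produce. The sketch below reshapes plain objects the same way: one output row per input row and value column. `unpivotRows` is a hypothetical helper for illustration, and it keeps null values rather than dropping them.

```typescript
// Hypothetical plain-array version of a wide-to-long reshape.
type Cell = string | number | null;
type Wide = Record<string, Cell>;

function unpivotRows(
  rows: Wide[],
  ids: string[], // columns copied unchanged into every output row
  values: string[], // columns turned into (name, value) pairs
  variableCol: string, // output column holding the original column name
  valueCol: string, // output column holding the original cell value
): Wide[] {
  const out: Wide[] = [];
  for (const row of rows) {
    for (const v of values) {
      const long: Wide = {};
      for (const id of ids) long[id] = row[id];
      long[variableCol] = v;
      long[valueCol] = row[v];
      out.push(long);
    }
  }
  return out;
}

const wide: Wide[] = [
  { id: 1, jan: 10, feb: 20, mar: 30 },
  { id: 2, jan: 5, feb: null, mar: 7 },
];

// Two input rows times three value columns gives six output rows.
const long = unpivotRows(wide, ["id"], ["jan", "feb", "mar"], "month", "value");
```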

Partitioning and caching calls do not run a query by themselves. From the client's point of view they behave like transformations: they modify the plan and take effect on the next action.

df.repartition(200);
df.repartition(200, "user_id");
df.coalesce(10);
df.repartitionByRange(50, col("timestamp"));

await df.cache(); // MEMORY_AND_DISK
await df.persist(MEMORY_ONLY);
await df.unpersist();

cache, persist, and unpersist round-trip to the server and return Promise<DataFrame>. Await them before running a query that depends on the cache being warm.

Actions trigger execution and return a Promise.

await df.collect(); // Row[]
await df.count(); // number
await df.first(); // Row | undefined
await df.head(10); // Row[]
await df.take(5); // alias for head
await df.show(20, false); // print to stdout; returns void. Args: numRows, truncate.
await df.isEmpty(); // boolean
for await (const row of df.toLocalIterator()) {
// Stream rows without buffering the whole result.
}
await df.forEach(row => { ... }); // drains the result stream from the server and applies fn on the client

Metadata queries are actions too; they round-trip to the server via AnalyzePlan.

await df.schema(); // StructType
await df.columns(); // string[]
await df.dtypes(); // Array<[string, string]>
await df.printSchema(); // prints to stdout
await df.explain(); // prints physical plan
await df.explain("extended");

df.write returns a DataFrameWriter. df.writeTo(table) returns the v2 writer. See the I/O guide.

col(name) is a reference to a column in the plan. The methods on Column build expression trees that the server evaluates.

import { col, lit } from "@spark-connect-js/node";
// Comparisons
col("age").eq(lit(30));
col("age").gt(lit(30));
col("age").lte(lit(65));
col("name").notEqual(lit("admin"));
// Arithmetic
col("price").multiply(lit(1.08));
col("a").plus(col("b"));
col("total").divide(col("count"));
// Logical
col("a").and(col("b"));
col("status").eq(lit("active")).or(col("priority").gt(lit(5)));
col("deleted").not();
// Null and NaN handling
col("email").isNull();
col("email").isNotNull();
col("score").isNaN();
// Membership and ranges
col("country").isin("US", "CA", "UK");
col("age").between(lit(18), lit(65));
// String matching
col("name").like("A%");
col("email").rlike("^[^@]+@example\\.com$");
col("path").startsWith("/home/");
col("path").endsWith(".log");
col("body").contains("error");
// Casting
col("age").cast("int");
col("amount").cast("decimal(18,2)");
// Ordering
col("salary").asc();
col("salary").desc_nulls_last();
// Aliasing
col("x").plus(col("y")).alias("sum");

lit(value) wraps a JavaScript value as a Column. Use it for any constant in an expression.

df.filter(col("country").eq(lit("US")));
df.withColumn("is_vip", lit(true));
df.withColumn("bucket", col("score").divide(lit(10)).cast("int"));

TypeScript has no operator overloading, so comparisons and arithmetic are methods: col("x").gt(lit(30)), col("a").plus(col("b")). Literals must go through lit(...); passing a raw JS value where a Column is expected is a type error.
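
A toy version of the idea that Column methods build an expression tree, which something else later evaluates, might look like this. Everything here is hypothetical (`Expr`, `Col`, `column`, `literal`, `evaluate`); the real client sends its tree to the server instead of evaluating it locally, and the null handling below is deliberately simplified.

```typescript
// Toy expression tree; all names are hypothetical and local to this sketch.
type Value = string | number | boolean | null;
type Row = Record<string, Value>;
type BinaryOp = "gt" | "plus" | "and";

type Expr =
  | { kind: "column"; name: string }
  | { kind: "literal"; value: Value }
  | { kind: "binary"; op: BinaryOp; left: Expr; right: Expr };

class Col {
  readonly expr: Expr;

  constructor(expr: Expr) {
    this.expr = expr;
  }

  // Each method returns a new Col wrapping a larger tree; nothing is computed.
  binary(op: BinaryOp, other: Col): Col {
    return new Col({ kind: "binary", op, left: this.expr, right: other.expr });
  }
  gt(other: Col): Col {
    return this.binary("gt", other);
  }
  plus(other: Col): Col {
    return this.binary("plus", other);
  }
  and(other: Col): Col {
    return this.binary("and", other);
  }
}

const column = (name: string): Col => new Col({ kind: "column", name });
const literal = (value: Value): Col => new Col({ kind: "literal", value });

// Stand-in for the server: walks the tree against a single row.
function evaluate(expr: Expr, row: Row): Value {
  switch (expr.kind) {
    case "column":
      return row[expr.name] ?? null;
    case "literal":
      return expr.value;
    case "binary": {
      const l = evaluate(expr.left, row);
      const r = evaluate(expr.right, row);
      if (l === null || r === null) return null; // simplified null propagation
      if (expr.op === "gt") return (l as number) > (r as number);
      if (expr.op === "plus") return (l as number) + (r as number);
      return Boolean(l) && Boolean(r);
    }
  }
}

// age > 30 AND (a + b) > 10, written with methods instead of operators.
const predicate = column("age")
  .gt(literal(30))
  .and(column("a").plus(column("b")).gt(literal(10)));
```

Because the tree is plain data, it can be serialized and shipped elsewhere for evaluation, which is why a raw JS value has to be wrapped in a literal node before it can take part in an expression.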

For cases where the DataFrame API is awkward, fall back to SQL:

import { expr } from "@spark-connect-js/node";
df.selectExpr("age * 365.25 AS age_in_days", "upper(name) AS upper_name");
df.filter(expr("age > 18 AND country IN ('US', 'CA')"));
df.withColumn("age_group", expr("CASE WHEN age < 18 THEN 'minor' ELSE 'adult' END"));

filter doesn’t accept raw SQL strings. Wrap a SQL fragment in expr("...") to lift it into a Column, or use selectExpr(...) for SQL projections.

Temp views bridge the two worlds:

await df.createOrReplaceTempView("events");
const hourly = spark.sql(`
SELECT date_trunc('hour', ts) AS hour, count(*) AS n
FROM events
GROUP BY 1
`);

import { when, coalesce, isnull } from "@spark-connect-js/node";
df.withColumn(
"tier",
when(col("spend").gt(lit(1000)), lit("gold"))
.when(col("spend").gt(lit(100)), lit("silver"))
.otherwise(lit("bronze")),
);
df.withColumn("display_name", coalesce(col("full_name"), col("email"), lit("anonymous")));
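
Branch order matters in a when chain: the first condition that holds wins, and a null input falls through to the otherwise branch. The plain functions below mirror that behaviour for the tier example. `tierFor` and `firstNonNull` are hypothetical names used only in this sketch.

```typescript
// Plain-function mirror of the when/otherwise chain and of coalesce.
// Both helpers are hypothetical and run locally; they are not client API.
function tierFor(spend: number | null): string {
  // Branches are tested in order, so 5000 is "gold" even though it also exceeds 100.
  const branches: Array<[(s: number) => boolean, string]> = [
    [(s) => s > 1000, "gold"],
    [(s) => s > 100, "silver"],
  ];
  if (spend !== null) {
    for (const [test, label] of branches) {
      if (test(spend)) return label;
    }
  }
  // No branch matched, or the input was null: the otherwise value applies.
  return "bronze";
}

// coalesce-style helper: the first argument that is not null, else null.
function firstNonNull<T>(...values: Array<T | null>): T | null {
  for (const v of values) {
    if (v !== null) return v;
  }
  return null;
}

const tiers = [5000, 500, 50, null].map(tierFor);
const displayName = firstNonNull<string>(null, "ada@example.com", "anonymous");
```

Swapping the two branches would classify every spend above 100 as "silver", which is the usual bug in hand-written chains.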

Column and table names are parsed as SQL identifiers, not taken as literal strings, so they follow SQL identifier rules. If a name contains special characters or clashes with a reserved word, backtick-quote it inside the string:

df.select("`order-id`", "`from`");
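
When names come from user input or an external schema, quoting by hand gets error-prone. A small helper along these lines is one option. `quoteIdentifier` and `quotePath` are hypothetical utilities, not part of the client library; they rely on the Spark SQL convention that a backtick inside a quoted identifier is written twice.

```typescript
// Hypothetical helpers for building backtick-quoted identifiers.
function quoteIdentifier(name: string): string {
  // Inside a backtick-quoted identifier, a literal backtick is doubled.
  return "`" + name.replace(/`/g, "``") + "`";
}

// For dotted paths, quote each part separately so the dots keep their meaning.
function quotePath(...parts: string[]): string {
  return parts.map(quoteIdentifier).join(".");
}

const orderId = quoteIdentifier("order-id");
const reserved = quoteIdentifier("from");
const qualified = quotePath("events", "order-id");
```

The results can be passed wherever a column name string is accepted, for example as the arguments to `df.select(...)` in the call above.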