SQL and DataFrame guide
Spark has two ways to express a query: SQL strings and the DataFrame API. Both compile to the same logical plan in Catalyst, so mixing them in one pipeline is fine.
Creating a DataFrame
A DataFrame comes from an SQL query, a table name, a numeric range, or a file read. All of these are lazy. The server is untouched until an action runs.
```typescript
const a = spark.sql("SELECT * FROM my_table WHERE x > 10");
const b = spark.table("my_table");
const c = spark.range(0, 1_000_000);
const d = spark.read.parquet("s3://bucket/events/");
```

Lazy evaluation
DataFrames are immutable. Each transformation returns a new DataFrame with an extended plan:
```typescript
import { col, lit } from "@spark-connect-js/node";

const young = employees.filter(col("age").lt(lit(30)));
const named = young.select("name", "age");
// Neither `young` nor `named` has touched the server yet.

const rows = await named.collect();
// This is the call that sends the plan and returns rows.
```

The server receives the full plan on each action. Catalyst's analyzer and optimizer run there, and results come back as Arrow IPC batches. Because the optimizer pushes predicates down, placing filter before or after select usually produces the same physical plan. This holds as long as the filter only references columns that the select keeps.
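The following toy model shows why building young and named costs nothing. It is an immutable, lazily evaluated frame written in plain TypeScript, and it is not the client's implementation. ToyFrame, its plan array, and the in-memory source rows are hypothetical stand-ins for the real logical plan and the server. Each transformation copies the plan and appends one step. Only collect() walks the steps over the data.

```typescript
type Row = Record<string, unknown>;

// One step in a toy logical plan.
type Step =
  | { kind: "filter"; pred: (r: Row) => boolean }
  | { kind: "select"; cols: string[] };

// Hypothetical stand-in for a DataFrame: an immutable list of plan steps
// over an in-memory source. No rows are read until collect() runs.
class ToyFrame {
  constructor(
    private readonly source: readonly Row[],
    readonly plan: readonly Step[] = [],
  ) {}

  // Transformations return a new frame with one more step; this one is unchanged.
  filter(pred: (r: Row) => boolean): ToyFrame {
    return new ToyFrame(this.source, [...this.plan, { kind: "filter", pred }]);
  }

  select(...cols: string[]): ToyFrame {
    return new ToyFrame(this.source, [...this.plan, { kind: "select", cols }]);
  }

  // The "action": the only method that applies the plan to the data.
  collect(): Row[] {
    let rows: Row[] = [...this.source];
    for (const step of this.plan) {
      if (step.kind === "filter") {
        rows = rows.filter(step.pred);
      } else {
        const cols = step.cols;
        rows = rows.map((r) => {
          const out: Row = {};
          for (const c of cols) out[c] = r[c];
          return out;
        });
      }
    }
    return rows;
  }
}

const employees = new ToyFrame([
  { name: "Ana", age: 25, dept: "eng" },
  { name: "Bo", age: 41, dept: "ops" },
]);
const young = employees.filter((r) => (r.age as number) < 30);
const named = young.select("name", "age");
```

No frame is ever modified, so young can serve as the base of a second pipeline without affecting named.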
Transformations
Projection
```typescript
df.select("name", "age"); // by name
df.select(col("name"), col("age")); // by Column
df.selectExpr("name", "age * 12 AS age_in_months");
df.withColumn("full_name", concat(col("first"), lit(" "), col("last")));
df.withColumns({ a: col("x"), b: col("y") });
df.drop("internal_id");
df.withColumnRenamed("age", "years");
df.toDF("a", "b", "c"); // positional rename of all columns
```

Filtering
filter and where are aliases.
```typescript
df.filter(col("age").gt(lit(18)));
df.filter(col("name").rlike("^A"));
df.where(col("age").between(lit(18), lit(65)));
df.filter(col("country").isin("US", "CA", "UK"));
```

Grouping and aggregation
```typescript
import { count, sum, avg, max } from "@spark-connect-js/node";

df.groupBy("department")
  .agg(
    count("*").alias("headcount"),
    avg("salary").alias("avg_salary"),
    max("salary").alias("max_salary"),
  );

// Shorthand methods on GroupedData:
df.groupBy("department").count();
df.groupBy("department").sum("salary");
```
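cube and rollup aggregate over several grouping sets at once. rollup(a, b) groups by (a, b), then by (a) alone, then over the whole table. cube(a, b) groups by every subset of its columns. The helpers below are a plain-TypeScript sketch that only enumerates those grouping sets. They are not part of the client.

```typescript
// Grouping sets produced by ROLLUP(cols): every prefix, longest first,
// ending with the empty set (the grand total).
function rollupSets(cols: string[]): string[][] {
  const sets: string[][] = [];
  for (let n = cols.length; n >= 0; n--) sets.push(cols.slice(0, n));
  return sets;
}

// Grouping sets produced by CUBE(cols): every subset of cols (2^n of them).
function cubeSets(cols: string[]): string[][] {
  const sets: string[][] = [];
  for (let mask = (1 << cols.length) - 1; mask >= 0; mask--) {
    // Bit i of mask decides whether cols[i] is in this grouping set.
    sets.push(cols.filter((_, i) => (mask & (1 << i)) !== 0));
  }
  return sets;
}
```

In the aggregated output, a column that is not in a row's grouping set is null for that subtotal row.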
```typescript
// Multi-dimensional aggregation:
df.cube("department", "role").count();
df.rollup("year", "month").agg(sum("revenue"));
```

Sorting and limits
```typescript
df.sort(col("salary").desc());
df.orderBy(col("dept").asc(), col("salary").desc_nulls_last());
df.limit(100);
df.offset(200).limit(50); // pagination: skip 200 rows, return the next 50 (sort first for stable pages)
df.sortWithinPartitions("id");
```

Joins

```typescript
// "df1." and "df2." are plan-level qualifiers (dataset or table aliases), not the JS variable names.
df1.join(df2, col("df1.id").eq(col("df2.id"))); // arbitrary condition
df1.join(df2, condition, "left_outer"); // inner | left_outer | right_outer | full_outer | left_semi | left_anti | cross
df1.join(df2, undefined, "cross");
df1.crossJoin(df2);
```

join takes a Column condition, not a column-name string or an array of names. Build equi-joins with col(...).eq(col(...)).
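The join types listed above differ in two ways: which unmatched rows survive, and whose columns appear in the result. The sketch below models four of them as a nested-loop join over in-memory arrays. toyJoin, its on callback, and the merged-row shape are assumptions made for illustration. This is not how the server executes joins.

```typescript
type Row = Record<string, unknown>;
type JoinType = "inner" | "left_outer" | "left_semi" | "left_anti";

// Nested-loop join over arrays, mirroring the row semantics of each join type.
function toyJoin(left: Row[], right: Row[], on: (l: Row, r: Row) => boolean, how: JoinType): Row[] {
  const out: Row[] = [];
  for (const l of left) {
    const matches = right.filter((r) => on(l, r));
    if (how === "left_semi") {
      if (matches.length > 0) out.push(l); // left columns only, at most once per left row
    } else if (how === "left_anti") {
      if (matches.length === 0) out.push(l); // left rows with no match at all
    } else if (matches.length > 0) {
      for (const r of matches) out.push({ ...l, ...r }); // one output row per match
    } else if (how === "left_outer") {
      out.push({ ...l }); // unmatched left row; Spark would fill the right columns with null
    }
  }
  return out;
}
```

A left_semi join works like a filter with "has a match" as the predicate, and a left_anti join keeps the complement. Neither adds columns from the right side.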
Set operations
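```typescript
df1.union(df2); // UNION ALL, matched by column position; duplicates are kept
df1.unionByName(df2, true); // matched by column name; true = allowMissingColumns
df1.intersect(df2); // distinct rows present in both
df1.except(df2); // distinct rows in df1 that are not in df2
```

Deduplication and sampling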
```typescript
df.distinct();
df.dropDuplicates("user_id", "event_date"); // one row per (user_id, event_date)
df.sample(0.1); // roughly 10% of rows, without replacement
df.sample(0.1, false, 42); // with a fixed seed
df.randomSplit([0.8, 0.2], 42); // returns [DataFrame, DataFrame]
```

Missing values
```typescript
df.dropna(); // drop rows with any null
df.dropna("all"); // drop rows where every column is null
df.dropna("any", ["name", "email"]); // only check these columns
df.fillna(0); // fill nulls with 0 across all numeric columns
df.fillna("unknown", ["name"]); // only fill these columns
df.replace({ "N/A": null, "": null }); // value replacement (map form)
df.replace({ "N/A": null }, ["country"]); // only replace in these columns
```

fillna takes a single scalar and an optional column subset. replace takes a Record<string, scalar | null> that maps old values to new ones.
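The how argument of dropna decides how many missing values it takes to drop a row. With "any", one missing value is enough. With "all", every checked column must be missing. The optional subset narrows which columns are checked. Here is a small in-memory model of those rules, with rows as plain objects. toyDropna is a hypothetical helper. Treating null, undefined, and numeric NaN as missing follows Spark's na.drop, which also counts NaN.

```typescript
type Row = Record<string, unknown>;

// A value counts as missing if it is null/undefined or a NaN number.
function isMissing(v: unknown): boolean {
  return v === null || v === undefined || (typeof v === "number" && isNaN(v));
}

function toyDropna(rows: Row[], how: "any" | "all" = "any", subset?: string[]): Row[] {
  return rows.filter((row) => {
    const cols = subset ?? Object.keys(row);
    if (cols.length === 0) return true; // nothing to check
    const missing = cols.filter((c) => isMissing(row[c])).length;
    // "any": keep only rows with no missing value.
    // "all": drop only rows where every checked column is missing.
    return how === "any" ? missing === 0 : missing < cols.length;
  });
}
```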
Reshaping
```typescript
df.groupBy("year").pivot("quarter").sum("revenue");
df.groupBy("year").pivot("quarter", ["Q1", "Q2", "Q3", "Q4"]).sum("revenue");
```
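pivot turns the distinct values of one column into output columns. If you pass the value list explicitly, Spark does not need to compute the distinct values first, and the output columns have a fixed, known order. The sketch below reproduces groupBy("year").pivot("quarter", [...]).sum("revenue") over plain objects. toyPivotSum and the row shape are illustrative assumptions, not client API.

```typescript
type Row = Record<string, unknown>;

// Emulates groupBy(groupCol).pivot(pivotCol, values).sum(valueCol):
// one output row per group, one output column per listed pivot value.
// Cells with no input rows stay null, as Spark's sum does for an empty group.
function toyPivotSum(rows: Row[], groupCol: string, pivotCol: string, values: string[], valueCol: string): Row[] {
  const groups = new Map<unknown, Row>();
  for (const row of rows) {
    const key = row[groupCol];
    let out = groups.get(key);
    if (out === undefined) {
      out = { [groupCol]: key };
      for (const v of values) out[v] = null;
      groups.set(key, out);
    }
    const p = String(row[pivotCol]);
    if (values.indexOf(p) === -1) continue; // values outside the list are dropped
    out[p] = ((out[p] as number | null) ?? 0) + (row[valueCol] as number);
  }
  return Array.from(groups.values());
}
```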
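```typescript
df.unpivot(
  ["id"], // id columns to keep
  ["jan", "feb", "mar"], // columns to turn into rows
  "month", // name of the new variable column
  "value", // name of the new value column
);
```

Partitioning and caching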
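As far as the client is concerned, repartitioning is an ordinary transformation: repartition, coalesce, and repartitionByRange extend the plan and take effect on the next action. The caching calls (cache, persist, unpersist) are not plan transformations. They are requests sent to the server right away.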
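```typescript
df.repartition(200); // 200 partitions, round-robin
df.repartition(200, "user_id"); // hash-partition by user_id
df.coalesce(10); // merge down to 10 partitions without a full shuffle
df.repartitionByRange(50, col("timestamp")); // range-partition by timestamp
```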
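With a column argument, repartition(200, "user_id") hash-partitions the data. Each row goes to partition hash(user_id) mod 200, so all rows for one user land in the same partition. The sketch below applies that placement rule to in-memory rows. It uses FNV-1a as a stand-in hash, which is an assumption: Spark uses a Murmur3-based hash. The partition numbers therefore differ from Spark's, but the grouping property is the same. hashPartition is a hypothetical helper.

```typescript
// Deterministic 32-bit string hash (FNV-1a), standing in for Spark's Murmur3-based hash.
function fnv1a(s: string): number {
  let h = 0x811c9dc5;
  for (let i = 0; i < s.length; i++) {
    h ^= s.charCodeAt(i);
    h = Math.imul(h, 0x01000193);
  }
  return h >>> 0; // as an unsigned 32-bit value
}

// Hash partitioning: each row goes to partition hash(key) mod n.
function hashPartition<T>(rows: T[], key: (r: T) => string, n: number): T[][] {
  const parts: T[][] = Array.from({ length: n }, () => [] as T[]);
  for (const r of rows) parts[fnv1a(key(r)) % n].push(r);
  return parts;
}
```

Rows sharing a key always share a partition, which is what makes a later per-key operation (for example, an aggregation on user_id) avoid moving those rows again.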
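```typescript
await df.cache(); // MEMORY_AND_DISK
await df.persist(MEMORY_ONLY); // MEMORY_ONLY is a storage-level constant (import not shown)
await df.unpersist();
```

cache, persist, and unpersist round-trip to the server and return Promise<DataFrame>. Await them before running a query that should use the cache. Registering a cache does not compute anything. The data is materialized by the first action that scans the DataFrame, so that first action pays the full cost and later actions read from the cache.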
Actions
Actions trigger execution and return a Promise.
```typescript
await df.collect(); // Row[]
await df.count(); // number
await df.first(); // Row | undefined
await df.head(10); // Row[]
await df.take(5); // same as head(5)
await df.show(20, false); // prints to stdout; returns void. Args: numRows, truncate
await df.isEmpty(); // boolean
```
```typescript
for await (const row of df.toLocalIterator()) {
  // Stream rows without buffering the whole result.
}
```
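A lazily pulled async iterator has a useful property: the consumer asks for rows one at a time, so breaking out of the loop early means later batches are never requested. The sketch below imitates this with in-memory batches. fetchBatches and firstMatch are hypothetical stand-ins, not client functions, and the sketch counts how many batches were actually pulled. It does not show whether the real client cancels the server-side stream on an early break.

```typescript
type Row = Record<string, unknown>;

// Hypothetical stand-in for a server stream: yields one batch at a time
// and records how many batches have been produced.
let batchesPulled = 0;
async function* fetchBatches(batches: Row[][]): AsyncGenerator<Row[]> {
  for (const b of batches) {
    batchesPulled++;
    yield b;
  }
}

// Flattens batches into rows without materializing the whole result.
async function* rowsOf(batches: AsyncIterable<Row[]>): AsyncGenerator<Row> {
  for await (const batch of batches) {
    for (const row of batch) yield row;
  }
}

// Returning from inside for await closes both generators, so no further batches are pulled.
async function firstMatch(batches: Row[][], pred: (r: Row) => boolean): Promise<Row | undefined> {
  for await (const row of rowsOf(fetchBatches(batches))) {
    if (pred(row)) return row;
  }
  return undefined;
}
```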
```typescript
// Consumes the whole result stream and applies the callback client-side.
await df.forEach((row) => {
  console.log(row); // called on the client, once per row
});
```

Schema and metadata
Metadata queries are actions too; they round-trip to the server via AnalyzePlan.
```typescript
await df.schema(); // StructType
await df.columns(); // string[]
await df.dtypes(); // Array<[string, string]>
await df.printSchema(); // prints to stdout
await df.explain(); // prints the physical plan
await df.explain("extended"); // also prints the logical plans
```

Writing
df.write returns a DataFrameWriter. df.writeTo(table) returns the v2 writer. See the I/O guide.
The Column DSL
col(name) is a reference to a column in the plan. The methods on Column build expression trees that the server evaluates.
```typescript
import { col, lit } from "@spark-connect-js/node";

// Comparisons
col("age").eq(lit(30));
col("age").gt(lit(30));
col("age").lte(lit(65));
col("name").notEqual(lit("admin"));

// Arithmetic
col("price").multiply(lit(1.08));
col("a").plus(col("b"));
col("total").divide(col("count"));

// Logical
col("a").and(col("b"));
col("status").eq(lit("active")).or(col("priority").gt(lit(5)));
col("deleted").not();

// Null and NaN handling
col("email").isNull();
col("email").isNotNull();
col("score").isNaN();

// Membership and ranges
col("country").isin("US", "CA", "UK");
col("age").between(lit(18), lit(65));

// String matching
col("name").like("A%");
col("email").rlike("^[^@]+@example\\.com$");
col("path").startsWith("/home/");
col("path").endsWith(".log");
col("body").contains("error");

// Casting
col("age").cast("int");
col("amount").cast("decimal(18,2)");

// Ordering
col("salary").asc();
col("salary").desc_nulls_last();

// Aliasing
col("x").plus(col("y")).alias("sum");
```

Literals
lit(value) wraps a JavaScript value as a Column; use it for any constant in an expression.
```typescript
df.filter(col("country").eq(lit("US")));
df.withColumn("is_vip", lit(true));
df.withColumn("bucket", col("score").divide(lit(10)).cast("int"));
```

TypeScript has no operator overloading, so comparisons and arithmetic are methods: col("x").gt(lit(30)), col("a").plus(col("b")). Constant operands must go through lit(...); passing a raw JS value where a Column is expected is a type error. Pattern and value-list arguments, such as those of like, rlike, startsWith, and isin, take plain values, as in the examples above.
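Because comparisons are method calls, an expression like col("age").gt(lit(30)) builds a small tree instead of evaluating anything. The toy classes below show the shape of that tree, and they show why a raw number cannot stand in for a Column: the parameter type only accepts ToyColumn. ToyColumn, toyCol, toyLit, and the render output format are all hypothetical, not the client's types.

```typescript
// A tiny expression tree: column references, literals, and binary operators.
type Expr =
  | { kind: "ref"; name: string }
  | { kind: "lit"; value: string | number | boolean }
  | { kind: "op"; op: string; left: Expr; right: Expr };

class ToyColumn {
  constructor(readonly expr: Expr) {}

  // Every operator method wraps both operands in a new "op" node.
  private bin(op: string, other: ToyColumn): ToyColumn {
    return new ToyColumn({ kind: "op", op, left: this.expr, right: other.expr });
  }
  gt(other: ToyColumn): ToyColumn { return this.bin(">", other); }
  plus(other: ToyColumn): ToyColumn { return this.bin("+", other); }
  and(other: ToyColumn): ToyColumn { return this.bin("AND", other); }

  // Renders the tree as fully parenthesized SQL-like text.
  render(e: Expr = this.expr): string {
    switch (e.kind) {
      case "ref": return e.name;
      case "lit": return typeof e.value === "string" ? `'${e.value}'` : String(e.value);
      case "op": return `(${this.render(e.left)} ${e.op} ${this.render(e.right)})`;
    }
  }
}

const toyCol = (name: string) => new ToyColumn({ kind: "ref", name });
const toyLit = (value: string | number | boolean) => new ToyColumn({ kind: "lit", value });

// toyCol("age").gt(30) would not compile: 30 is not a ToyColumn.
```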
SQL strings and raw expressions
For cases where the DataFrame API is awkward, fall back to SQL:
```typescript
import { expr } from "@spark-connect-js/node";

df.selectExpr("age * 365.25 AS age_in_days", "upper(name) AS upper_name");
df.filter(expr("age > 18 AND country IN ('US', 'CA')"));
df.withColumn("age_group", expr("CASE WHEN age < 18 THEN 'minor' ELSE 'adult' END"));
```

filter doesn't accept raw SQL strings. Wrap a SQL fragment in expr("...") to lift it into a Column, or use selectExpr(...) for SQL projections.
Temp views bridge the two worlds:
```typescript
await df.createOrReplaceTempView("events");
const hourly = spark.sql(`
  SELECT date_trunc('hour', ts) AS hour, count(*) AS n
  FROM events
  GROUP BY 1
`);
```

Conditional expressions
```typescript
import { when, coalesce } from "@spark-connect-js/node";

df.withColumn(
  "tier",
  when(col("spend").gt(lit(1000)), lit("gold"))
    .when(col("spend").gt(lit(100)), lit("silver"))
    .otherwise(lit("bronze")),
);
```
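A when(...).when(...).otherwise(...) chain is evaluated like SQL CASE WHEN. Branches are tested in order and the first true condition wins, so a spend of 5000 gets "gold" even though it also exceeds 100. Without otherwise, unmatched rows get null. The sketch below models that evaluation order in plain TypeScript. toyCase and its branch format are illustrative, not the client API.

```typescript
type Row = Record<string, unknown>;

// Ordered (condition, value) pairs, like when(cond, value).when(...).
type Branch = [(row: Row) => boolean, string];

// The first branch whose condition holds wins; otherwise the fallback is used
// (null when no otherwise() is given, as in SQL CASE WHEN).
function toyCase(row: Row, branches: Branch[], otherwise: string | null = null): string | null {
  for (const [cond, value] of branches) {
    if (cond(row)) return value;
  }
  return otherwise;
}

const tiers: Branch[] = [
  [(r) => (r.spend as number) > 1000, "gold"],
  [(r) => (r.spend as number) > 100, "silver"],
];
```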
```typescript
// coalesce returns the first non-null argument.
df.withColumn("display_name", coalesce(col("full_name"), col("email"), lit("anonymous")));
```

User-defined identifiers
Column and table names are parsed as SQL identifiers, not taken as literal strings, so they follow SQL identifier rules. If a name contains special characters or clashes with a reserved word, backtick-quote it inside the string:
```typescript
df.select("`order-id`", "`from`");
```
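When column names come from user input or an external schema, quoting them mechanically is safer than quoting by hand. In Spark SQL, a backtick inside a quoted identifier is escaped by doubling it. quoteIdentifier below is a hypothetical utility, not part of the client, that applies that rule. It quotes a dotted name such as a.b as one identifier, so quote each part separately if you mean table-qualified or struct-field access.

```typescript
// Quote one identifier for Spark SQL: wrap it in backticks and
// double any backtick inside the name.
function quoteIdentifier(name: string): string {
  return "`" + name.split("`").join("``") + "`";
}
```

With it, the select above becomes df.select(quoteIdentifier("order-id"), quoteIdentifier("from")).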