
Architecture

Spark Connect is the protocol that lets clients drive a remote Spark cluster without running a JVM in-process. This page traces the path from a DataFrame call in your TypeScript code to plain JavaScript objects in your hands.

```mermaid
graph LR
core["@spark-connect-js/core<br/>logical plan + expression DSL<br/>zero runtime deps"]
node["@spark-connect-js/node<br/>gRPC transport<br/>Arrow decoder"]
connect["@spark-connect-js/connect<br/>generated protobuf types<br/>(internal)"]
user["Your application"]

user --> node
node --> core
node --> connect
```
Packages and their role

@spark-connect-js/core holds the logical plan, the column expression tree, and the public type surface. It has zero runtime dependencies and does no I/O.

@spark-connect-js/node adds @grpc/grpc-js for the transport and apache-arrow for decoding result batches, and re-exports the public API from core. This is the package your application installs.

@spark-connect-js/connect holds the generated protobuf types. It is internal and pulled in transitively by @spark-connect-js/node.

```mermaid
sequenceDiagram
participant App as Your code
participant Core as spark-core<br/>(DataFrame API)
participant Node as spark-node<br/>(Transport + Arrow)
participant Server as Spark Connect<br/>server

App->>Core: df = read().filter().select()
Note over Core: Builds LogicalPlan tree.<br/>No I/O yet.
App->>Core: await df.collect()
Core->>Node: execute(LogicalPlan)
Node->>Server: ExecutePlan (gRPC, protobuf)
Note over Server: Catalyst analyzes,<br/>optimizes, runs.
Server-->>Node: Arrow IPC batches (stream)
Node-->>Core: Row[] (decoded)
Core-->>App: rows
```
Flow of a single action

DataFrame methods don’t talk to the server. They build an immutable plan tree (UnresolvedRelation, Filter, Project, Aggregate, Join, …) and return a new DataFrame. An action like collect, show, or count hands that plan to the transport, which encodes it as a spark.connect.Plan protobuf and opens an ExecutePlan server-streaming RPC. Catalyst runs server-side; result batches come back as Arrow IPC and decode into plain JS objects.

The gRPC channel stays open across actions. session.stop() closes it.
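The split between transformations and actions can be sketched in a few lines. `MiniDataFrame`, `Transport`, and the three-variant `Plan` are hypothetical stand-ins, not the package's real classes, and conditions are plain strings here instead of `Expression` trees. What the sketch does show: `filter` and `select` only return new frames wrapping a bigger plan, and `collect()` is the single place where the plan reaches the transport.

```typescript
// Hypothetical, simplified plan nodes; the real LogicalPlan union has more variants and fields.
type Plan =
  | { type: "read"; format: string; path: string }
  | { type: "filter"; child: Plan; condition: string }
  | { type: "project"; child: Plan; columns: string[] };

type Row = Record<string, unknown>;

// Hypothetical transport: the only component that performs I/O.
interface Transport {
  execute(plan: Plan): Promise<Row[]>;
}

class MiniDataFrame {
  readonly plan: Plan;
  private readonly transport: Transport;

  constructor(plan: Plan, transport: Transport) {
    this.plan = plan;
    this.transport = transport;
  }

  // Transformations never touch the network: they wrap the current plan in a new node.
  filter(condition: string): MiniDataFrame {
    return new MiniDataFrame({ type: "filter", child: this.plan, condition }, this.transport);
  }

  select(...columns: string[]): MiniDataFrame {
    return new MiniDataFrame({ type: "project", child: this.plan, columns }, this.transport);
  }

  // Actions are where the finished plan leaves the client.
  collect(): Promise<Row[]> {
    return this.transport.execute(this.plan);
  }
}
```

Because every transformation returns a new object, the original frame is untouched and can be reused as the base for other queries.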

Every DataFrame method maps to a node in the Spark Connect proto schema. @spark-connect-js/core represents plans as a discriminated union mirroring those messages:

```ts
type LogicalPlan =
  | { type: "read"; format: string; path: string; options: Record<string, string>; schema?: string }
  | { type: "project"; child: LogicalPlan; expressions: Expression[] }
  | { type: "filter"; child: LogicalPlan; condition: Expression }
  | { type: "aggregate"; child: LogicalPlan; groupingExpressions: Expression[]; aggregateExpressions: Expression[] }
  | { type: "join"; left: LogicalPlan; right: LogicalPlan; condition?: Expression; joinType: "inner" | "left_outer" /* | … */ }
  // | … (further plan nodes elided)
```

Plans are plain data, so they serialize cleanly, compare structurally via df.sameSemantics(other), and hash deterministically via df.semanticHash().
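One way plain-data plans get structural comparison and a deterministic hash is through a canonical serialization. This is a sketch under stated assumptions: plans contain only objects, arrays, strings, numbers, booleans, bigints, and null, and `canonicalize`, `sameShape`, and `planHash` are hypothetical helpers, not necessarily how `df.sameSemantics()` or `df.semanticHash()` work. Sorting object keys makes property order irrelevant; the hash is an FNV-1a-style hash over the canonical string.

```typescript
// Plain-data values a plan may contain (an assumption for this sketch).
type PlainValue =
  | null
  | boolean
  | number
  | string
  | bigint
  | PlainValue[]
  | { [key: string]: PlainValue };

// Serialize with object keys sorted, so property order never changes the output.
function canonicalize(value: PlainValue): string {
  if (typeof value === "bigint") return `${value}n`; // JSON.stringify throws on bigint
  if (value === null || typeof value !== "object") return JSON.stringify(value);
  if (Array.isArray(value)) return `[${value.map(canonicalize).join(",")}]`;
  const entries = Object.entries(value).sort(([a], [b]) => (a < b ? -1 : a > b ? 1 : 0));
  return `{${entries.map(([k, v]) => `${JSON.stringify(k)}:${canonicalize(v)}`).join(",")}}`;
}

// Two plans are structurally equal when their canonical forms match.
function sameShape(a: PlainValue, b: PlainValue): boolean {
  return canonicalize(a) === canonicalize(b);
}

// 32-bit FNV-1a-style hash over the canonical string's UTF-16 code units.
// No randomness or object identity is involved, so the value is stable across runs.
function planHash(plan: PlainValue): number {
  const text = canonicalize(plan);
  let hash = 0x811c9dc5;
  for (let i = 0; i < text.length; i++) {
    hash ^= text.charCodeAt(i);
    hash = Math.imul(hash, 0x01000193) >>> 0;
  }
  return hash;
}
```

A purely structural check like this is stricter than semantic equivalence: two plans that differ only in, say, a cosmetic alias compare as different.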

col(...), lit(...), and the other exported functions each produce or transform a Column that carries an Expression tree:

```ts
type Expression =
  | { type: "unresolvedAttribute"; name: string }
  | { type: "literal"; value: string | number | boolean | bigint | null }
  | { type: "unresolvedFunction"; name: string; arguments: Expression[]; isDistinct?: boolean }
  | { type: "gt"; left: Expression; right: Expression }
  | { type: "and"; left: Expression; right: Expression }
  // | … (further expression nodes elided)
```

Expressions stay unresolved on the client. The server’s analyzer binds unresolvedAttribute against the upstream plan’s schema and unresolvedFunction against the function registry, so server-side schema changes don’t invalidate expressions you’ve already built.
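A small sketch of how a column DSL can build the unresolved trees above. `Column`, `col`, `lit`, `fn`, and the `gt`/`and` method names are assumptions for illustration; the package's own `Column` API may differ. Nothing here knows about schemas or the function registry: `col("age")` is only a name, and `fn("length", …)` is only a function name with arguments, until the server's analyzer binds them.

```typescript
type Literal = string | number | boolean | bigint | null;

type Expression =
  | { type: "unresolvedAttribute"; name: string }
  | { type: "literal"; value: Literal }
  | { type: "unresolvedFunction"; name: string; arguments: Expression[]; isDistinct?: boolean }
  | { type: "gt"; left: Expression; right: Expression }
  | { type: "and"; left: Expression; right: Expression };

// Hypothetical Column wrapper: each method returns a new Column around a bigger tree.
class Column {
  readonly expr: Expression;

  constructor(expr: Expression) {
    this.expr = expr;
  }

  gt(other: Column): Column {
    return new Column({ type: "gt", left: this.expr, right: other.expr });
  }

  and(other: Column): Column {
    return new Column({ type: "and", left: this.expr, right: other.expr });
  }
}

// Column references are just names; the server resolves them against the child plan's schema.
const col = (name: string): Column => new Column({ type: "unresolvedAttribute", name });

const lit = (value: Literal): Column => new Column({ type: "literal", value });

// Hypothetical helper: any SQL function becomes an unresolvedFunction, looked up by name server-side.
const fn = (name: string, ...args: Column[]): Column =>
  new Column({ type: "unresolvedFunction", name, arguments: args.map((c) => c.expr) });
```

For example, `col("age").gt(lit(21)).and(fn("length", col("name")).gt(lit(0)))` yields an `and` node whose children are two `gt` nodes, with no schema lookup on the client.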

Result batches travel as Apache Arrow IPC, wrapped in spark.connect.ExecutePlanResponse.arrow_batch messages, and apache-arrow decodes them into plain objects. For large results, use toLocalIterator() to walk the batches one at a time instead of buffering the whole result in memory.
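The difference between buffering and walking batches can be sketched with async generators. `fetchBatches` is a hypothetical stand-in for the transport's stream of decoded Arrow batches, and `collectAll` / `toLocalIterator` here are illustrative reimplementations, not the library's methods. The buffered version holds every row before returning; the iterator hands rows out as each batch arrives, so the consumer can process them without first accumulating the full result.

```typescript
type Row = Record<string, unknown>;

// Hypothetical stand-in for decoded Arrow batches arriving from an ExecutePlan stream.
async function* fetchBatches(totalRows: number, batchSize: number): AsyncGenerator<Row[]> {
  for (let start = 0; start < totalRows; start += batchSize) {
    const end = Math.min(start + batchSize, totalRows);
    const batch: Row[] = [];
    for (let i = start; i < end; i++) batch.push({ id: i });
    yield batch;
  }
}

// Buffering: every row from every batch is held in memory before the caller sees any.
async function collectAll(batches: AsyncIterable<Row[]>): Promise<Row[]> {
  const rows: Row[] = [];
  for await (const batch of batches) {
    for (const row of batch) rows.push(row);
  }
  return rows;
}

// Streaming: rows are yielded as each batch arrives; the next batch is pulled only when needed.
async function* toLocalIterator(batches: AsyncIterable<Row[]>): AsyncGenerator<Row> {
  for await (const batch of batches) {
    yield* batch;
  }
}
```

A consumer then uses `for await (const row of toLocalIterator(...))` and can stop early with `break`, which also stops pulling further batches.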

Not every Arrow type round-trips cleanly into JavaScript. A Row from collect() is typed as Record<string, unknown> because the right TypeScript type for some columns (notably bigint versus number) depends on the value range.

| Spark / Arrow type | TypeScript | Notes |
| --- | --- | --- |
| `BOOLEAN` | `boolean` | |
| `BYTE`, `SHORT`, `INT` | `number` | 32-bit integers fit in a JS number without loss. |
| `LONG` (int64) | `bigint` | JS number cannot represent the full int64 range. Use BigInt arithmetic or cast to string. |
| `FLOAT`, `DOUBLE` | `number` | IEEE-754 round-trip. NaN and Infinity are preserved. |
| `DECIMAL(p, s)` | `string` | Returned as text to avoid precision loss. Parse with a decimal library if you need arithmetic. |
| `STRING` | `string` | UTF-8 throughout. |
| `BINARY` | `Uint8Array` | |
| `DATE` | `Date` | Time component is midnight UTC. |
| `TIMESTAMP` | `Date` | Spark stores microseconds; Date is millisecond-resolution. Sub-millisecond precision is truncated. |
| `TIMESTAMP_NTZ` | `Date` | No timezone offset applied; the wall-clock value is preserved. |
| `ARRAY<T>` | `T[]` | Recursively mapped. |
| `MAP<K, V>` | `Record<string, V>` | Keys are stringified. |
| `STRUCT<...>` | `Record<string, unknown>` | Field names preserved. |
| `NULL` | `null` | |
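Two rows of that table usually need a little handling in application code. The helpers below are hypothetical, not part of the package: `longToSafe` narrows an int64 `bigint` to a `number` only when it lies within `Number.MAX_SAFE_INTEGER`, and `microsToDate` shows exactly which part of a microsecond timestamp is lost when it becomes a `Date`.

```typescript
// Convert a LONG (int64) value to number only when no precision is lost;
// otherwise keep it as a decimal string.
function longToSafe(value: bigint): number | string {
  const max = BigInt(Number.MAX_SAFE_INTEGER);
  return value <= max && value >= -max ? Number(value) : value.toString();
}

// TIMESTAMP values count microseconds since the Unix epoch; Date only has milliseconds.
// BigInt division truncates toward zero, so the remainder is what a Date cannot hold.
function microsToDate(micros: bigint): { date: Date; droppedMicros: bigint } {
  const millis = micros / BigInt(1000);
  return { date: new Date(Number(millis)), droppedMicros: micros - millis * BigInt(1000) };
}
```

For example, `microsToDate(BigInt("1700000000123456"))` yields a `Date` at millisecond `1700000000123` and reports `456` dropped microseconds.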

If you need exact microsecond timestamps or full-precision decimals, cast to string in SQL (CAST(ts AS STRING), CAST(amount AS STRING)) until the client grows dedicated representations.
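Once a DECIMAL arrives as a string, exact arithmetic is possible without floating point by treating it as a scaled integer. This is a minimal sketch of that idea, assuming plain decimal strings such as `"-12.345"`; input in exponent notation (for example `"1E-8"`) is rejected rather than guessed at. `parseDecimal`, `addDecimals`, and `formatDecimal` are hypothetical names, and a maintained decimal library remains the better choice for anything beyond addition.

```typescript
// A decimal as an unscaled integer plus a scale: "12.345" -> { unscaled: 12345n, scale: 3 }.
interface Decimal {
  unscaled: bigint;
  scale: number;
}

function parseDecimal(text: string): Decimal {
  const match = /^(-?)(\d+)(?:\.(\d+))?$/.exec(text.trim());
  if (!match) throw new Error(`not a plain decimal string: ${text}`);
  const [, sign = "", intPart = "0", fracPart = ""] = match;
  const unscaled = BigInt(intPart + fracPart);
  return { unscaled: sign === "-" ? -unscaled : unscaled, scale: fracPart.length };
}

// Rescale both operands to the larger scale, then add the integers exactly.
function addDecimals(a: Decimal, b: Decimal): Decimal {
  const scale = Math.max(a.scale, b.scale);
  const rescale = (d: Decimal): bigint => d.unscaled * BigInt(10) ** BigInt(scale - d.scale);
  return { unscaled: rescale(a) + rescale(b), scale };
}

// Render back to text, padding with leading zeros so values like 0.005 keep their digits.
function formatDecimal(d: Decimal): string {
  const negative = d.unscaled < BigInt(0);
  const digits = (negative ? -d.unscaled : d.unscaled).toString().padStart(d.scale + 1, "0");
  const intPart = digits.slice(0, digits.length - d.scale);
  const fracPart = digits.slice(digits.length - d.scale);
  return `${negative ? "-" : ""}${intPart}${d.scale > 0 ? "." + fracPart : ""}`;
}
```

With this representation, `"0.1"` plus `"0.2"` formats as exactly `"0.3"`, which `0.1 + 0.2` in JS numbers does not.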

Almost all session state lives in the server's JVM: temp views, cached tables, runtime config, and Hive metastore bindings.

The client itself keeps very little:

  • the session UUID,
  • the gRPC channel,
  • the Arrow decoder,
  • connection parameters, for reconnect.

A transient gRPC drop during a server-streaming ExecutePlan is handled in-process. The transport records the responseId of every response it receives, and on a retryable failure (UNAVAILABLE, or INTERNAL carrying INVALID_CURSOR.DISCONNECTED) it issues ReattachExecute(operation_id, last_response_id) to resume the stream after the last acknowledged batch. The retry budget and backoff are configurable via RetryPolicy.
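The resume-from-last-response logic can be sketched against a fake transport. Everything here is hypothetical: `StreamTransport`, `RetryableError`, and `RetryPolicySketch` (with `maxAttempts` and `baseDelayMs`) stand in for the real transport, status-code checks, and `RetryPolicy`, whose actual shape may differ. The sketch records the last `responseId` before handing each response on, and on a retryable error waits with exponential backoff and reattaches from that point.

```typescript
interface ExecuteResponse {
  responseId: string;
  payload: string;
}

// Hypothetical transport: execute starts the stream, reattach resumes it after lastResponseId.
interface StreamTransport {
  execute(operationId: string): AsyncIterable<ExecuteResponse>;
  reattach(operationId: string, lastResponseId: string | undefined): AsyncIterable<ExecuteResponse>;
}

// Hypothetical marker for failures such as UNAVAILABLE that are safe to retry.
class RetryableError extends Error {}

interface RetryPolicySketch {
  maxAttempts: number;
  baseDelayMs: number;
}

const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));

async function* executeWithReattach(
  transport: StreamTransport,
  operationId: string,
  policy: RetryPolicySketch,
): AsyncGenerator<ExecuteResponse> {
  let lastResponseId: string | undefined;
  let attempt = 0;
  let stream = transport.execute(operationId);
  for (;;) {
    try {
      for await (const response of stream) {
        lastResponseId = response.responseId; // remember progress before yielding
        yield response;
      }
      return; // the stream completed normally
    } catch (err) {
      // Non-retryable errors, or an exhausted budget, propagate to the caller.
      if (!(err instanceof RetryableError) || ++attempt > policy.maxAttempts) throw err;
      await sleep(policy.baseDelayMs * 2 ** (attempt - 1));
      stream = transport.reattach(operationId, lastResponseId);
    }
  }
}
```

The caller sees one uninterrupted sequence of responses; the drop and the reattach stay inside the generator.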

A whole-session loss (driver restart, idle reap) is recoverable too, as long as the server's session cache still holds the entry: construct a new session with the same session_id.

A session exists once connect(...) or SparkSession.builder().build() returns. The gRPC channel opens lazily on the first RPC and is shared across actions. session.stop() closes the channel and releases server-side state; once stopped, the session can’t be reused.
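Those lifecycle rules (lazy channel, shared across actions, unusable after `stop()`) fit in a short sketch. `SketchSession`, `Channel`, and the `openChannel` factory are hypothetical; the channel is a stand-in object, not a real `@grpc/grpc-js` channel. Every RPC goes through one private accessor, which is what makes the channel both lazy and shared.

```typescript
// Stand-in for a gRPC channel: all the sketch needs is a way to close it.
interface Channel {
  close(): void;
}

class SketchSession {
  private channel: Channel | undefined;
  private stopped = false;
  private readonly openChannel: () => Channel;

  constructor(openChannel: () => Channel) {
    this.openChannel = openChannel; // nothing is opened at construction time
  }

  // The first RPC opens the channel; later RPCs reuse it. Stopped sessions refuse work.
  private getChannel(): Channel {
    if (this.stopped) throw new Error("session has been stopped; create a new one");
    if (this.channel === undefined) this.channel = this.openChannel();
    return this.channel;
  }

  // Stand-in for an RPC such as ExecutePlan: it only needs the shared channel.
  async execute<T>(call: (channel: Channel) => Promise<T>): Promise<T> {
    return call(this.getChannel());
  }

  // Closes the channel if it was ever opened and marks the session unusable. Safe to call twice.
  stop(): void {
    if (this.stopped) return;
    this.stopped = true;
    this.channel?.close();
    this.channel = undefined;
  }
}
```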

For long-running processes, put stop() in a try/finally and call it from your SIGINT and SIGTERM handlers. Otherwise the server waits for the idle timeout to reap the session.
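A sketch of that shutdown pattern for Node.js. `StoppableSession` and `withSession` are hypothetical names; the only thing assumed about the session is a `stop()` method. The wrapper guarantees `stop()` runs exactly once whether the work finishes, throws, or the process receives SIGINT or SIGTERM, and it removes its signal handlers afterwards.

```typescript
interface StoppableSession {
  stop(): void | Promise<void>;
}

async function withSession<T>(
  session: StoppableSession,
  work: (session: StoppableSession) => Promise<T>,
): Promise<T> {
  let stopping: Promise<void> | undefined;
  // Idempotent: the finally block and a signal handler may both ask to stop.
  const stopOnce = (): Promise<void> => (stopping ??= Promise.resolve(session.stop()));

  // On a signal, stop the session, then exit with the conventional 128 + signal number.
  const onSignal = (signal: string) => {
    void stopOnce().finally(() => process.exit(signal === "SIGINT" ? 130 : 143));
  };
  process.once("SIGINT", onSignal);
  process.once("SIGTERM", onSignal);

  try {
    return await work(session);
  } finally {
    process.off("SIGINT", onSignal);
    process.off("SIGTERM", onSignal);
    await stopOnce();
  }
}
```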

Multiple sessions against the same server work fine. They don’t share temp views or cached tables. Rotating credentials means constructing a new session with the new token and draining the old one (see Security).
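Credential rotation can be sketched as "open the new session first, then drain and stop the old one". `RotatableSession`, `rotateCredentials`, and the `createSession` factory are hypothetical; how the real client accepts a token is not modeled here. New work can go to the returned session immediately, while actions already running on the old one are allowed to settle before it is stopped.

```typescript
interface RotatableSession {
  readonly token: string;
  stop(): Promise<void>;
}

async function rotateCredentials(
  old: RotatableSession,
  newToken: string,
  createSession: (token: string) => RotatableSession,
  inFlight: Promise<unknown>[],
): Promise<RotatableSession> {
  const next = createSession(newToken); // new actions can target `next` from here on
  await Promise.allSettled(inFlight); // let actions already running on `old` finish or fail
  await old.stop(); // only now release the old session's server-side state
  return next;
}
```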

| What you want to follow | Start at |
| --- | --- |
| DataFrame method to plan node | `packages/spark-core/src/data-frame.ts` |
| The plan shape | `packages/spark-core/src/plan/logical-plan.ts` |
| Column expressions | `packages/spark-core/src/column.ts` |
| gRPC and Arrow | `packages/spark-node/src` |
| Error mapping | `packages/spark-core/src/errors.ts` |