I/O
Reads and writes in Spark go through builder objects: DataFrameReader and DataFrameWriter respectively. You can configure a format and the options it needs, then point the builder at a file path or a catalog table. The reading and writing itself runs on the cluster, not on the client.
Reading
spark.read returns a DataFrameReader. Configure it with .format(...), .option(...), .schema(...), then terminate with a format shortcut or .load(...).
// Format shortcuts; each returns a lazy DataFrame.
const parquet = spark.read.parquet("s3://bucket/events/");
const json = spark.read.json("/data/events.json");
const orc = spark.read.orc("/data/events.orc");
const text = spark.read.text("/data/logs.txt");

const csv = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("/data/events.csv");

// Multiple paths:
spark.read.parquet("/a/", "/b/", "/c/");

// Generic form:
spark.read.format("parquet").option("mergeSchema", "true").load("/data/");
Schemas
For formats without a built-in schema (CSV, JSON, text), providing a schema avoids runtime type surprises:
import { StructType, StructField } from "@spark-connect-js/node";

// As DDL:
spark.read
  .schema("id INT, name STRING, age INT, joined TIMESTAMP")
  .option("header", "true")
  .csv("/data/people.csv");

// As a StructType:
const schema = new StructType([
  new StructField("id", "int"),
  new StructField("name", "string"),
  new StructField("age", "int"),
]);
spark.read.schema(schema).json("/data/people.json");

StructType.toDDL() returns the equivalent DDL string, which is what gets sent over the wire.
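To make the relationship between a field list and its DDL string concrete, here is a minimal sketch of such a conversion. FieldSpec and fieldsToDDL are hypothetical names used only for illustration; this is not the library's StructType.toDDL(), and the exact string the library emits (quoting, spacing) may differ.

```typescript
// Hypothetical stand-in for a schema field; not the library's StructField.
interface FieldSpec {
  name: string;
  type: string; // Spark SQL type name, e.g. "int", "string", "timestamp"
  nullable?: boolean; // treated as true when omitted
}

// Render a flat list of fields as a DDL schema string.
// Assumption: names need no quoting and types are simple (no nested structs).
function fieldsToDDL(fields: FieldSpec[]): string {
  return fields
    .map((f) => {
      const notNull = f.nullable === false ? " NOT NULL" : "";
      return `${f.name} ${f.type.toUpperCase()}${notNull}`;
    })
    .join(", ");
}

const ddl = fieldsToDDL([
  { name: "id", type: "int" },
  { name: "name", type: "string" },
  { name: "age", type: "int" },
]);
console.log(ddl); // id INT, name STRING, age INT
```

The output has the same shape as the string passed to .schema(...) in the DDL form above.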
If you don’t pass a schema:
- CSV: every column comes back as string. Set option("inferSchema", "true") to detect numeric and date types from a second pass over the file.
- JSON: Spark infers types from a sample of the input. The result depends on what it sees.
- Text: always one column, value: string.
- Parquet and ORC: schema is read from the file footers; you don’t need to provide one.
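The point that an inferred schema "depends on what it sees" can be illustrated with a deliberately simplified model of type inference. This is a sketch under stated assumptions, not Spark's actual rules (which also cover dates, booleans, nulls, and more); inferOne, widen, and inferColumn are hypothetical helpers.

```typescript
type Inferred = "int" | "double" | "string";

// Ordering used for widening: int < double < string.
const RANK: Record<Inferred, number> = { int: 0, double: 1, string: 2 };

// Pick the narrowest type that fits a single raw value.
function inferOne(raw: string): Inferred {
  if (/^[+-]?\d+$/.test(raw)) return "int";
  if (/^[+-]?(\d+\.\d*|\.\d+|\d+)([eE][+-]?\d+)?$/.test(raw)) return "double";
  return "string";
}

// Return the wider of two candidate types.
function widen(a: Inferred, b: Inferred): Inferred {
  return RANK[a] >= RANK[b] ? a : b;
}

// Infer one column's type from the values that were actually scanned.
function inferColumn(values: string[]): Inferred {
  if (values.length === 0) return "string"; // nothing to infer from
  let result: Inferred = "int";
  for (const v of values) result = widen(result, inferOne(v));
  return result;
}

console.log(inferColumn(["1", "2", "3"])); // int
console.log(inferColumn(["1", "2.5", "3"])); // double
console.log(inferColumn(["1", "2.5", "n/a"])); // string
```

A single stray value is enough to widen the whole column, which is why an explicit schema is the more predictable choice for CSV and JSON.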
Common options
Options map one-to-one with Spark’s data-source options. A partial list of the most common ones for the shortcut methods:
| Format | Options |
|---|---|
| CSV | header, sep, quote, escape, inferSchema, nullValue, dateFormat, mode, multiLine |
| JSON | multiLine, allowComments, allowSingleQuotes, mode, primitivesAsString |
| Parquet | mergeSchema, compression |
| ORC | mergeSchema, compression |
| Text | lineSep, wholetext, compression |
For format-specific options across every Spark-supported source (including Avro, XML, JDBC, Hive), see Spark SQL Data Sources. Anything documented there is reachable via .format("name").option(...).load(path).
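Since options are plain key/value pairs, a group of them can be held in one object and applied in a loop. The following is a minimal sketch that assumes only a builder with a chainable option(key, value) method, as in the snippets on this page; OptionBuilder, applyOptions, and RecordingBuilder are hypothetical names for illustration, not part of the library.

```typescript
// The only shape this sketch relies on: a chainable option(key, value).
interface OptionBuilder<T> {
  option(key: string, value: string): T;
}

// Apply every entry of an options object to a builder, in insertion order.
function applyOptions<T extends OptionBuilder<T>>(
  builder: T,
  opts: Record<string, string>,
): T {
  let current = builder;
  for (const [key, value] of Object.entries(opts)) {
    current = current.option(key, value);
  }
  return current;
}

// Stand-in builder that just records what it was given (demo only).
class RecordingBuilder implements OptionBuilder<RecordingBuilder> {
  readonly seen: Array<[string, string]> = [];
  option(key: string, value: string): RecordingBuilder {
    this.seen.push([key, value]);
    return this;
  }
}

const csvOptions = { header: "true", sep: ";", nullValue: "NA" };
const recorded = applyOptions(new RecordingBuilder(), csvOptions);
console.log(recorded.seen);
```

Keeping the options in one object makes it easy to reuse the same settings for a read and the matching write.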
Reading catalog tables
spark.read.table("analytics.events");
spark.table("analytics.events"); // shorthand
Writing
df.write returns a DataFrameWriter. Configure with .mode(...), .option(...), .partitionBy(...), .sortBy(...), .bucketBy(...), then terminate with a format shortcut or .save(...).
await df.write
  .mode("overwrite")
  .option("compression", "snappy")
  .parquet("/tmp/events/");

await df.write.mode("append").json("/tmp/events-json/");
await df.write.mode("overwrite").option("header", "true").csv("/tmp/events-csv/");
await df.write.orc("/tmp/events-orc/");
await df.write.text("/tmp/logs/"); // requires a single string column named "value"
Save modes
.mode("overwrite")     // replace existing data
.mode("append")        // add to existing data
.mode("ignore")        // no-op if target exists
.mode("error")         // fail if target exists (default)
.mode("errorifexists") // same as "error"
Partitioning and bucketing
await df.write.mode("overwrite").partitionBy("year", "month").parquet("/tmp/events/");

await df.write
  .mode("overwrite")
  .bucketBy(16, "user_id")
  .sortBy("ts")
  .saveAsTable("events_bucketed");

partitionBy writes a directory per distinct partition value. bucketBy hash-distributes rows across a fixed number of buckets and is only available with saveAsTable.
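The difference between the two layouts can be sketched in a few lines: partitioning derives a directory path from column values, while bucketing maps a column value to one of a fixed number of buckets by hashing. The helpers below are hypothetical, and the FNV-1a hash is a stand-in chosen for brevity; Spark uses its own hash function, so the bucket numbers here will not match what Spark produces.

```typescript
type Row = Record<string, string | number>;

// Hive-style partition directory: one "column=value" segment per partition column.
function partitionDir(row: Row, partitionCols: string[]): string {
  return partitionCols.map((c) => `${c}=${row[c]}`).join("/");
}

// Stand-in 32-bit string hash (FNV-1a). Not the hash Spark uses.
function fnv1a(s: string): number {
  let h = 0x811c9dc5;
  for (let i = 0; i < s.length; i++) {
    h ^= s.charCodeAt(i);
    h = Math.imul(h, 0x01000193) >>> 0; // keep the value unsigned
  }
  return h;
}

// Bucket assignment: hash of the bucketing column, modulo the bucket count.
function bucketId(row: Row, bucketCol: string, numBuckets: number): number {
  return fnv1a(String(row[bucketCol])) % numBuckets;
}

const event: Row = { year: 2025, month: 6, user_id: "u42" };
console.log(partitionDir(event, ["year", "month"])); // year=2025/month=6
console.log(bucketId(event, "user_id", 16)); // some value in 0..15
```

Partition count grows with the number of distinct values, whereas the bucket count stays fixed, which is why high-cardinality columns such as user_id are usually bucketed instead of partitioned.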
Saving as a catalog table
await df.write
  .mode("overwrite")
  .format("parquet")
  .option("path", "/warehouse/events/") // external table; omit for managed
  .saveAsTable("analytics.events");
Insert into an existing table
await df.write.insertInto("analytics.events");
await df.write.mode("overwrite").insertInto("analytics.events"); // INSERT OVERWRITE

insertInto ignores column names and writes by position, matching Hive / Spark SQL semantics.
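Positional matching is easy to get wrong when the DataFrame's column order differs from the table's. The sketch below models the behaviour on plain arrays to show the failure and one way to guard against it; insertByPosition and alignToTable are hypothetical helpers, not library functions.

```typescript
// Positional insert: the i-th value lands in the i-th table column,
// whatever the source column was called.
function insertByPosition(
  tableCols: string[],
  row: unknown[],
): Record<string, unknown> {
  if (tableCols.length !== row.length) throw new Error("column count mismatch");
  const out: Record<string, unknown> = {};
  tableCols.forEach((col, i) => {
    out[col] = row[i];
  });
  return out;
}

// Reorder a row's values so they follow the table's column order.
function alignToTable(
  tableCols: string[],
  dfCols: string[],
  row: unknown[],
): unknown[] {
  return tableCols.map((col) => {
    const i = dfCols.indexOf(col);
    if (i < 0) throw new Error(`missing column: ${col}`);
    return row[i];
  });
}

const tableCols = ["id", "kind", "value"];
const dfCols = ["kind", "id", "value"]; // same names, different order
const row = ["temp", 1, 23.4];

// Written as-is, "temp" ends up in the id column.
console.log(insertByPosition(tableCols, row));
// Aligned first, each value lands in the column of the same name.
console.log(insertByPosition(tableCols, alignToTable(tableCols, dfCols, row)));
```

In practice the alignment step corresponds to selecting the columns in the table's order before calling insertInto, assuming the DataFrame exposes a select method as Spark's does.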
The v2 writer
For newer table formats (Iceberg, Delta) that expose DataSourceV2 semantics, you can use df.writeTo(table):
await df.writeTo("analytics.events")
  .using("iceberg")
  .tableProperty("write.format.default", "parquet")
  .partitionedBy(col("year"), col("month"))
  .createOrReplace();

await df.writeTo("analytics.events").append();
await df.writeTo("analytics.events").overwrite(col("year").eq(lit(2025)));
await df.writeTo("analytics.events").overwritePartitions();

The v2 writer’s createOrReplace is atomic on engines that support it (Iceberg, Delta); on file-based sources it falls back to the v1 semantics.
A round-trip example
This mirrors examples/node-read-write:
import { connect, col } from "@spark-connect-js/node";

const spark = connect("sc://localhost:15002");
const tmp = "/tmp/io-example";

const sensors = spark.sql(`
  SELECT * FROM VALUES
    (1, 'temp', 23.4, '2025-06-01'),
    (2, 'humid', 55.1, '2025-06-01'),
    (3, 'temp', 24.8, '2025-06-01')
  AS readings(id, kind, value, date)
`);

await sensors.write.mode("overwrite").option("header", "true").csv(`${tmp}/csv`);
await sensors.write.mode("overwrite").json(`${tmp}/json`);
await sensors.write.mode("overwrite").parquet(`${tmp}/parquet`);

const roundTrip = await spark.read
  .schema("id INT, kind STRING, value DOUBLE, date STRING")
  .option("header", "true")
  .csv(`${tmp}/csv`)
  .sort(col("id").asc())
  .collect();

console.table(roundTrip);
await spark.stop();