
I/O

Reads and writes in Spark go through two builder objects: DataFrameReader for reads and DataFrameWriter for writes. You configure a format and any options it needs, then point the builder at a file path or a catalog table. The actual reading and writing run on the cluster, not on the client.

spark.read returns a DataFrameReader. Configure it with .format(...), .option(...), .schema(...), then terminate with a format shortcut or .load(...).

// Format shortcuts; each returns a lazy DataFrame.
const parquet = spark.read.parquet("s3://bucket/events/");
const json = spark.read.json("/data/events.json");
const orc = spark.read.orc("/data/events.orc");
const text = spark.read.text("/data/logs.txt");
const csv = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("/data/events.csv");
// Multiple paths:
spark.read.parquet("/a/", "/b/", "/c/");
// Generic form:
spark.read.format("parquet").option("mergeSchema", "true").load("/data/");
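The builder pattern described above can be sketched in a few lines. The `ReaderBuilder` class and `ReadPlan` type below are hypothetical and are not part of @spark-connect-js/node. They only show the shape of the idea: each configuration call records state and returns the builder, and nothing is read until a terminal call produces a plan (in the real client, a lazy DataFrame).

```typescript
// Hypothetical sketch of a reader builder; NOT the @spark-connect-js/node API.
// Each configuration call records state and returns `this` so calls chain.

interface ReadPlan {
  format: string;
  options: Record<string, string>;
  schema?: string;
  paths: string[];
}

class ReaderBuilder {
  private fmt = "parquet"; // assumed default for this sketch
  private opts: Record<string, string> = {};
  private ddl?: string;

  format(name: string): this {
    this.fmt = name;
    return this;
  }

  option(key: string, value: string): this {
    this.opts[key] = value;
    return this;
  }

  schema(ddl: string): this {
    this.ddl = ddl;
    return this;
  }

  // Terminal call: returns a description of the read. No I/O happens here;
  // a real client would send an equivalent plan to the cluster.
  load(...paths: string[]): ReadPlan {
    return { format: this.fmt, options: { ...this.opts }, schema: this.ddl, paths };
  }

  // Format shortcut: the same as .format("csv").load(...).
  csv(...paths: string[]): ReadPlan {
    return this.format("csv").load(...paths);
  }
}

const plan = new ReaderBuilder()
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("/data/events.csv");

console.log(JSON.stringify(plan));
```

Because every call returns the same builder, a format shortcut is just `format(...)` followed by `load(...)`, which is why the shortcut and generic forms above are interchangeable.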

For formats that don’t store a schema in the data (CSV, JSON, text), supplying one up front avoids runtime type surprises:

import { StructType, StructField } from "@spark-connect-js/node";
// As DDL:
spark.read
  .schema("id INT, name STRING, age INT, joined TIMESTAMP")
  .option("header", "true")
  .csv("/data/people.csv");
// As a StructType:
const schema = new StructType([
  new StructField("id", "int"),
  new StructField("name", "string"),
  new StructField("age", "int"),
]);
spark.read.schema(schema).json("/data/people.json");

StructType.toDDL() returns the equivalent DDL string, which is what gets sent over the wire.
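To make the DDL form concrete, here is a hypothetical helper, `fieldsToDDL`, that renders a list of (name, type) pairs as a DDL string. It is not the library’s toDDL(); the real method may quote identifiers or format types differently, so treat the output shape as illustrative.

```typescript
// Hypothetical helper, not the library's toDDL(). It renders (name, type)
// pairs as a Spark-style DDL column list such as "id INT, name STRING".

interface SimpleField {
  name: string;
  type: string; // a Spark SQL type name, e.g. "int" or "string"
}

function fieldsToDDL(fields: SimpleField[]): string {
  return fields
    .map((f) => {
      // Plain identifiers are emitted as-is; anything else (spaces, dashes)
      // is wrapped in backticks, with embedded backticks doubled.
      const name = /^[A-Za-z_][A-Za-z0-9_]*$/.test(f.name)
        ? f.name
        : "`" + f.name.replace(/`/g, "``") + "`";
      return name + " " + f.type.toUpperCase();
    })
    .join(", ");
}

const ddl = fieldsToDDL([
  { name: "id", type: "int" },
  { name: "name", type: "string" },
  { name: "age", type: "int" },
]);
console.log(ddl); // id INT, name STRING, age INT
```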

If you don’t pass a schema:

  • CSV: every column comes back as string. Set option("inferSchema", "true") to have Spark detect types such as integers, doubles, and timestamps; this costs an extra pass over the data.
  • JSON: Spark infers types by scanning the input (all of it by default; the samplingRatio option limits the scan to a fraction of the records), so the result depends on the data it sees.
  • Text: always one column, value: string.
  • Parquet and ORC: schema is read from the file footers; you don’t need to provide one.
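The effect of inferSchema on CSV can be approximated by a per-column scan that picks the narrowest type every value fits. The sketch below is a deliberate simplification and `inferCsvType` is a hypothetical name: Spark’s real inference also considers long, decimal, date, and timestamp types, honors nullValue, and respects format options.

```typescript
// Simplified model of CSV type inference; NOT Spark's actual algorithm.
// Without inference every column is "string"; with it, each column is scanned
// and the narrowest candidate type that fits every value is chosen.

type InferredType = "int" | "double" | "boolean" | "string";

const CANDIDATES: InferredType[] = ["int", "double", "boolean"];

function fits(value: string, type: InferredType): boolean {
  if (type === "int") {
    if (!/^-?\d+$/.test(value)) return false;
    const n = Number(value);
    return n >= -2147483648 && n <= 2147483647; // 32-bit int range
  }
  if (type === "double") return /^-?(\d+\.?\d*|\.\d+)([eE][-+]?\d+)?$/.test(value);
  if (type === "boolean") return /^(true|false)$/i.test(value);
  return true; // every value fits "string"
}

function inferCsvType(values: string[]): InferredType {
  const nonEmpty = values.filter((v) => v !== ""); // treat empty cells as null
  if (nonEmpty.length === 0) return "string"; // all-null columns fall back to string
  for (const t of CANDIDATES) {
    if (nonEmpty.every((v) => fits(v, t))) return t;
  }
  return "string";
}

console.log(inferCsvType(["1", "2", "3"])); // int
console.log(inferCsvType(["23.4", "55.1", "24"])); // double
console.log(inferCsvType(["temp", "humid"])); // string
```

The scan has to see every value before it can commit to a type, which is why inference needs its own pass over the data.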

Options map one-to-one to Spark’s data-source options. The most common ones for the shortcut methods are listed below; some, such as compression, apply only when writing.

| Format  | Common options |
|---------|----------------|
| CSV     | header, sep, quote, escape, inferSchema, nullValue, dateFormat, mode, multiLine |
| JSON    | multiLine, allowComments, allowSingleQuotes, mode, primitivesAsString |
| Parquet | mergeSchema, compression |
| ORC     | mergeSchema, compression |
| Text    | lineSep, wholetext, compression |
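Spark data-source options are string key/value pairs, so a set of options kept in a config object has to be stringified before it is applied. A minimal sketch, assuming a builder whose `option` method takes `(key: string, value: string)` and returns the builder; `withOptions` and `RecordingBuilder` are hypothetical names, and the recording builder stands in for a real reader.

```typescript
// Hypothetical helper: apply a record of options to any builder that exposes
// option(key: string, value: string). Values are stringified first, since
// data-source options are passed as strings.

interface OptionTarget<T> {
  option(key: string, value: string): T;
}

type OptionValue = string | number | boolean;

function withOptions<T extends OptionTarget<T>>(target: T, opts: Record<string, OptionValue>): T {
  let current = target;
  for (const key of Object.keys(opts)) {
    current = current.option(key, String(opts[key]));
  }
  return current;
}

// Stand-in builder that records every option call instead of reading data.
class RecordingBuilder implements OptionTarget<RecordingBuilder> {
  readonly seen: Array<[string, string]> = [];
  option(key: string, value: string): RecordingBuilder {
    this.seen.push([key, value]);
    return this;
  }
}

const builder = withOptions(new RecordingBuilder(), {
  header: true,
  sep: ";",
  multiLine: false,
});
console.log(JSON.stringify(builder.seen)); // [["header","true"],["sep",";"],["multiLine","false"]]
```

With the real client the same idea would read `withOptions(spark.read, { header: true }).csv(path)`, provided its `option` method has the assumed signature.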

For format-specific options across every Spark-supported source (including Avro, XML, JDBC, Hive), see Spark SQL Data Sources. Anything documented there is reachable via .format("name").option(...).load(path).

To read a catalog table, pass its name:

spark.read.table("analytics.events");
spark.table("analytics.events"); // shorthand

df.write returns a DataFrameWriter. Configure it with .mode(...), .option(...), .partitionBy(...), .bucketBy(...), .sortBy(...), then terminate with a format shortcut, .save(...), or a table method such as .saveAsTable(...).

await df.write
  .mode("overwrite")
  .option("compression", "snappy")
  .parquet("/tmp/events/");
await df.write.mode("append").json("/tmp/events-json/");
await df.write.mode("overwrite").option("header", "true").csv("/tmp/events-csv/");
await df.write.orc("/tmp/events-orc/");
await df.write.text("/tmp/logs/"); // requires exactly one column, of type string

// Save modes:
.mode("overwrite")     // replace existing data
.mode("append")        // add to existing data
.mode("ignore")        // no-op if target exists
.mode("error")         // fail if target exists (default)
.mode("errorifexists") // alias for "error"

await df.write.mode("overwrite").partitionBy("year", "month").parquet("/tmp/events/");
await df.write
  .mode("overwrite")
  .bucketBy(16, "user_id")
  .sortBy("ts")
  .saveAsTable("events_bucketed");
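The save modes differ only in how they treat a target that already exists. The hypothetical `resolveSaveMode` function below models those rules in plain TypeScript; it does not call the client and only makes the behavior explicit.

```typescript
// Hypothetical model of DataFrameWriter save modes (not client code).
type SaveMode = "overwrite" | "append" | "ignore" | "error" | "errorifexists";
type WriteAction = "write" | "replace" | "append" | "skip";

function resolveSaveMode(mode: SaveMode, targetExists: boolean): WriteAction {
  // Every mode simply writes when there is nothing at the target yet.
  if (!targetExists) return "write";
  if (mode === "overwrite") return "replace";
  if (mode === "append") return "append";
  if (mode === "ignore") return "skip";
  // "error" and "errorifexists" are the same mode: refuse to touch existing data.
  throw new Error(`target already exists (mode "${mode}")`);
}

console.log(resolveSaveMode("ignore", false)); // write
console.log(resolveSaveMode("ignore", true)); // skip
console.log(resolveSaveMode("overwrite", true)); // replace
```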

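partitionBy writes one subdirectory per distinct combination of partition-column values (for example year=2025/month=6/). bucketBy hash-distributes rows into a fixed number of buckets, optionally sorted within each bucket by sortBy; it is only available with saveAsTable.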
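Both layouts can be illustrated without a cluster. In the sketch below, `partitionDir` builds the Hive-style `column=value` path that partitionBy produces, and `bucketId` assigns a row to one of a fixed number of buckets with a positive modulo. The hash is a stand-in (a Java-style string hash), not Spark’s Murmur3-based bucket hash, so the bucket numbers will not match Spark’s; Spark also escapes special characters in partition values, which is omitted here. All names are hypothetical.

```typescript
// Hypothetical illustration of partition directories and bucket assignment.
// NOT Spark's implementation: the hash below is a simple stand-in.

function partitionDir(row: Record<string, string | number>, partitionCols: string[]): string {
  // One "column=value" segment per partition column, in the given order.
  return partitionCols.map((c) => c + "=" + String(row[c])).join("/");
}

function simpleHash(s: string): number {
  let h = 0;
  for (let i = 0; i < s.length; i++) {
    // Java-style 31*h + c, truncated to a signed 32-bit integer.
    h = (h * 31 + s.charCodeAt(i)) | 0;
  }
  return h;
}

function bucketId(value: string, numBuckets: number): number {
  // Positive modulo, so negative hashes still land in [0, numBuckets).
  return ((simpleHash(value) % numBuckets) + numBuckets) % numBuckets;
}

const sample = { year: 2025, month: 6, user_id: "u-42" };
console.log(partitionDir(sample, ["year", "month"])); // year=2025/month=6
console.log(bucketId(sample.user_id, 16)); // a bucket number between 0 and 15
```

The number of partition directories grows with the data’s distinct values, while the number of buckets is fixed up front, which is why bucketing needs table metadata and therefore saveAsTable.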

await df.write
  .mode("overwrite")
  .format("parquet")
  .option("path", "/warehouse/events/") // external table; omit for a managed table
  .saveAsTable("analytics.events");
await df.write.insertInto("analytics.events"); // appends, matching columns by position
await df.write.mode("overwrite").insertInto("analytics.events"); // INSERT OVERWRITE

insertInto ignores column names and matches columns by position, following Hive / Spark SQL INSERT INTO semantics, so the DataFrame’s column order must match the table’s.
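The difference between positional and by-name matching is easy to see with plain objects. In this hypothetical sketch (`insertByPosition` and `insertByName` are illustrative names, not client APIs), a DataFrame with the same columns as the table but in a different order ends up with values in the wrong columns under positional matching; in real Spark such a write either fails on the type mismatch or casts, depending on the store-assignment policy.

```typescript
// Hypothetical illustration of positional vs by-name column matching.
type TableRow = Record<string, unknown>;

function insertByPosition(tableCols: string[], values: unknown[]): TableRow {
  if (values.length !== tableCols.length) {
    throw new Error(`expected ${tableCols.length} values, got ${values.length}`);
  }
  const out: TableRow = {};
  // The i-th DataFrame value lands in the i-th table column, whatever its name.
  tableCols.forEach((c, i) => {
    out[c] = values[i];
  });
  return out;
}

function insertByName(tableCols: string[], dfCols: string[], values: unknown[]): TableRow {
  const out: TableRow = {};
  for (const c of tableCols) {
    const idx = dfCols.indexOf(c);
    if (idx < 0) throw new Error(`DataFrame has no column named ${c}`);
    out[c] = values[idx];
  }
  return out;
}

const tableCols = ["id", "name"]; // table schema order
const dfCols = ["name", "id"]; // same columns, reordered in the DataFrame
const dfRow = ["alice", 7];

console.log(JSON.stringify(insertByPosition(tableCols, dfRow))); // {"id":"alice","name":7}
console.log(JSON.stringify(insertByName(tableCols, dfCols, dfRow))); // {"id":7,"name":"alice"}
```

Selecting the columns in table order before calling insertInto avoids the mismatch.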

For table formats that implement Spark’s DataSourceV2 API, such as Iceberg and Delta, you can use df.writeTo(table):

await df.writeTo("analytics.events")
  .using("iceberg")
  .tableProperty("write.format.default", "parquet")
  .partitionedBy(col("year"), col("month"))
  .createOrReplace();
await df.writeTo("analytics.events").append();
await df.writeTo("analytics.events").overwrite(col("year").eq(lit(2025)));
await df.writeTo("analytics.events").overwritePartitions();

The v2 writer’s createOrReplace is atomic only when the target catalog supports atomic (staged) table replacement, as the Iceberg and Delta catalogs do. On other catalogs Spark drops and recreates the table, so the replace is not atomic.
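The two overwrite calls shown above behave differently. Per Spark’s DataFrameWriterV2 documentation, overwrite(condition) replaces the rows that match a filter, while overwritePartitions() replaces every partition for which the DataFrame contains at least one row (a dynamic partition overwrite). The in-memory model below is a hypothetical sketch of those semantics, with a table as an array of rows partitioned by `year`.

```typescript
// Hypothetical in-memory model of two DataFrameWriterV2 overwrite modes.

interface PartRow {
  year: number; // partition column
  id: number;
}

// overwrite(condition): drop existing rows matching the condition, then add new rows.
function overwriteWhere(table: PartRow[], incoming: PartRow[], condition: (r: PartRow) => boolean): PartRow[] {
  return table.filter((r) => !condition(r)).concat(incoming);
}

// overwritePartitions(): replace each partition that appears in the incoming rows;
// partitions absent from the incoming rows are left untouched.
function overwritePartitions(table: PartRow[], incoming: PartRow[], key: (r: PartRow) => number): PartRow[] {
  const touched: { [k: number]: boolean } = {};
  for (const r of incoming) touched[key(r)] = true;
  return table.filter((r) => !touched[key(r)]).concat(incoming);
}

const existing: PartRow[] = [
  { year: 2024, id: 1 },
  { year: 2025, id: 2 },
  { year: 2026, id: 3 },
];
const incoming: PartRow[] = [{ year: 2025, id: 4 }];

const byCondition = overwriteWhere(existing, incoming, (r) => r.year >= 2025);
const byPartition = overwritePartitions(existing, incoming, (r) => r.year);
console.log(JSON.stringify(byCondition)); // [{"year":2024,"id":1},{"year":2025,"id":4}]
console.log(JSON.stringify(byPartition)); // [{"year":2024,"id":1},{"year":2026,"id":3},{"year":2025,"id":4}]
```

The condition-based overwrite removes the 2026 partition even though the new data has no 2026 rows; the dynamic overwrite only touches the 2025 partition.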

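The following end-to-end example mirrors examples/node-read-write: it writes a small dataset as CSV, JSON, and Parquet, then reads the CSV back.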

import { connect, col } from "@spark-connect-js/node";

const spark = connect("sc://localhost:15002");
const tmp = "/tmp/io-example";

// A small in-memory dataset built with SQL.
const sensors = spark.sql(`
  SELECT * FROM VALUES
    (1, 'temp', 23.4, '2025-06-01'),
    (2, 'humid', 55.1, '2025-06-01'),
    (3, 'temp', 24.8, '2025-06-01')
  AS readings(id, kind, value, date)
`);

// Write the same data as CSV, JSON, and Parquet.
await sensors.write.mode("overwrite").option("header", "true").csv(`${tmp}/csv`);
await sensors.write.mode("overwrite").json(`${tmp}/json`);
await sensors.write.mode("overwrite").parquet(`${tmp}/parquet`);

// Read the CSV back with an explicit schema and collect the rows to the client.
const roundTrip = await spark.read
  .schema("id INT, kind STRING, value DOUBLE, date STRING")
  .option("header", "true")
  .csv(`${tmp}/csv`)
  .sort(col("id").asc())
  .collect();
console.table(roundTrip);

// Close the session.
await spark.stop();