| name | runners |
| description | Guides understanding and working with Apache Beam runners (Direct, Dataflow, Flink, Spark, etc.). Use when configuring pipelines for different execution environments or debugging runner-specific issues. |
Apache Beam Runners
Overview
Runners execute Beam pipelines on distributed processing backends. Each runner translates the portable Beam model to its native execution engine.
Available Runners
| Runner | Location | Description |
|---|---|---|
| Direct | runners/direct-java/ | Local execution for testing |
| Prism | runners/prism/ | Portable local runner |
| Dataflow | runners/google-cloud-dataflow-java/ | Google Cloud Dataflow |
| Flink | runners/flink/ | Apache Flink |
| Spark | runners/spark/ | Apache Spark |
| Jet | runners/jet/ | Hazelcast Jet |
| Twister2 | runners/twister2/ | Twister2 |
Direct Runner
For local development and testing.
Java
PipelineOptions options = PipelineOptionsFactory.create();
options.setRunner(DirectRunner.class);
Pipeline p = Pipeline.create(options);
Python
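import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
options = PipelineOptions()
options.view_as(StandardOptions).runner = 'DirectRunner'
p = beam.Pipeline(options=options)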
Command Line
--runner=DirectRunner
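The command-line form can be handled in a launcher script that falls back to the Direct Runner when no runner is given. The following is a minimal sketch using only the standard library; `select_runner` is a hypothetical helper for illustration, not part of Beam.

```python
import argparse


def select_runner(argv, default="DirectRunner"):
    """Split a flag list into (runner name, all other flags).

    Falls back to `default` when --runner is absent.
    """
    # allow_abbrev=False keeps unrelated flags from matching --runner by prefix.
    parser = argparse.ArgumentParser(add_help=False, allow_abbrev=False)
    parser.add_argument("--runner", default=default)
    known, remaining = parser.parse_known_args(argv)
    return known.runner, remaining


if __name__ == "__main__":
    runner, rest = select_runner(["--runner=FlinkRunner", "--flink_master=host:port"])
    print(runner, rest)
```

The remaining flags can be passed to `PipelineOptions` together with the selected runner.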
Dataflow Runner
Prerequisites
- GCP project with Dataflow API enabled
- Service account with Dataflow Admin role
- GCS bucket for staging
Java Usage
DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
options.setRunner(DataflowRunner.class);
options.setProject("my-project");
options.setRegion("us-central1");
options.setTempLocation("gs://my-bucket/temp");
Python Usage
options = PipelineOptions([
    '--runner=DataflowRunner',
    '--project=my-project',
    '--region=us-central1',
    '--temp_location=gs://my-bucket/temp'
])
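The flag list above can be assembled by a small function that also checks the temp location early, since Dataflow expects a GCS path there. This is a sketch under that assumption; `dataflow_args` is a hypothetical helper, and the project, region, and bucket names are the placeholders already used in this section.

```python
def dataflow_args(project, region, temp_location):
    """Build the Dataflow flag list, rejecting a non-GCS temp location."""
    if not temp_location.startswith("gs://"):
        raise ValueError(
            f"temp_location must be a gs:// path, got {temp_location!r}"
        )
    return [
        "--runner=DataflowRunner",
        f"--project={project}",
        f"--region={region}",
        f"--temp_location={temp_location}",
    ]


if __name__ == "__main__":
    for flag in dataflow_args("my-project", "us-central1", "gs://my-bucket/temp"):
        print(flag)
```

The returned list is in the same shape that `PipelineOptions([...])` takes in the usage above.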
Runner v2
--experiments=use_runner_v2
Custom SDK Container
--sdkContainerImage=gcr.io/project/beam_java11_sdk:custom
Flink Runner
Embedded Mode
FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
options.setRunner(FlinkRunner.class);
options.setFlinkMaster("[local]");
Cluster Mode
options.setFlinkMaster("host:port");
Portable Mode (Python)
options = PipelineOptions([
    '--runner=FlinkRunner',
    '--flink_master=host:port',
    '--environment_type=LOOPBACK'
])
Spark Runner
Java
SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
options.setRunner(SparkRunner.class);
options.setSparkMaster("local[*]"); // or spark:
Python (Portable)
options = PipelineOptions([
    '--runner=SparkRunner',
    '--spark_master_url=local[*]'
])
Testing with Runners
ValidatesRunner Tests
Tests that validate runner correctness:
./gradlew :runners:direct-java:validatesRunner
./gradlew :runners:flink:1.18:validatesRunner
./gradlew :runners:spark:3:validatesRunner
./gradlew :runners:google-cloud-dataflow-java:validatesRunner
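The task paths above follow one pattern: the runner's project name, an optional engine version segment (as with Flink and Spark), then the task name. A small sketch of that pattern, with `validates_runner_task` as a hypothetical helper:

```python
def validates_runner_task(runner, version=None):
    """Compose a Gradle task path such as :runners:flink:1.18:validatesRunner."""
    parts = ["", "runners", runner]  # leading "" yields the initial colon
    if version:
        parts.append(version)
    parts.append("validatesRunner")
    return ":".join(parts)


if __name__ == "__main__":
    print(validates_runner_task("flink", "1.18"))
```

Which version segments exist depends on the checkout, so the output is only as valid as the runner and version passed in.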
TestPipeline with Runners
@Rule public TestPipeline pipeline = TestPipeline.create();
-DbeamTestPipelineOptions='["--runner=TestDataflowRunner"]'
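The value of `beamTestPipelineOptions` is a JSON array of flag strings, so it is easy to get the quoting wrong by hand. A minimal sketch that generates the property from a list of flags; `beam_test_pipeline_options` is a hypothetical helper name:

```python
import json


def beam_test_pipeline_options(flags):
    """Render flags as the -DbeamTestPipelineOptions JVM property."""
    return "-DbeamTestPipelineOptions=" + json.dumps(list(flags))


if __name__ == "__main__":
    print(beam_test_pipeline_options(["--runner=TestDataflowRunner"]))
```

When the result is pasted into a shell command, the JSON part still needs single quotes around it, as in the line above.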
Portable Runners
Concept
- Pipelines execute independently of the SDK language via the Fn API
- The SDK harness runs in its own environment (typically a container) and communicates with the runner over gRPC
Environment Types
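- DOCKER - SDK harness runs in a Docker container
- LOOPBACK - SDK harness runs in the process that submitted the pipeline (for testing)
- EXTERNAL - SDK harness is provided by a service at a specified address
- PROCESS - SDK harness runs as a subprocess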
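The environment types can be turned into pipeline flags as sketched below. This assumes the Python SDK's `--environment_type` and `--environment_config` options (newer SDK versions may prefer a different option for the config part); `environment_flags`, the address, and the boot path are hypothetical.

```python
import json

ENVIRONMENT_TYPES = {"DOCKER", "LOOPBACK", "EXTERNAL", "PROCESS"}


def environment_flags(env_type, config=None):
    """Build environment flags; dict configs are serialized as JSON."""
    env_type = env_type.upper()
    if env_type not in ENVIRONMENT_TYPES:
        raise ValueError(f"unknown environment type: {env_type}")
    flags = [f"--environment_type={env_type}"]
    if config is not None:
        value = config if isinstance(config, str) else json.dumps(config)
        flags.append(f"--environment_config={value}")
    return flags


if __name__ == "__main__":
    print(environment_flags("LOOPBACK"))
    # Hypothetical address of an externally managed SDK harness service.
    print(environment_flags("EXTERNAL", "localhost:50000"))
    # Hypothetical path to an SDK harness boot executable.
    print(environment_flags("PROCESS", {"command": "/path/to/boot"}))
```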
Job Server
Start Flink job server:
./gradlew :runners:flink:1.18:job-server:runShadow
Start Spark job server:
./gradlew :runners:spark:3:job-server:runShadow
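Once a job server is running, a Python pipeline can be pointed at it with the portable runner. The sketch below builds those flags; it assumes the `--runner=PortableRunner` and `--job_endpoint` options of the Python SDK and a job server listening on `localhost:8099`, which should be checked against the job server's own startup output. `portable_args` is a hypothetical helper.

```python
def portable_args(job_endpoint="localhost:8099", environment_type="LOOPBACK"):
    """Build flags for submitting to a running job server."""
    host, sep, port = job_endpoint.rpartition(":")
    if not sep or not host or not port.isdigit():
        raise ValueError(f"job_endpoint must look like host:port, got {job_endpoint!r}")
    return [
        "--runner=PortableRunner",
        f"--job_endpoint={job_endpoint}",
        f"--environment_type={environment_type}",
    ]


if __name__ == "__main__":
    print(portable_args())
```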
Runner-Specific Options
Dataflow
| Option | Description |
|---|---|
| --project | GCP project |
| --region | GCP region |
| --tempLocation | GCS temp location |
| --stagingLocation | GCS staging location |
| --numWorkers | Initial number of workers |
| --maxNumWorkers | Maximum number of workers |
| --workerMachineType | Worker VM machine type |
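The table lists the Java camelCase spellings, while the Python usage earlier in this document uses snake_case (for example `--temp_location`). The conversion can be sketched as a naming heuristic; `to_python_flag` is a hypothetical helper, and its output should be checked against the Python SDK because not every option follows the pattern (the Spark examples here pair `setSparkMaster` with `--spark_master_url`).

```python
import re


def to_python_flag(java_flag):
    """Convert a camelCase flag name (no =value part) to snake_case."""
    name = java_flag.lstrip("-")
    # Insert an underscore before each capital that follows a lowercase letter or digit.
    snake = re.sub(
        r"(?<=[a-z0-9])([A-Z])",
        lambda m: "_" + m.group(1).lower(),
        name,
    )
    return "--" + snake


if __name__ == "__main__":
    for flag in ["--tempLocation", "--maxNumWorkers", "--workerMachineType"]:
        print(flag, "->", to_python_flag(flag))
```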
Flink
| Option | Description |
|---|---|
| --flinkMaster | Flink master address |
| --parallelism | Default parallelism |
| --checkpointingInterval | Checkpoint interval |
Spark
| Option | Description |
|---|---|
| --sparkMaster | Spark master URL |
| --sparkConf | Additional Spark config |
Building Runner Artifacts
Dataflow Worker Jar
./gradlew :runners:google-cloud-dataflow-java:worker:shadowJar
Flink Job Server
./gradlew :runners:flink:1.18:job-server:shadowJar
Spark Job Server
./gradlew :runners:spark:3:job-server:shadowJar
Debugging
Direct Runner
- Enable debug logging: -Dorg.slf4j.simpleLogger.defaultLogLevel=debug
- Use --targetParallelism=1 to minimize concurrency, which makes runs easier to reproduce
Dataflow
- Check Dataflow UI: console.cloud.google.com/dataflow
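- Use --experiments=upload_graph if job submission fails because the job graph is too large; the graph is then uploaded to GCS instead of being sent inline
- Read worker logs in Cloud Logging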
Portable Runners
- Enable debug logging on job server
- Check SDK harness logs in worker containers