beam-concepts
Explains core Apache Beam programming model concepts including PCollections, PTransforms, Pipelines, and Runners. Use when learning Beam fundamentals or explaining pipeline concepts.
Other Apache Beam skills:
- Guides YAML SDK development in Apache Beam, including environment setup, testing, and key concepts. Use when working with Beam YAML code in sdks/python/apache_beam/yaml/.
- Guide on how to add and propagate new metadata fields in Apache Beam's WindowedValue, extending protos, windmill persistence, and runner interfaces to avoid metadata loss.
- Guides understanding and working with Apache Beam runners (Direct, Dataflow, Flink, Spark, etc.). Use when configuring pipelines for different execution environments or debugging runner-specific issues.
- Rewrite Apache Beam DoFn methods (@ProcessElement, @OnTimer, @OnWindowExpiration) to remove legacy ProcessContext or OnTimerContext usage. Use this skill when you encounter DoFn methods that use context.element(), context.output(), etc., and need to modernize them using parameter injection (@Element, @Timestamp, @Pane, OutputReceiver, MultiOutputReceiver).
- Guides Python SDK development in Apache Beam, including environment setup, testing, building, and running pipelines. Use when working with Python code in sdks/python/.
- Guides understanding and using the Gradle build system in Apache Beam. Use when building projects, understanding dependencies, or troubleshooting build issues.
| Field | Value |
|---|---|
| name | beam-concepts |
| description | Explains core Apache Beam programming model concepts including PCollections, PTransforms, Pipelines, and Runners. Use when learning Beam fundamentals or explaining pipeline concepts. |
Apache Beam evolved from Google's MapReduce, FlumeJava, and MillWheel projects. Its programming model was originally called the "Dataflow Model."
A Pipeline encapsulates the entire data processing task, including reading, transforming, and writing data.
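```java
// Java
Pipeline p = Pipeline.create(options);
p.apply(...)
 .apply(...)
 .apply(...);
p.run().waitUntilFinish();
```

```python
# Python
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions()
with beam.Pipeline(options=options) as p:  # the pipeline runs when the block exits
    (p | 'Read' >> beam.io.ReadFromText('input.txt')
       | 'Transform' >> beam.Map(process)  # `process` is your per-element function
       | 'Write' >> beam.io.WriteToText('output'))
```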
A PCollection is a distributed dataset that can be bounded (batch) or unbounded (streaming).
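A PTransform is a data processing operation. It takes one or more PCollections as input, applies processing logic, and produces zero or more PCollections as output.

```java
// Java
PCollection<String> output = input.apply(MyTransform.create());
```

```python
# Python
output = input | 'Name' >> beam.ParDo(MyDoFn())
```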
ParDo is the general-purpose parallel processing transform. It calls a DoFn on each element of the input PCollection, and each call may emit zero, one, or many output elements.

```java
// Java
input.apply(ParDo.of(new DoFn<String, Integer>() {
    @ProcessElement
    public void processElement(@Element String element, OutputReceiver<Integer> out) {
        out.output(element.length());
    }
}));
```

```python
# Python
class LengthFn(beam.DoFn):
    def process(self, element):
        yield len(element)

input | beam.ParDo(LengthFn())

# Or simpler:
input | beam.Map(len)
```
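GroupByKey groups the values of a key/value PCollection by key, producing one (key, iterable of values) pair per key (and per window).

```java
PCollection<KV<String, Integer>> input = ...;
PCollection<KV<String, Iterable<Integer>>> grouped = input.apply(GroupByKey.create());
```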
CoGroupByKey joins two or more key/value PCollections that share a key type, collecting the values for each key from every input.
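Combine reduces elements to a single value, either across a whole PCollection or per key (for example, sum or mean). The combining function must be associative and commutative, so that partial results can be computed in parallel and then merged.

```java
// Global combine: one sum over a PCollection<Integer>
input.apply(Combine.globally(Sum.ofIntegers()));

// Per-key combine: one sum per key of a PCollection<KV<K, Integer>>
input.apply(Combine.perKey(Sum.ofIntegers()));
```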
Flatten merges multiple PCollections of the same element type into a single PCollection.

```java
PCollectionList<String> collections = PCollectionList.of(pc1).and(pc2).and(pc3);
PCollection<String> merged = collections.apply(Flatten.pCollections());
```
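Partition splits a PCollection into a fixed number of PCollections, using a function that returns a partition index for each element. The number of partitions must be known when the pipeline is constructed.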
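Windowing subdivides a PCollection into windows according to element timestamps. Grouping and combining then happen per window. For example, to use 5-minute fixed windows:

```java
input.apply(Window.into(FixedWindows.of(Duration.standardMinutes(5))));
```

```python
input | beam.WindowInto(beam.window.FixedWindows(300))  # window size in seconds
```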
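Triggers control when the results (panes) for each window are emitted. The configuration below does four things:
- It fires when the watermark passes the end of the window.
- It adds early firings one minute of processing time after the first element in a pane.
- It accepts data up to one hour late.
- It accumulates results across panes.

```java
input.apply(Window.<T>into(FixedWindows.of(Duration.standardMinutes(5)))
    .triggering(AfterWatermark.pastEndOfWindow()
        .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
            .plusDelayOf(Duration.standardMinutes(1))))
    .withAllowedLateness(Duration.standardHours(1))
    .accumulatingFiredPanes());
```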
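Side inputs are additional inputs to a ParDo. Each DoFn invocation can read a view (for example, a map) computed from another PCollection.

```java
// lookupTable is a PCollection<KV<String, String>>
PCollectionView<Map<String, String>> sideInput =
    lookupTable.apply(View.asMap());

mainInput.apply(ParDo.of(new DoFn<String, String>() {
    @ProcessElement
    public void processElement(ProcessContext c) {
        Map<String, String> lookup = c.sideInput(sideInput);
        // Use lookup...
    }
}).withSideInputs(sideInput));
```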
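PipelineOptions configure pipeline execution, such as the runner to use and any custom parameters your pipeline defines.

```java
public interface MyOptions extends PipelineOptions {
    @Description("Input file")
    @Validation.Required
    String getInput();
    void setInput(String value);
}

// withValidation() makes the factory enforce @Validation.Required.
MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(MyOptions.class);
```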
Schemas give Beam a structured, strongly-typed view of elements, so transforms can reference fields by name and convert between user types and generic Row objects.

```java
@DefaultSchema(AutoValueSchema.class)
@AutoValue
public abstract class User {
    public abstract String getName();
    public abstract int getAge();
}

PCollection<User> users = ...;
PCollection<Row> rows = users.apply(Convert.toRows());
```
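For error handling, a common pattern is a dead-letter output. Elements that fail processing are routed to a separate output instead of failing the pipeline.

```java
// Anonymous subclasses let Beam capture each tag's element type.
final TupleTag<String> successTag = new TupleTag<String>() {};
final TupleTag<String> failureTag = new TupleTag<String>() {};

PCollectionTuple results = input.apply(ParDo.of(new DoFn<String, String>() {
    @ProcessElement
    public void processElement(@Element String element, MultiOutputReceiver out) {
        try {
            out.get(successTag).output(process(element));  // process(): your logic
        } catch (Exception e) {
            // Send the failing input to the dead-letter output.
            out.get(failureTag).output(element);
        }
    }
}).withOutputTags(successTag, TupleTagList.of(failureTag)));

results.get(successTag).apply(WriteToSuccess());
results.get(failureTag).apply(WriteToDeadLetter());
```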
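Cross-language transforms let a pipeline use transforms implemented in another SDK. For example, Python can use the Java Kafka connector. By default, the Python SDK starts a Java expansion service for this connector, so a Java runtime must be available.

```python
# Use the Java Kafka connector from Python
from apache_beam.io.kafka import ReadFromKafka

result = pipeline | ReadFromKafka(
    consumer_config={'bootstrap.servers': 'localhost:9092'},
    topics=['my-topic'])
```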