Python multi-language pipelines quickstart
This page provides a high-level overview of creating multi-language pipelines with the Apache Beam SDK for Python. For a more comprehensive treatment of the topic, see Multi-language pipelines.
The code shown in this quickstart is available in a collection of runnable examples.
To build and run a multi-language Python pipeline, you need a Python environment with the Beam SDK installed. If you don’t have an environment set up, first complete the Apache Beam Python SDK Quickstart.
A multi-language pipeline is a pipeline that’s built in one Beam SDK language and uses one or more transforms from another Beam SDK language. These “other-language” transforms are called cross-language transforms. The idea is to make pipeline components easier to share across the Beam SDKs, and to grow the pool of available transforms for all the SDKs. In the examples below, the multi-language pipeline is built with the Beam Python SDK, and the cross-language transforms are built with the Beam Java SDK.
Create a cross-language transform
Here’s a simple Java transform, JavaPrefix, that adds a prefix to an input string:
public class JavaPrefix extends PTransform<PCollection<String>, PCollection<String>> {

  final String prefix;

  public JavaPrefix(String prefix) {
    this.prefix = prefix;
  }

  class AddPrefixDoFn extends DoFn<String, String> {
    @ProcessElement
    public void process(@Element String input, OutputReceiver<String> o) {
      o.output(prefix + input);
    }
  }

  @Override
  public PCollection<String> expand(PCollection<String> input) {
    return input
        .apply(
            "AddPrefix",
            ParDo.of(new AddPrefixDoFn()));
  }
}
To make this available as a cross-language transform, you have to add a config object and a builder.
Note: Starting with Beam 2.34.0, Python SDK users can use some Java transforms without writing additional Java code. To learn more, see Creating cross-language Java transforms.
The config object is a simple Java object (POJO) that has fields required by the transform. Here’s an example, JavaPrefixConfiguration:
public class JavaPrefixConfiguration {

  String prefix;

  public void setPrefix(String prefix) {
    this.prefix = prefix;
  }
}
The builder class, implemented below as JavaPrefixBuilder, must implement ExternalTransformBuilder and override buildExternal, which uses the config object.
public class JavaPrefixBuilder implements
    ExternalTransformBuilder<JavaPrefixConfiguration, PCollection<String>, PCollection<String>> {

  @Override
  public PTransform<PCollection<String>, PCollection<String>> buildExternal(
      JavaPrefixConfiguration configuration) {
    return new JavaPrefix(configuration.prefix);
  }
}
You also need to add a registrar class to register your transform with the expansion service.
@AutoService(ExternalTransformRegistrar.class)
public class JavaPrefixRegistrar implements ExternalTransformRegistrar {

  final String URN = "beam:transform:my.beam.test:javaprefix:v1";

  @Override
  public Map<String, ExternalTransformBuilder<?, ?, ?>> knownBuilderInstances() {
    return ImmutableMap.of(URN, new JavaPrefixBuilder());
  }
}
As shown here in JavaPrefixRegistrar, the registrar must implement ExternalTransformRegistrar, which has one method, knownBuilderInstances. This method returns a map from a unique URN to an instance of your builder. You can use the AutoService annotation to register this class with the expansion service.
Choose an expansion service
When building a job for a multi-language pipeline, Beam uses an expansion service to expand composite transforms. You must have at least one expansion service per remote SDK.
In most cases, you can use the default Java ExpansionService. The service takes a single parameter, which specifies the port of the expansion service. The address is then provided by the Python pipeline.
Before running your multi-language pipeline, you need to build the Java cross-language transform and start the expansion service. When you start the expansion service, you need to add dependencies to the classpath. You can use more than one JAR, but it’s often easier to create a single shaded JAR. Both Python and Java dependencies will be staged for the runner by the Python SDK.
The steps for running the expansion service will vary depending on your build tooling. Assuming you’ve built a JAR named java-prefix-bundled-0.1.jar, you can start the service with a command like the following, where 12345 is the port on which the expansion service will run:
java -jar java-prefix-bundled-0.1.jar 12345
For instructions on running an example expansion service, see this README.
Create a Python pipeline
Your Python pipeline can now use the ExternalTransform API to configure your cross-language transform. Here’s an example from addprefix.py:
with beam.Pipeline(options=pipeline_options) as p:
  input = p | 'Read' >> ReadFromText(input_path).with_output_types(str)

  java_output = (
      input
      | 'JavaPrefix' >> beam.ExternalTransform(
          'beam:transform:my.beam.test:javaprefix:v1',
          ImplicitSchemaPayloadBuilder({'prefix': 'java:'}),
          "localhost:12345"))

  def python_prefix(record):
    return 'python:%s' % record

  output = java_output | 'PythonPrefix' >> beam.Map(python_prefix)
  output | 'Write' >> WriteToText(output_path)
ExternalTransform takes three parameters:
- The URN for the cross-languague transform
- The payload, either as a byte string or a PayloadBuilder
- An expansion service
The URN is simply a unique Beam identifier for the transform, and the expansion service has already been discussed. The PayloadBuilder is a new concept, discussed next.
NOTE: To ensure that your URN doesn’t run into conflicts with URNs from other transforms, follow the URN conventions described at Selecting a URN for Cross-language Transforms.
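As a sketch of that convention, a small helper (hypothetical, not part of the Beam API) might compose a URN of the form `beam:transform:<namespace>:<name>:v<version>`, matching the URN registered in JavaPrefixRegistrar above:

```python
# Hypothetical helper illustrating the URN convention; the namespace and
# transform name below come from the registrar example in this quickstart.
def make_transform_urn(namespace: str, name: str, version: int) -> str:
    """Compose a cross-language transform URN."""
    return f"beam:transform:{namespace}:{name.lower()}:v{version}"

JAVA_PREFIX_URN = make_transform_urn("my.beam.test", "JavaPrefix", 1)
# JAVA_PREFIX_URN == "beam:transform:my.beam.test:javaprefix:v1"
```

Keeping the URN in one place like this makes it easy to pass the same identifier to both the registrar and ExternalTransform.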
Provide a payload builder
The Python pipeline example above provides an ImplicitSchemaPayloadBuilder as the second argument to ExternalTransform. The ImplicitSchemaPayloadBuilder builds a payload that generates a schema from the provided values. In this case, the provided values are contained in the following key-value pair: {'prefix': 'java:'}. The JavaPrefix transform expects a prefix argument, and the payload builder passes in the string java:, which will be prepended to each input element.
Payload builders help build the payload for the transform in the expansion request. Instead of the ImplicitSchemaPayloadBuilder, you could use a NamedTupleBasedPayloadBuilder, which builds a payload based on a named tuple schema, or an AnnotationBasedPayloadBuilder, which builds a schema based on type annotations. For a complete list of available payload builders, see the transforms.external API reference.
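To illustrate the named-tuple approach, the schema would mirror the fields of the Java config object (here, JavaPrefixConfiguration's single prefix field). This sketch defines only the schema type; the commented line shows how it might be handed to NamedTupleBasedPayloadBuilder when Beam is installed:

```python
import typing

# A named tuple mirroring the fields of JavaPrefixConfiguration.
# With Beam installed, an instance could be passed to the builder, e.g.:
#   NamedTupleBasedPayloadBuilder(JavaPrefixSchema(prefix='java:'))
class JavaPrefixSchema(typing.NamedTuple):
    prefix: str

payload_values = JavaPrefixSchema(prefix='java:')
```

The field names and types of the named tuple determine the schema of the expansion-request payload, so they must line up with what the Java builder expects.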
Use standard element types
At a multi-language boundary, you have to use element types that all the Beam SDKs understand. These are types represented by the Beam standard coders:
- BYTES
- STRING_UTF8
- KV
- BOOL
- VARINT
- DOUBLE
- ITERABLE
- TIMER
- WINDOWED_VALUE
- ROW
For arbitrary structured types (for example, an arbitrary Java object), use ROW (PCollection<Row>). You may have to develop a new Java composite transform that produces a PCollection<Row>. You can use SDK-specific coders within a composite cross-language transform, as long as these coders aren’t used by PCollections that are consumed by the other SDKs.
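As an illustration of preparing data for such a boundary, the hypothetical helper below (not a Beam API) flattens an arbitrary Python object into fields whose types map onto the standard coders; a dict like this could then be used to construct row-typed elements:

```python
# Hypothetical sketch: flatten an object's attributes into values whose
# types map onto Beam standard coders (str, int, float, bool, bytes).
# Anything else is stringified as a crude fallback.
def to_standard_fields(obj) -> dict:
    allowed = (str, int, float, bool, bytes)
    return {
        name: value if isinstance(value, allowed) else str(value)
        for name, value in vars(obj).items()
    }

class Order:
    def __init__(self, sku, qty):
        self.sku = sku
        self.qty = qty

row_fields = to_standard_fields(Order("abc-123", 4))
# row_fields == {"sku": "abc-123", "qty": 4}
```

In a real pipeline you would typically define an explicit schema rather than derive one reflectively, but the principle is the same: only standard-coder-compatible types should cross the language boundary.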
Run the pipeline
The exact commands for running the Python pipeline will vary based on your environment. Assuming that your pipeline is coded in a file named addprefix.py, the steps should be similar to those below. For more information, see the comments in addprefix.py.
Run with direct runner
In the following command, input1 is a file containing lines of text:
python addprefix.py --runner DirectRunner --environment_type=DOCKER --input input1 --output output
Run with Dataflow runner
The following script runs the multi-language pipeline on Dataflow, using example text from a Cloud Storage bucket. You’ll need to adapt the script to your environment.
#!/bin/bash
export GCP_PROJECT=<project>
export GCS_BUCKET=<bucket>
export TEMP_LOCATION=gs://$GCS_BUCKET/tmp
export GCP_REGION=<region>
export JOB_NAME="javaprefix-`date +%Y%m%d-%H%M%S`"
export NUM_WORKERS="1"

# other commands, e.g. changing into the appropriate directory

gsutil rm gs://$GCS_BUCKET/javaprefix/*

python addprefix.py \
  --runner DataflowRunner \
  --temp_location $TEMP_LOCATION \
  --project $GCP_PROJECT \
  --region $GCP_REGION \
  --job_name $JOB_NAME \
  --num_workers $NUM_WORKERS \
  --input "gs://dataflow-samples/shakespeare/kinglear.txt" \
  --output "gs://$GCS_BUCKET/javaprefix/output"