Create Pipelines using your own Apache Flink jobs

A pipeline is a set of data processing instructions written in SQL or expressed as an Apache Flink job. Pipelines created from a Flink job are called custom pipelines. When you create a custom pipeline, you define how your data should be processed in a JVM-based programming language of your choice, such as Java or Scala. For example, you can create a pipeline that enriches incoming data by invoking an API from within the pipeline and sends the enriched data to a destination of your choosing.

If you are a developer with a use case where SQL is too inflexible, or if you have an existing Flink workload that you would like to migrate to Decodable, create a custom pipeline. Once created, you can upload the pipeline to Decodable as a JAR file, where it can be managed alongside any SQL-based pipelines that you have.

This feature is currently available as a Tech Preview. If you would like early access to this feature, contact us or request access in Decodable Web on the Pipelines page.
Role-Based Access Control is not currently supported for custom pipelines. To access Decodable streams in a custom pipeline, use the Decodable Pipeline SDK as shown in this example.
Custom pipelines support the following Flink versions:

  • 1.18 (Java 11)

  • 1.16 (Java 11)

Create and upload a custom pipeline

Perform the following steps to create a Flink job and upload it as a custom pipeline in Decodable.

Prerequisites

Before creating a Flink job, confirm that you have the following installed on your machine:

  • Apache Maven

  • Flink

  • IntelliJ or any other IDE of your choosing

    • We recommend using IntelliJ since it supports Maven projects out-of-the-box.

    • To run applications within IntelliJ, make sure that the Include dependencies with "Provided" scope setting is enabled in the run configuration.

This image shows where to find the Include dependencies with "Provided" scope setting in IntelliJ.
  • Java 8 or 11

Steps

Perform the following steps to create a Flink job that can be uploaded as a custom Decodable pipeline.

  1. Use Apache Maven to initialize a new Java application.

    $ mvn archetype:generate \
        -DarchetypeGroupId=org.apache.flink \
        -DarchetypeArtifactId=flink-quickstart-java \
        -DarchetypeVersion=1.18.1 \
        -DgroupId=org.example \
        -DartifactId=decodable-custom-pipeline \
        -Dversion=0.1 \
        -Dpackage=quickstart \
        -DinteractiveMode=false
  2. Navigate to the decodable-custom-pipeline directory. There you will find a pom.xml file with dependency definitions and a src/ directory.

  3. Import the decodable-custom-pipeline directory into IntelliJ or an IDE of your choosing.

  4. Start developing your Flink job using the Flink API.

    1. Optionally, use the Decodable Pipeline SDK for accessing Decodable from within your job.
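
    For example, a minimal DataStream job might look like the following sketch. The DataStreamJob class name matches the class generated by the flink-quickstart-java archetype; the pipeline logic shown is purely illustrative.

      package quickstart;

      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

      public class DataStreamJob {
          public static void main(String[] args) throws Exception {
              StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

              // Purely illustrative logic: uppercase a few test elements and print them
              env.fromElements("hello", "decodable")
                 .map(String::toUpperCase)
                 .print();

              env.execute("decodable-custom-pipeline");
          }
      }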

  5. When you are finished developing your Flink job, package the job into a JAR file in order to upload it to Decodable.

    $ pwd
    /private/tmp/decodable-custom-pipeline
    $ mvn clean package
  6. Make sure the target/ directory now contains a file called decodable-custom-pipeline-0.1.jar.

     $ ls target
     classes                                    maven-archiver
     decodable-custom-pipeline-0.1.jar          maven-status
     generated-sources
  7. Upload the JAR file to Decodable.

    • Using Decodable Web:

      • Navigate to the Pipelines page.

      • Select the dropdown icon next to New Pipeline, and then select Upload Custom Pipeline. Follow the prompts on the page to upload the pipeline.

    • Using the Decodable CLI:

      • Run the following command.

        $ decodable pipeline create --name <some name> --job-file target/decodable-custom-pipeline-0.1.jar

        The arguments are defined as follows:

        • --name: The name that you want to assign to the pipeline.
        • --job-file: The path to the JAR file containing the Flink job that you want to upload.
        • --job-arguments: Optional job arguments for the custom pipeline.
        • --entry-class: Optional entry class of the custom pipeline. If not provided, the entry class must be specified in the META-INF/MANIFEST.MF file in the pipeline’s JAR file, using the Main-Class property key.
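
        For example, assuming the JAR built above, a pipeline name of my-custom-pipeline, and the hypothetical entry class quickstart.DataStreamJob:

        $ decodable pipeline create --name my-custom-pipeline \
            --job-file target/decodable-custom-pipeline-0.1.jar \
            --entry-class quickstart.DataStreamJob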

The uploaded custom pipeline now appears on the Pipelines page, and you can activate it to start processing data!

Decodable Pipeline SDK

The Decodable Pipeline SDK is provided for accessing Decodable from within your custom pipeline job. More information and examples can be found in the GitHub repository.

The JavaDocs for the SDK are available here.

Only Flink 1.16 is currently supported by the SDK.

Monitor a custom pipeline

Once you’ve activated your custom pipeline, its Overview page shows basic information about the pipeline.

Select the Flink Web Dashboard button to open the Flink UI in a new window. You can use the Flink UI to view health and performance metrics, such as checkpoint monitoring, backpressure monitoring and general throughput information.

The Overview page for a custom pipeline. The button to open the Flink Web Dashboard appears on Decodable Web.

Update a custom pipeline

Every time you start a custom pipeline, the latest JAR file for that pipeline is used. To update an existing custom pipeline, do the following.

  1. Upload a new JAR file for the pipeline you want to update.

  2. Restart the pipeline to pick up the latest JAR file.

Secrets

If your custom pipeline interacts with external systems that require authentication, we recommend storing the credentials as Decodable Secrets. You can then reference a secret’s value from your custom pipeline and change it without rebuilding the pipeline.

Before defining your custom pipeline, you must first create the secret(s) that you want to use. The user configuring a custom pipeline must have read permissions for all referenced secrets.

When you create the custom pipeline, you must specify each secret to use:

  • From Decodable Web: use the "Secret" input when configuring a custom pipeline.

  • From the Decodable CLI: specify the id(s) of the secrets to make available as a comma-separated list via the --secrets parameter.

Within a custom pipeline implementation, the value of a managed secret can be obtained using the DecodableSecret class of the Decodable Pipeline SDK, as shown in this example:

// SqlServerSource and JsonDebeziumDeserializationSchema come from the Flink CDC
// connector for SQL Server; DecodableSecret comes from the Decodable Pipeline SDK.
SourceFunction<String> sourceFunction = SqlServerSource.<String>builder()
   .hostname("localhost")
   .port(1433)
   .database("inventory")
   .tableList("dbo.items")
   .username("my-sql-server-user")
   // Resolve the password from the Decodable secret named "my-sql-server-password"
   .password(DecodableSecret.withName("my-sql-server-password").value())
   .deserializer(new JsonDebeziumDeserializationSchema())
   .build();

You can also read secrets directly from the file system. The value of a secret is accessible as a text file under the path /opt/pipeline-secrets/<secret name>.
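
If you read a secret from the file system, you handle the I/O yourself. A minimal Java sketch, assuming a hypothetical secret named my-sql-server-password:

import java.nio.file.Files;
import java.nio.file.Path;

// Read the secret's value from its mounted file; the file name matches the secret's name.
// "my-sql-server-password" is a hypothetical secret name used for illustration.
// Note: Files.readString requires Java 11.
String password = Files.readString(Path.of("/opt/pipeline-secrets/my-sql-server-password")).trim();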

State management

When you stop a pipeline, the state of your pipeline is automatically saved and backed up using a Flink savepoint or checkpoint. When you start the pipeline again, we’ll use the state to determine where to resume data processing.

If you want to start a custom pipeline without restoring its state, use the --force flag in the CLI or the Discard State option in the UI.

Logs

For debugging and analysis purposes, you can retrieve the logs of a running custom pipeline. To do so, use either the Flink Web Dashboard or the Decodable CLI.

Flink Web Dashboard

Navigate to the custom pipeline whose logs you’d like to inspect and click on "Flink Web Dashboard". To retrieve the Flink job manager logs, choose "Job Manager" in the menu on the left-hand side, then go to the "Logs" tab. For task manager logs, choose "Task Manager", select one of the task managers, then go to the "Logs" tab.

Decodable CLI

Alternatively, job logs can be retrieved using the decodable pipeline logs <id> CLI command.

Options

The -t or --task-index flag can be used to specify the index of the job task to retrieve logs from. The default index is 0, corresponding to the JobManager. TaskManagers are available at index 1 and above (if there is more than one).

The -n or --lines flag can be used to specify the number of log lines to return.

The -f or --follow flag can be used to stream the logs until cancelled.
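
For example, combining the flags above to follow the logs of the first task manager:

$ decodable pipeline logs <id> --task-index 1 --lines 100 --follow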

Metrics

Custom pipelines expose a set of Flink metrics by default. You can find these in your account’s _metrics stream.

If you’d like to expose custom metrics beyond the defaults listed below, you can do so by registering them in a metric group named DecodableMetrics. Any metrics tagged this way will also be included in your account’s _metrics stream. See the Decodable Pipeline SDK for an example of how to do this.
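
A counter registered from within an operator might look like the following sketch, which uses the standard Flink metrics API; the operator class and metric name are illustrative, not part of any Decodable API.

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;

public class CountingMapper extends RichMapFunction<String, String> {

    private transient Counter processedRecords;

    @Override
    public void open(Configuration parameters) {
        // Metrics registered under a group named "DecodableMetrics" are also
        // forwarded to the account's _metrics stream.
        processedRecords = getRuntimeContext()
                .getMetricGroup()
                .addGroup("DecodableMetrics")
                .counter("processedRecords");
    }

    @Override
    public String map(String value) {
        processedRecords.inc();
        return value;
    }
}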

Default Flink Metrics

  • flink_jobmanager_Status_JVM_CPU_Load
  • flink_taskmanager_Status_JVM_CPU_Load
  • flink_jobmanager_Status_JVM_Memory_Heap_Used
  • flink_taskmanager_Status_JVM_Memory_Heap_Used
  • flink_jobmanager_Status_JVM_Memory_Heap_Committed
  • flink_taskmanager_Status_JVM_Memory_Heap_Committed
  • flink_jobmanager_Status_JVM_Memory_Heap_Max
  • flink_taskmanager_Status_JVM_Memory_Heap_Max
  • flink_jobmanager_Status_JVM_Memory_NonHeap_Used
  • flink_taskmanager_Status_JVM_Memory_NonHeap_Used
  • flink_jobmanager_Status_JVM_Memory_NonHeap_Committed
  • flink_taskmanager_Status_JVM_Memory_NonHeap_Committed
  • flink_jobmanager_Status_JVM_Memory_NonHeap_Max
  • flink_taskmanager_Status_JVM_Memory_NonHeap_Max
  • flink_jobmanager_Status_JVM_Memory_Metaspace_Used
  • flink_taskmanager_Status_JVM_Memory_Metaspace_Used
  • flink_jobmanager_Status_JVM_Memory_Metaspace_Committed
  • flink_taskmanager_Status_JVM_Memory_Metaspace_Committed
  • flink_jobmanager_Status_JVM_Memory_Metaspace_Max
  • flink_taskmanager_Status_JVM_Memory_Metaspace_Max
  • flink_jobmanager_Status_JVM_Memory_Direct_Count
  • flink_taskmanager_Status_JVM_Memory_Direct_Count
  • flink_jobmanager_Status_JVM_Memory_Direct_MemoryUsed
  • flink_taskmanager_Status_JVM_Memory_Direct_MemoryUsed
  • flink_jobmanager_Status_JVM_Memory_Direct_TotalCapacity
  • flink_taskmanager_Status_JVM_Memory_Direct_TotalCapacity
  • flink_jobmanager_Status_JVM_Memory_Mapped_Count
  • flink_taskmanager_Status_JVM_Memory_Mapped_Count
  • flink_jobmanager_Status_JVM_Memory_Mapped_MemoryUsed
  • flink_taskmanager_Status_JVM_Memory_Mapped_MemoryUsed
  • flink_jobmanager_Status_JVM_Memory_Mapped_TotalCapacity
  • flink_taskmanager_Status_JVM_Memory_Mapped_TotalCapacity
  • flink_jobmanager_Status_JVM_Threads_Count
  • flink_taskmanager_Status_JVM_Threads_Count
  • flink_jobmanager_job_lastCheckpointDuration
  • flink_jobmanager_job_lastCheckpointSize
  • flink_jobmanager_job_lastCheckpointFullSize
  • flink_taskmanager_job_task_backPressuredTimeMsPerSecond
  • flink_taskmanager_job_task_numRecordsIn
  • flink_taskmanager_job_task_numRecordsInPerSecond
  • flink_taskmanager_job_task_numBytesIn
  • flink_taskmanager_job_task_numBytesInPerSecond
  • flink_taskmanager_job_task_operator_numRecordsIn
  • flink_taskmanager_job_task_operator_numRecordsInErrors
  • flink_taskmanager_job_task_operator_numRecordsInPerSecond
  • flink_taskmanager_job_task_operator_numBytesIn
  • flink_taskmanager_job_task_operator_numBytesInPerSecond
  • flink_taskmanager_job_task_numRecordsOut
  • flink_taskmanager_job_task_numRecordsOutPerSecond
  • flink_taskmanager_job_task_numBytesOut
  • flink_taskmanager_job_task_numBytesOutPerSecond
  • flink_taskmanager_job_task_operator_numRecordsOut
  • flink_taskmanager_job_task_operator_numRecordsOutErrors
  • flink_taskmanager_job_task_operator_numRecordsOutPerSecond
  • flink_taskmanager_job_task_operator_numBytesOut
  • flink_taskmanager_job_task_operator_numBytesOutPerSecond
  • flink_jobmanager_job_numRestarts
  • flink_jobmanager_job_numberOfCompletedCheckpoints
  • flink_jobmanager_job_numberOfFailedCheckpoints
  • flink_jobmanager_job_numberOfInProgressCheckpoints
  • flink_jobmanager_job_totalNumberOfCheckpoints
  • flink_jobmanager_taskSlotsTotal
  • flink_jobmanager_numRegisteredTaskManagers