# Create Pipelines using your own Apache Flink jobs

A Pipeline is a set of data processing instructions that are written in SQL or expressed as an Apache Flink job. Pipelines created from a Flink job are called Custom Pipelines in Decodable. You can implement jobs in JVM-based programming languages such as Java (using Flink's Table and DataStream APIs), as well as in Python (using the PyFlink API).

Create a Custom Pipeline if you are a developer with a use case where SQL is too inflexible, or if you have an existing Flink workload that you would like to migrate to Decodable. For example, you can create a Pipeline that enriches incoming data by invoking an external REST API from within the Pipeline and sends that enriched data to a sink of your choosing.

Once you've created a Custom Pipeline, upload it to Decodable as a JAR file (for Java-based jobs) or as a ZIP file (for Python-based jobs), where it can be managed alongside any SQL-based Pipelines that you have.

Custom Pipelines must be enabled on your account. Contact Decodable support if you would like access to this feature; you can also request access in Decodable Web on the Pipelines page.

Using the CLI to create or update Custom Pipelines requires CLI version 1.15.0 or higher.

## Accessing Streams from Custom Pipelines

Streams in Decodable are written to by Pipelines and Connectors. If you are using a Java Custom Pipeline, you can access Streams using the Pipeline SDK, as shown in this example. Accessing Streams from a Python Custom Pipeline isn't currently possible; contact Decodable support if you require this feature.

Role-Based Access Control for Streams isn't currently supported for Custom Pipelines. This means that a Custom Pipeline is able to connect to all the Streams of your account.

## Supported Flink versions

The following table shows which Flink and programming language versions are currently supported in Decodable. The last column shows the identifier to use when creating Custom Pipelines via the CLI or API.

| Language version | Flink version | CLI identifier |
|------------------|---------------|----------------|
| Java 11 | 1.19 | `1.19-java11` |
| Java 11 | 1.18 | `1.18-java11` |
| Java 11 | 1.16 | `1.16-java11` |
| Python 3.11 | 1.19 | `1.19-python311` |
| Python 3.10 | 1.18 | `1.18-python310` |
| Python 3.10 | 1.16 | `1.16-python310` |

## Create a Custom Pipeline (Java)

Perform the following steps to create a Java-based Flink job and upload it as a Custom Pipeline.

### Prerequisites

Before creating a Flink job, make sure that you have the following installed on your machine:

- Apache Maven
- Flink (see Supported Flink versions for picking a version)
- Java 8 or 11
- IntelliJ or any other IDE of your choosing

We recommend using IntelliJ since it supports Maven projects out-of-the-box. In order for applications to run within IntelliJ, double-check that the Include dependencies with "Provided" scope setting has been enabled in the run configuration.

### Steps

Perform the following steps to create a Flink job that can be uploaded as a Custom Pipeline.

1. Use Apache Maven to initialize a new Java application:

   ```shell
   $ mvn archetype:generate \
     -DarchetypeGroupId=org.apache.flink \
     -DarchetypeArtifactId=flink-quickstart-java \
     -DarchetypeVersion=1.18.1 \
     -DgroupId=org.example \
     -DartifactId=decodable-custom-pipeline \
     -Dversion=0.1 \
     -Dpackage=quickstart \
     -DinteractiveMode=false
   ```

2. Navigate to the decodable-custom-pipeline directory. There you will find a pom.xml file with dependency definitions and a src/ directory.

3. Import the decodable-custom-pipeline directory into IntelliJ or an IDE of your choosing.

4. Start developing your Flink job using the Flink API. Optionally, use the Decodable Pipeline SDK for accessing Decodable resources from within your job. A minimal sketch of such a job follows below.
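For orientation, here is a minimal sketch of what a job's entry point can look like, using only standard Flink DataStream API calls. The class name, the sample data, and the job name are illustrative, not prescribed by Decodable:

```java
package quickstart;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class DataStreamJob {

    public static void main(String[] args) throws Exception {
        // Obtain the execution environment; when running on Decodable,
        // this is provided by the platform.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // A trivial pipeline: produce a few elements, transform them, print them.
        // In a real job, you would read from and write to actual sources and sinks.
        env.fromElements("hello", "decodable")
           .map(String::toUpperCase)
           .print();

        // Submit the job; this name appears in the Flink Web Dashboard.
        env.execute("my-custom-pipeline");
    }
}
```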
When you are finished developing your Flink job, package the job into a JAR file in order to upload it to Decodable:

```shell
$ pwd
/private/tmp/decodable-custom-pipeline
$ mvn clean package
```

Make sure the target/ directory now contains a file called decodable-custom-pipeline-0.1.jar:

```shell
$ ls target
classes                            maven-archiver
decodable-custom-pipeline-0.1.jar  maven-status
generated-sources
```

## Decodable Pipeline SDK

The Decodable Pipeline SDK is provided for accessing Decodable from within your Java-based Custom Pipeline jobs. More information and examples can be found in the GitHub repository. The JavaDocs for the SDK are available here.

Only Flink 1.16 is currently supported by the SDK.
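To illustrate how a job can read from and write to Decodable Streams via the SDK, here is a rough sketch. The package, class, and builder method names (`DecodableStreamSource`, `DecodableStreamSink`, `withStreamName`) are assumptions based on the SDK's published examples and may differ between SDK versions; consult the GitHub repository for the authoritative API:

```java
package quickstart;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// SDK imports; package name is an assumption based on the SDK repository.
import co.decodable.sdk.pipeline.DecodableStreamSink;
import co.decodable.sdk.pipeline.DecodableStreamSource;

public class StreamProcessingJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Read from an existing Decodable Stream ("purchase-orders" is a placeholder name).
        DecodableStreamSource<String> source = DecodableStreamSource.<String>builder()
            .withStreamName("purchase-orders")
            .build();

        DataStream<String> stream =
            env.fromSource(source, WatermarkStrategy.noWatermarks(), "purchase-orders source")
               .map(String::toUpperCase);

        // Write the transformed records to another Decodable Stream.
        DecodableStreamSink<String> sink = DecodableStreamSink.<String>builder()
            .withStreamName("purchase-orders-processed")
            .build();

        stream.sinkTo(sink);

        env.execute("purchase-order-processor");
    }
}
```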
## Create a Custom Pipeline (Python)

To implement a Python-based Flink job, use the PyFlink API. PyFlink jobs are uploaded as ZIP files to Decodable. The entry point of your job must be named main.py and be located at the root of the job ZIP file. Once you have created your ZIP file, upload it as a new Custom Pipeline as described below.

A complete example of a PyFlink job, together with a basic build setup for running it on Decodable, can be found here.

### Dependencies

To add Python dependencies to the ZIP file, declare them in a standard pip requirements.txt file and install them into a directory named python-libs:

```shell
pip install -r requirements.txt \
  --platform manylinux2014_aarch64 \
  --only-binary=:all: \
  --target=python-libs
```

Make sure to build for the AArch64 architecture as shown above, as Decodable Custom Pipelines are executed on Arm CPUs. Then add this directory alongside your main.py file to the job ZIP file:

```shell
zip -r pyflink-job.zip main.py python-libs
```

## Upload a Custom Pipeline

When your JAR or ZIP file is ready, you can upload it to Decodable and use it in a Custom Pipeline in one of the following ways:

- Using Decodable Web: Navigate to the Pipelines page, select the dropdown icon next to New Pipeline, and then select Upload Custom Pipeline. Follow the prompts on the page to upload the pipeline.
- Using the Decodable CLI apply command with a Custom Pipeline YAML definition.
- Using the Decodable CLI pipeline create command:

```shell
$ decodable pipeline create --name <some name> \
    --job-file target/decodable-custom-pipeline-0.1.jar \
    --type JAVA \
    --flink-version 1.18-java11
```

The arguments to this command are defined as follows:

| Option | Required? | Description |
|--------|-----------|-------------|
| `--name` | Required | The name that you want to assign to the Pipeline. |
| `--job-file` | Required | The path to the JAR or ZIP file containing the Flink job that you want to upload. |
| `--type` | Required | The type of the Custom Pipeline. Can be JAVA or PYTHON. |
| `--flink-version` | Required | The Flink version to use (see Supported Flink versions). |
| `--job-arguments` | Optional | Job arguments for the Custom Pipeline. |
| `--entry-class` | Optional | Entry class for a Java-based Custom Pipeline. If not provided, the entry class must be specified in the file META-INF/MANIFEST.MF in the Pipeline's JAR file, using the Main-Class property key. |
| `--config-file` | Optional | Path to a configuration file that should be uploaded and made available to the Flink job. Repeatable. |
| `--secrets` | Optional | List of Secret IDs to be made available to the Custom Pipeline. To be provided as a comma-separated string. |

Run decodable pipeline create --help to see the full list of available flags.

The uploaded Custom Pipeline now appears on the Pipelines page, and you can activate it to start processing data.

The maximum allowed job file size is 2 GB. Contact Decodable support if you require support for larger job files.

## Update a Custom Pipeline

Every time you start a Custom Pipeline, the latest job file for that Pipeline (JAR or ZIP) is used. To update an existing Custom Pipeline, use one of the following ways:

- Using Decodable Web: Navigate to the Pipelines page, find and select your Custom Pipeline, then select the Job tab. Make your modifications in the form and click Save.
- Using the Decodable CLI apply command with a Custom Pipeline YAML definition.
- Using the Decodable CLI pipeline update command:

```shell
$ decodable pipeline update --job-file <path to new job file>
```

The arguments to this command are largely identical to pipeline create, though none of them are required when updating a Custom Pipeline. The only differences are:

- --type isn't available, as the type of a Custom Pipeline can't change.
- --config-file is replaced by --add-config-file and --remove-config-file.

After the Pipeline is updated, restart it to pick up the latest changes.

## Monitor a Custom Pipeline

Once you've activated your Custom Pipeline, you'll see some basic information about the pipeline. Select the Flink Web Dashboard button to open the Flink UI in a new window. You can use the Flink UI to view health and performance metrics, such as checkpoint monitoring, backpressure monitoring, and general throughput information.

## Secrets

If your Custom Pipeline interacts with external systems that require authentication, it's recommended to store the credentials as Decodable Secrets. You can reference a Secret's value from your Custom Pipeline and change it without needing to rebuild your Custom Pipeline.

### Defining Secrets

Before defining your Custom Pipeline, you must first create the Secrets that you want to use. The user configuring a Custom Pipeline must have read permissions for all referenced Secrets. When you create the Custom Pipeline, you must specify each Secret to use:

- From the Decodable Web App: Use the "Secret" input when configuring a Custom Pipeline.
- From the Decodable CLI: Specify the IDs of the Secrets to make available as a comma-separated list via the --secrets parameter.

### Accessing Secrets

Within a Custom Pipeline implementation, you can read the value of a managed Secret directly from the file system. The value of a Secret is accessible as a text file under the path /opt/pipeline-secrets/<secret name>.

If you are using a Java Custom Pipeline, you can also access a Secret using the DecodableSecret class of the Pipeline SDK, as shown in this example:

```java
SourceFunction<String> sourceFunction = SqlServerSource.<String>builder()
    .hostname("localhost")
    .port(1433)
    .database("inventory")
    .tableList("dbo.items")
    .username("my-sql-server-user")
    .password(DecodableSecret.withName("my-sql-server-password").value())
    .deserializer(new JsonDebeziumDeserializationSchema())
    .build();
```
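Reading a Secret from the file system requires no SDK at all and works for Java and Python jobs alike. Here is a minimal Java sketch, assuming a Secret named my-sql-server-password (a placeholder name):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

public final class SecretFiles {

    /** Reads a Decodable-managed Secret from its mounted file. */
    static String readSecret(String secretName) throws IOException {
        // Secrets are exposed as text files under /opt/pipeline-secrets/<secret name>.
        // trim() guards against a trailing newline in the mounted file.
        return Files.readString(Path.of("/opt/pipeline-secrets/" + secretName)).trim();
    }

    public static void main(String[] args) throws IOException {
        String password = readSecret("my-sql-server-password");
        // Use the value to authenticate against the external system.
    }
}
```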
## Configuration files

If you use externalized configuration for your Flink jobs, you can expose that configuration to your Custom Pipeline via configuration files. Configuration files are made available to the job and can be used in any way by your Java or Python code. To upload a configuration file and make it available to a Custom Pipeline, use one of the following ways:

- Using Decodable Web: When creating or updating a Custom Pipeline, select your configuration files in the corresponding section.
- Using the Decodable CLI apply command with a Custom Pipeline YAML definition.
- Using the Decodable CLI pipeline command: When creating a pipeline with the pipeline create command, specify --config-file. When updating an existing pipeline with pipeline update, specify --add-config-file and --remove-config-file.

After the Pipeline is started, all specified configuration files are available under /opt/pipeline-config. Note that handling of the configuration files is up to your code; they won't be applied automatically.

## State management

When you stop a Pipeline, the state of your Pipeline is automatically saved and backed up using a Flink savepoint or checkpoint. When you start the Pipeline again, we'll use that state to determine where to resume data processing. If you want to start a Custom Pipeline without restoring its state, use the --force flag in the CLI or the Discard State option in the UI.

## Logs

For debugging and analysis purposes, you can retrieve the logs of a running Custom Pipeline. To do so, use either the Flink Web Dashboard or the Decodable CLI client.

### Flink web dashboard

Navigate to the Custom Pipeline whose logs you'd like to inspect and click "Flink Web Dashboard". To retrieve the Flink job manager logs, choose "Job Manager" in the menu on the left-hand side, then go to the "Logs" tab. For task manager logs, choose "Task Manager", select one of the task managers, then go to the "Logs" tab.

### Decodable CLI

Alternatively, job logs can be retrieved using the decodable pipeline logs <id> CLI command.

- -t or --task-index: Specify an index value for which job task to retrieve logs from. The default index is 0, corresponding to the JobManager. TaskManagers are available at index 1 (and above, if there are multiple).
- -n or --lines: Specify the number of log lines to return.
- -f or --follow: Stream the logs until canceled.

## Metrics

Custom Pipelines expose a set of Flink metrics by default. You can find these in your account's _metrics stream. If you'd like to expose custom metrics not shown below, you can do so by including a metric group named DecodableMetrics. Any metrics tagged this way will also be included in your account's _metrics stream. See the Decodable Pipeline SDK for an example of how to do this.
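As a sketch of what this can look like in a Java job, a counter can be registered under that group using standard Flink metric APIs. The operator and metric names below are illustrative; only the DecodableMetrics group name is significant:

```java
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;

public class CountingMapper extends RichMapFunction<String, String> {

    private transient Counter processedEvents;

    @Override
    public void open(Configuration parameters) {
        // Metrics registered under the "DecodableMetrics" group are forwarded
        // to the account's _metrics stream in addition to the default metrics.
        processedEvents = getRuntimeContext()
            .getMetricGroup()
            .addGroup("DecodableMetrics")
            .counter("processedEvents");
    }

    @Override
    public String map(String value) {
        processedEvents.inc();
        return value.toUpperCase();
    }
}
```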
### Default Flink Metrics

- flink_jobmanager_Status_JVM_CPU_Load
- flink_taskmanager_Status_JVM_CPU_Load
- flink_jobmanager_Status_JVM_Memory_Heap_Used
- flink_taskmanager_Status_JVM_Memory_Heap_Used
- flink_jobmanager_Status_JVM_Memory_Heap_Committed
- flink_taskmanager_Status_JVM_Memory_Heap_Committed
- flink_jobmanager_Status_JVM_Memory_Heap_Max
- flink_taskmanager_Status_JVM_Memory_Heap_Max
- flink_jobmanager_Status_JVM_Memory_NonHeap_Used
- flink_taskmanager_Status_JVM_Memory_NonHeap_Used
- flink_jobmanager_Status_JVM_Memory_NonHeap_Committed
- flink_taskmanager_Status_JVM_Memory_NonHeap_Committed
- flink_jobmanager_Status_JVM_Memory_NonHeap_Max
- flink_taskmanager_Status_JVM_Memory_NonHeap_Max
- flink_jobmanager_Status_JVM_Memory_Metaspace_Used
- flink_taskmanager_Status_JVM_Memory_Metaspace_Used
- flink_jobmanager_Status_JVM_Memory_Metaspace_Committed
- flink_taskmanager_Status_JVM_Memory_Metaspace_Committed
- flink_jobmanager_Status_JVM_Memory_Metaspace_Max
- flink_taskmanager_Status_JVM_Memory_Metaspace_Max
- flink_jobmanager_Status_JVM_Memory_Direct_Count
- flink_taskmanager_Status_JVM_Memory_Direct_Count
- flink_jobmanager_Status_JVM_Memory_Direct_MemoryUsed
- flink_taskmanager_Status_JVM_Memory_Direct_MemoryUsed
- flink_jobmanager_Status_JVM_Memory_Direct_TotalCapacity
- flink_taskmanager_Status_JVM_Memory_Direct_TotalCapacity
- flink_jobmanager_Status_JVM_Memory_Mapped_Count
- flink_taskmanager_Status_JVM_Memory_Mapped_Count
- flink_jobmanager_Status_JVM_Memory_Mapped_MemoryUsed
- flink_taskmanager_Status_JVM_Memory_Mapped_MemoryUsed
- flink_jobmanager_Status_JVM_Memory_Mapped_TotalCapacity
- flink_taskmanager_Status_JVM_Memory_Mapped_TotalCapacity
- flink_jobmanager_Status_JVM_Threads_Count
- flink_taskmanager_Status_JVM_Threads_Count
- flink_jobmanager_job_lastCheckpointDuration
- flink_jobmanager_job_lastCheckpointSize
- flink_jobmanager_job_lastCheckpointFullSize
- flink_taskmanager_job_task_backPressuredTimeMsPerSecond
- flink_taskmanager_job_task_numRecordsIn
- flink_taskmanager_job_task_numRecordsInPerSecond
- flink_taskmanager_job_task_numBytesIn
- flink_taskmanager_job_task_numBytesInPerSecond
- flink_taskmanager_job_task_operator_numRecordsIn
- flink_taskmanager_job_task_operator_numRecordsInErrors
- flink_taskmanager_job_task_operator_numRecordsInPerSecond
- flink_taskmanager_job_task_operator_numBytesIn
- flink_taskmanager_job_task_operator_numBytesInPerSecond
- flink_taskmanager_job_task_numRecordsOut
- flink_taskmanager_job_task_numRecordsOutPerSecond
- flink_taskmanager_job_task_numBytesOut
- flink_taskmanager_job_task_numBytesOutPerSecond
- flink_taskmanager_job_task_operator_numRecordsOut
- flink_taskmanager_job_task_operator_numRecordsOutErrors
- flink_taskmanager_job_task_operator_numRecordsOutPerSecond
- flink_taskmanager_job_task_operator_numBytesOut
- flink_taskmanager_job_task_operator_numBytesOutPerSecond
- flink_jobmanager_job_numRestarts
- flink_jobmanager_job_numberOfCompletedCheckpoints
- flink_jobmanager_job_numberOfFailedCheckpoints
- flink_jobmanager_job_numberOfInProgressCheckpoints
- flink_jobmanager_job_totalNumberOfCheckpoints
- flink_jobmanager_taskSlotsTotal
- flink_jobmanager_numRegisteredTaskManagers