Adam Talbot
Adam TalbotSep 12, 2023

Automating pipeline execution with Nextflow and Tower

Nextflow enables completely hands off pipeline execution, with each process and the execution order written as code and deployed as a single workflow. After launching a pipeline, no user involvement is required meaning it is reliable and perfectly reproducible.

However, bioinformaticians need to do more than build and run workflows, they need to provision compute environments, stage data, create containers, interpret results and publish a summary of their work. Fortunately, Nextflow and Tower provide several useful features that can be used to help automate these higher-level tasks. By leveraging automation, researchers can:

  • Save time, by avoiding the need for manual intervention
  • Avoid error-prone processes
  • Reduce costs, by minimizing unnecessary runs
  • Improve efficiency with improved pipeline predictability and reliability

This article discusses various features in Nextflow and Tower that can be used to automate common tasks adjacent to workflow execution. The specific features covered here include:

  • The Tower API
  • The Tower CLI
  • Using twkit
  • Pipeline actions
  • Cloud service integrations
  • Automated pipeline recovery
  • Automated notifications
  • Container management features

The Tower API

Tower provides a comprehensive REST API and a powerful command line interface (CLI) in addition to the web UI. Some may wonder, “Why do I need another API or CLI when Nextflow itself is command-line driven?”

Users frequently need to automate higher-level tasks including:

  • Retrieving and staging data prior to pipeline runs
  • Setting up compute environments if they do not already exist
  • Deciding what compute environments to use at runtime based on factors including utilization levels, data location, and cost or policy considerations
  • Authenticating to various cloud services, registries, and storage repositories
  • Monitoring execution, handling errors, or consolidating and presenting results

Just as Nextflow allows the description of complex workflows as code, the Tower API extends the same abstraction to accomplish the goals above. As with most modern APIs, the Tower API adheres to the OpenAPI specification, enabling developers to send JSON-based messages to endpoints documented in the Tower service API schema and parse returned messages. The API can be used with Tower Cloud, self-hosted on your preferred cloud provider or on-premises.

Authentication to the Tower API requires a Personal Access Token. You can create an Access Token by logging into Tower, selecting the pull-down under your icon in the top right corner of the screen, and selecting Your tokens. From there, you can create one or more Personal Access Tokens associated with your Tower account and instance, as illustrated below:

blog automating workflows with nextflow and tower 1

Using this secret Token, you can make authenticated API requests, as shown below. In this example using curl, we query the user-info API endpoint to return information about the Tower user associated with the personal access token. We use the Linux jq command to “pretty-print” the JSON output for legibility.

$ curl -X GET "https://api.tower.nf/user-info" \
 -H "Accept: application/json" \
 -H "Authorization: Bearer <your token>" \
| jq
{
  "user": {
    "id": 4610,
    "userName": "fred",
    "email": "frederick.sanger@seqera.io",
    "firstName": null,
    "lastName": null,
    ...
    "lastAccess": "2023-07-10T13:06:01Z",
    "dateCreated": "2022-09-07T15:38:26Z",
    "lastUpdated": "2023-07-10T13:03:50Z",
    "deleted": false
  },
  "needConsent": false,
  "defaultWorkspaceId": 40230138858677
}

We can use Tower API calls like the one above to retrieve information about the Tower environment and perform an action. Almost any action that can be performed using the Tower Web UI can be performed via the API. Examples include:

  • Managing workspaces, participants, and workspace collaborators
  • Programmatically spinning up cloud-based compute environments via Tower Forge
  • Manipulating datasets
  • Creating and updating pipeline actions
  • Adding, updating, or launching pipelines
  • Searching or retrieving data about prior runs

A good way to explore the API without writing code is to visit the swagger-based Nextflow Tower API interface and enter the API key obtained above into the HTTP Bearer field. You can then expand various endpoints and click the “TRY” button to see the JSON returned by various API requests, view HTTP headers, and generate cURL code illustrating how to use the API function. You can also use Postman or similar interactive tools to familiarize yourself with the API.

Using the Tower CLI

Writing code to interact with the API via JSON objects is appropriate for full applications, but for scripting Seqera provides a command line interface for Tower that serves as a more accessible front-end to the Tower API.

You can download the CLI by visiting the GitHub project page and retrieving the Tower CLI binary that corresponds to your operating system (Linux, OS/X, or Windows). Once the Tower CLI is installed, you can use a simple script like the one below to select a Tower workspace, launch a pipeline in that workspace, and view the run.

You will need to define the API endpoint and Tower Access Token and store them in the shell variables TOWER_API_ENDPOINT and TOWER_ACCESS_TOKEN, respectively.

#!/bin/bash
export TOWER_API_ENDPOINT=https://api.tower.nf
export TOWER_ACCESS_TOKEN=<your token>
export TOWER_WORKSPACE_ID=`tw workspaces list | grep showcase | awk '{print $1}'`
tw launch --wait=SUCCEEDED nf-core/rnaseq
export RUN_ID=`tw runs list | grep rnaseq | grep <tower-user> | awk '{print $1}'`
tw runs view --id=$RUN_ID

  Run at [community / showcase] workspace:

    General
    ---------------------+--------------------------------------------
     ID                  | OLFdDfwJP9W7S
     Operation ID        | 2c1e500d-3d59-4a91-9c39-f22d00c561f4
     Run name            | ridiculous_galileo
     Status              | SUCCEEDED
     Starting date       | Tue, 11 Jul 2023 10:00:41 GMT
     Commit ID           | 3bec2331cac2b5ff88a1dc71a21fab6529b57a0f
     Session ID          | 34490d3b-7938-4b3d-961e-c78b6b33bbf0
     Username            | <tower-user>
     Workdir             | s3://nf-tower-bucket/scratch/OLFdDfwJP9W7S
     Container           |
     Executors           | awsbatch
     Compute Environment | AWS_Batch_Ireland_FusionV2_NVMe
     Nextflow Version    | 23.05.0-edge
     Labels              | No labels reported in workspace

You can use the Tower CLI in conjunction with other CLIs. For example, you might use the AWS CLI’s s3 copy facility to stage data or retrieve files from the Nextflow work directory following a pipeline run.

The Tower CLI also supports a ‘-o json’ directive that writes output in JSON format, making it easier to parse output in scripts and integrate with other command line tools.

You can learn more about the Tower CLI by viewing these usage examples.

Using twkit

The Tower CLI provides a standardized, simple interface that is easy to integrate into other tools. Seqera has created twkit, currently in beta, which is a lightweight Python wrapper around the Tower CLI. It can create all Tower entities using a YAML template enabling users to manage and provision their infrastructure specifications with Infrastructure as Code - see here.

For example, I can launch the nextflow-io/hello pipeline on Tower using a YAML template as follows:

launch:
  - name: 'hello-world'                               # Workflow name
    workspace: 'seqeralabs/testing'                   # Workspace name
    compute-env: 'aws_compute_environment'            # Compute environment name
    revision: 'master'                                # Pipeline revision
    pipeline: 'https://github.com/nextflow-io/hello'  # Pipeline URL

This pipeline, as well as any other entities (Compute Environments, Credentials, and Datasets) can be defined in YAML format, and created on Tower by running the following command:

$ twkit hello-world-config.yaml

You can also launch the same pipeline via a Python script. This will essentially allow you to extend the functionality on offer within the Tower CLI by leveraging the flexibility and customization options available in Python - see here.

Pipeline Actions

Another common requirement is to automatically launch pipelines based on events. For example, I may wish to automatically trigger a pipeline run whenever a commit is made to a particular branch in a repository.

Tower currently offers support for native GitHub webhooks to support this functionality. Bitbucket and GitLab support are planned for future releases.

A second type of action available in Tower is Tower Launch Hooks. Launch Hooks are used to create a custom endpoint for a particular pipeline that can be invoked by an external script. This can be used to enable third-party applications to easily launch pipelines programmatically.

Tower Launch hooks can be created through the Tower Web UI or by using the tw actions add tower command via the Tower CLI in a similar manner to the GitHub example above.

Once created, I can view the Tower Launch Hooks associated with a workspace as follows:

$ tw actions list --workspace=seqeralabs/verified-pipelines

  Actions for <tower-user> user:

     ID                     | Name               | Endpoint                                                                               | Status | Source
    ------------------------+--------------------+----------------------------------------------------------------------------------------+--------+--------
     6uVJmV5nHSqoBbGIQopRYZ | LaunchRNASeqFromS3 | https://api.tower.nf/actions/6uVJmV5nHSqoBbGIQopRYZ/launch?workspaceId=215559034546785 | ACTIVE | tower
     7WiIDRY1ymDGUPOnD3IzDN | DemoAction         | https://api.tower.nf/actions/7WiIDRY1ymDGUPOnD3IzDN/launch?workspaceId=215559034546785 | ACTIVE | tower

I can then invoke the URL associated with the Launch Hook to launch the pipeline as follows:

curl -H "Content-Type: application/json" \
     -H "Authorization: Bearer <your token>" \
     https://api.tower.nf/actions/6uVJmV5nHSqoBbGIQopRYZ/launch?workspaceId=215559034546785 \
     --data '{"params":{"foo":"Hello world"}}'

Cloud service integrations

Organizations may have automation requirements that are too complex to be handled by the simple pipeline actions described above. For example, after new sequencing data is uploaded to an S3 bucket, a bioinformatician will need to parse the data and match them to sample information before kicking off a pipeline. The most common way of achieving this in bioinformatics is to use a ‘samplesheet’, composed of sample information and pointing to the location of the sequencing data. Using Tower, we can store a samplesheet as a dataset, then use that as an input to our bioinformatics pipeline to perform an analysis with a clear provenance.

Services such as AWS Lambda, GCP Cloud Functions or Azure Functions are suited to performing this task, due to their low overhead and minimal deployment time. All have a native method to detect files uploaded to cloud storage and can run a scripting language such as Python to create a samplesheet before adding it to Tower as a dataset and starting a bioinformatics pipeline.

Graham Wright provides an excellent end-to-end example illustrating how S3 Event Notifications can trigger an AWS Lambda function to automate actions in Tower in his article Workflow Automation for Nextflow Pipelines.

You can view the relevant code examples in the GitHub repo here: https://github.com/seqeralabs/datasets-automation-blog

Automating pipeline recovery

A feature of Nextflow is that it caches pipeline execution steps. This means that users can go back and modify parts of their pipeline and re-execute only the parts of the pipeline impacted by a code change. Similarly, in the event of an error, users can resolve it and resume from where they left off using the Nextflow _-resume _option.

This is helpful when building or modifying pipelines because it avoids the time and cost of re-running a process unnecessarily, where the logic and data remains unchanged.

This resume functionality is explained in Abhinav Sharma’s article, Analyzing caching behavior of pipelines, and in an earlier article by Evan Floden, Troubleshooting Nextflow resume.

This resume functionality is powerful, but it still requires users to manually resume pipeline execution as shown:

$ nextflow ./main.nf -profile docker -resume

If a pipeline fails at runtime, it may be a few hours before a user can detect the failure and manually intervene to correct errors and resume the pipeline.

To avoid the need for human intervention, Nextflow provides an errorStrategy directive that determines how an error condition (an exit with a non-zero status) is managed by a Nextflow process. By default, Nextflow will terminate process execution in case of an error, but developers can optionally set the errorStrategy to “retry” a failed process a set number of times. This allows pipelines to resume execution without human intervention in the case of transient errors associated with compute environments, networks, or storage, significantly improving reliability.

A common reason for failures is insufficient resources at runtime. To address this, nf-core pipelines employ a best practice where the resources allocated to each workflow step are defined in a base.config file. Process labels are used to simplify resource allocations.

An errorStrategy is configured to automatically retry execution when tasks fail with specific exit codes. Different retry policies are applied to processes with different labels as shown in the example below. While developers are free to choose their own label names, using the nf-core supplied labels naming conventions is encouraged.

process {

    errorStrategy = { task.exitStatus in ((130..145) + 104) ? 'retry' : 'finish' }
    maxRetries    = 3

    withLabel:process_low {
        cpus   = { check_max( 2     * task.attempt, 'cpus'    ) }
        memory = { check_max( 12.GB * task.attempt, 'memory'  ) }
        time   = { check_max( 4.h   * task.attempt, 'time'    ) }
    }
    withLabel:process_medium {
        cpus   = { check_max( 6     * task.attempt, 'cpus'    ) }
        memory = { check_max( 36.GB * task.attempt, 'memory'  ) }
        time   = { check_max( 8.h   * task.attempt, 'time'    ) }
    }
    withLabel:process_high {
        cpus   = { check_max( 12    * task.attempt, 'cpus'    ) }
        memory = { check_max( 72.GB * task.attempt, 'memory'  ) }
        time   = { check_max( 16.h  * task.attempt, 'time'    ) }
    }

In the example above, processes will be retried up to three times if they terminate with an exit status between 130 and 145 or 104. Other non-zero exit codes will result in an orderly shutdown of the pipeline per the finish directive.

Since these exit codes are associated with insufficient resources, Nextflow will increase the resources allocated to each process with each attempt. If a process is retried three times and still fails, the pipeline will be terminated.

The check_max() function is provided for convenience as part of the nf-core tooling. It ensures that returned/calculated values do not exceed maximums specified on the —max_cpus , —max_memory and —max_time command line switches. If you use check_max(), make sure that the function is included in your nextflow.config as shown here or in a referenced configuration file.

Automating notifications

Nextflow provides basic workflow notification capabilities that can be enabled from the command line. Users can send a notification message when a workflow completes or terminates as follows:

$ nextflow run <pipeline name> -N <recipient address>

While basic notifications are useful, users can achieve more granular control over notifications using Nextflow event handlers and workflow introspection features. Developers can obtain extensive runtime metadata by checking properties associated with the workflow object during execution including the project directory, container engine, Git repository, and more.

Nextflow also enables developers to provide separate onError and onComplete event handlers, invoked when a pipeline either terminates or completes successfully. Event handlers can use workflow introspection features to access workflow and runtime metadata from within their Nextflow scripts. This enables them to understand the runtime environment, handle various special cases, and even avoid conditions that might lead to errors at runtime.

Event handlers provide developers with granular control over error handling and notifications. For example, a Nextflow onError event handler might call an API endpoint or send an SMS text message using a cloud service such as Amazon’s Simple Notification Service (SNS).

A simple example illustrating the use of event handlers with workflow introspection is provided below:

workflow.onComplete = {
    // any workflow property can be used here
    println "Pipeline complete"
    println "Command line: $workflow.commandLine"
}

workflow.onError = {
    println "Oops .. something when wrong"
}

Container management features

With the release of Wave containers in October of 2022, automation in Nextflow and Tower now extends to container management as well.

Traditionally, building and testing containers was handled as a separate activity from developing pipelines. Containers were typically built using Dockerfile and tools such as Conda and BioContainers to simplify the installation of scientific software. Developers would then test the container, and publish it to a registry such as DockerHub, Quay.io or a preferred cloud-based container registry service.

See the tutorial Building Containers for Scientific Workflows

The problem with this traditional approach is that it separates the process of building containers from running them. Perhaps a container is out of date, or hasn’t been pushed to the container registry yet. Automation can solve these problems but requires additional expertise and may require pipeline developers to adopt a new development workflow.

Seqera’s Wave container service solves this problem by enabling dynamic provisioning of container images from Conda or Dockerfile recipes from within a Nextflow pipeline at runtime. Developers simply declare the Conda packages with their Nextflow pipeline and Wave assembles the required container on the fly.

The Wave service can also be used to automatically augment containers on the fly to meet site security policies or to provide a caching function to enhance performance and reduce data transfer costs.

Wave integrates with Tower credential management enabling seamless access and publishing to various public and private container registries including Amazon Elastic Container Registry, Azure Container Registry, and others.

You can learn more about how the Wave container service can be used to automate building, deploying, augmenting, and managing containers by visiting the Wave containers showcase.

Learning more

Do you have additional suggestions about integrating Nextflow or Tower with external business processes? We’d love to hear from you. Join the conversation at nextflow.slack.com and post to the #random channel!