Nextflow Pipeline Tutorial for Developers

Introduction

Welcome to the Nextflow Pipeline Tutorial for Developers. This guide is designed for those who are familiar with basic bioinformatics concepts and want to leverage Nextflow for efficient workflow management in BatchX environments. The tutorial walks you through creating and executing a Nextflow pipeline that performs a common bioinformatics analysis: read quality control and trimming. Whether you are a beginner in Nextflow or looking to refine your skills, this tutorial will provide the foundational knowledge needed to get started with Nextflow in BatchX. If this is your first time using BatchX, you can create an account at BatchX.io, and you can learn the fundamentals of using BatchX from the previous tutorials here.

What is Nextflow

Nextflow is a workflow management system designed for the fast deployment of complex, scalable, and reproducible computational pipelines. Like Snakemake, it targets data-intensive scientific workflows and offers a wide variety of features.

One of the main features of Nextflow is its ability to orchestrate complex workflows with ease. It uses a domain-specific language (DSL) that is declarative and scriptable, allowing users to define processes and their dependencies in a straightforward manner. This makes it particularly suitable for managing the complex data processing pipelines often encountered in bioinformatics. Moreover, Nextflow is designed for cross-platform deployment, meaning it can run on local machines, cloud environments, or high-performance computing clusters without needing modification to the workflow scripts. This flexibility, coupled with its containerization support (e.g., Docker, Singularity), ensures reproducibility and scalability, which are crucial for large-scale bioinformatics projects.
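As a minimal illustration of this portability, the same script can be launched unchanged on different platforms purely through command-line options and configuration. In the sketch below, main.nf and the cluster profile are hypothetical, and the container used by -with-docker would be defined in the workflow's configuration.

# Run the workflow directly on the local machine
nextflow run main.nf

# Run the same workflow with its processes executed inside Docker containers
nextflow run main.nf -with-docker

# Run the same workflow on another environment selected through a named configuration profile
nextflow run main.nf -profile cluster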

Example workflow

This tutorial demonstrates how to create a basic pipeline in BatchX using Nextflow as a workflow manager. The workflow performs an initial quality control of the reads, trims them, and runs a final quality control on the trimmed reads.

A BatchX pipeline should have the following files in a similar structure as shown below:

Dockerfile
entrypoint.sh
nextfile.nf
parseJson.py
manifest/
    manifest.json

Dockerfile

The Dockerfile is the foundation of your BatchX environment. Here, we set up Nextflow using Conda, ensuring a consistent and controlled environment. Each line in the Dockerfile plays a specific role, from installing the necessary dependencies to defining how Nextflow is executed. This setup contributes to the pipeline's reproducibility and its isolation from external changes.

# Pin the mambaforge base image to a specific digest for reproducibility
FROM condaforge/mambaforge:latest@sha256:a119fe148b8a276397cb7423797f8ee82670e64b071dc39c918b6c3513bd0174
# Install Nextflow and FastQC from the conda-forge/bioconda channels
RUN mamba install -c conda-forge -c bioconda -c defaults nextflow fastqc -y
ARG DEBIAN_FRONTEND=noninteractive
# Java is required by Nextflow; Graphviz is used to render the DAG image
RUN apt-get update && apt-get -y install default-jdk graphviz
WORKDIR /batchx
# Download the BatchX daemon and run the BatchX in-container installation script
ADD https://dist.batchx.io/cli/batchx-daemon/latest latest
RUN bash -c 'source /dev/stdin <<< "$( curl -sS https://dist.batchx.io/cli/batchx-install/docker/docker-install.sh#try=1 )"'
# Copy the pipeline scripts and the manifest into the image and open up permissions
COPY entrypoint.sh entrypoint.sh
COPY nextfile.nf parseJson.py /batchx/
RUN chmod -R 777 /batchx
ENTRYPOINT ["/batchx/entrypoint.sh"]
LABEL io.batchx.manifest=11
COPY manifest /batchx/manifest/

The above Dockerfile defines how to install Nextflow, copies the essential files into the image, and uses the entrypoint.sh script as the entrypoint so that the image can be run as an executable.
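Once the files listed in the structure above are in place next to the Dockerfile, the image can be built locally with a standard docker build; the tag used below is just an example name for the image.

# Build the pipeline image from the directory containing the Dockerfile
docker build -t tutorial/nextflow .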

entrypoint.sh

The entrypoint.sh script acts as the initial trigger for the workflow. Upon starting the Docker container, this script launches the bx daemon and initiates the Nextflow pipeline (nextfile.nf). It is a crucial component that bridges the Docker environment with the Nextflow workflow execution. This file also instructs Nextflow to generate a directed acyclic graph (DAG) of the executed jobs, along with the output path for it.

Below are the contents of the entrypoint.sh.

#!/bin/bash
id
# Start the BatchX daemon so the pipeline can submit jobs through the bx CLI
source /batchx/bx-start.sh;
# Launch the Nextflow pipeline and write the DAG of executed jobs to the output folder
nextflow nextfile.nf -with-dag /batchx/output/dag.svg
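If you also want Nextflow's built-in execution report and trace alongside the DAG, the launch line can be extended with the corresponding flags. This is a sketch assuming /batchx/output is the desired destination; keep in mind that any extra output files would also need to be declared in the manifest and referenced from output.json for BatchX to expose them as job outputs.

# Launch the pipeline and additionally write an HTML execution report and a trace file
nextflow nextfile.nf -with-dag /batchx/output/dag.svg -with-report /batchx/output/report.html -with-trace /batchx/output/trace.txt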

manifest.json

This file contains essential information for the pipeline, including the definition of the inputs and outputs the pipeline requires, as well as the parameters used to configure the execution of the tools being orchestrated. In this example, the main input is a FASTQ file. The pipeline employs the FastQC and Trimmomatic tools, for which vcpus and memory values can be specified, although default values exist in both cases. For the implementation of a real pipeline, rather than the example shown here, it would probably make sense to expose additional parameters that the tools support for better customization of the pipeline execution. The complete BatchX contract can be found here.

{
  "name": "tutorial/nextflow",
  "title": "Nextflow tutorial pipeline.",
  "schema": {
    "input": {
      "$schema": "http://json-schema.org/draft-07/schema#",
      "type": "object",
      "additionalProperties": false,
      "properties": {
        "sample": {
          "type": "object",
          "required": true,
          "additionalProperties": false,
          "title": "Sample data.",
          "properties": {
            "projectName": {
              "type": "string",
              "required": true,
              "pattern": "^[a-zA-Z0-9._-]+$",
              "title": "Name for the study project.",
              "description": "This value will be used for file naming and as outputPrefix or equivalent for the tool run."
            },
            "fastq": {
              "type": "string",
              "format": "file",
              "title": "FASTQ file.",
              "description": "[FASTQ](https://en.wikipedia.org/wiki/FASTQ_format) file."
            }
          }
        },
        "global": {
          "type": "object",
          "required": false,
          "default": {},
          "additionalProperties": false,
          "title": "Pipeline-wide resources",
          "properties": {
            "timeout": {
              "type": "integer",
              "default": 1000,
              "required": false,
              "title": "Time (in minutes) for each job in the workflow to run before being cancelled."
            }
          }
        },
        "tools": {
          "type": "object",
          "required": false,
          "additionalProperties": false,
          "title": "Tools to be run.",
          "default": {},
          "properties": {
            "fastqc": {
              "type": "object",
              "required": false,
              "default": {},
              "additionalProperties": false,
              "title": "Generates quality control metrics for FASTQ files using FastQC.",
              "properties": {
                "vcpus": {
                  "type": "integer",
                  "minimum": 1,
                  "default": 1,
                  "required": false,
                  "title": "VCPUs to be used."
                },
                "memory": {
                  "type": "integer",
                  "minimum": 2000,
                  "default": 4000,
                  "required": false,
                  "title": "Memory (RAM Mbs) to be used."
                }
              },
              "description": "Generates quality control metrics for [FASTQ](https://en.wikipedia.org/wiki/FASTQ_format) files using FastQC."
            },
            "trimmomatic": {
              "type": "object",
              "required": false,
              "default": {},
              "additionalProperties": false,
              "title": "Performs trimming tasks for sequencing reads in FASTQ file format using Trimmomatic.",
              "properties": {
                "vcpus": {
                  "type": "integer",
                  "minimum": 1,
                  "default": 1,
                  "required": false,
                  "title": "VCPUs to be used."
                },
                "memory": {
                  "type": "integer",
                  "minimum": 2000,
                  "default": 2000,
                  "required": false,
                  "title": "Memory (RAM Mbs) to be used."
                }
              },
              "description": "Performs trimming tasks for sequencing reads in [FASTQ](https://en.wikipedia.org/wiki/FASTQ_format) file format using Trimmomatic."
            }
          }
        }
      }
    },
    "output": {
      "$schema": "http://json-schema.org/draft-07/schema#",
      "type": "object",
      "additionalProperties": false,
      "properties": {
        "dag": {
          "type": "string",
          "format": "file",
          "required": true,
          "title": "SVG graph with the DAG of executed jobs."
        },
        "tools": {
          "type": "object",
          "additionalProperties": false,
          "title": "This pipeline provides the following outputs, grouped by tool.",
          "properties": {
            "fastqc": {
              "type": "object",
              "required": true,
              "additionalProperties": false,
              "title": "Generates quality control metrics for FASTQ files using FastQC.",
              "description": "Generates quality control metrics for [FASTQ](https://en.wikipedia.org/wiki/FASTQ_format) files using FastQC.",
              "properties": {
                "readCounts": {
                  "title": "Read counts of the FASTQ files analyzed.",
                  "description": "Read counts of the [FASTQ](https://en.wikipedia.org/wiki/FASTQ_format) files analyzed.",
                  "type": "array",
                  "required": true,
                  "items": {
                    "type": "object",
                    "title": "Read counts.",
                    "properties": {
                      "sampleName": {
                        "type": "string",
                        "required": true,
                        "title": "Sample name."
                      },
                      "readCount": {
                        "type": "number",
                        "required": true,
                        "title": "Number of reads in FASTQ.",
                        "description": "Number of reads in [FASTQ](https://en.wikipedia.org/wiki/FASTQ_format)."
                      }
                    }
                  }
                },
                "htmlFiles": {
                  "title": "FastQC HTML results compressed as zip files.",
                  "required": true,
                  "type": "string",
                  "format": "file"
                }
              }
            },
            "trimmomatic": {
              "type": "object",
              "required": true,
              "additionalProperties": false,
              "title": "Performs trimming tasks for sequencing reads in FASTQ file format using Trimmomatic.",
              "description": "Performs trimming tasks for sequencing reads in [FASTQ](https://en.wikipedia.org/wiki/FASTQ_format) file format using Trimmomatic.",
              "properties": {
                "trimmedFastq": {
                  "title": "Trimmed reads file.",
                  "required": true,
                  "type": "string",
                  "format": "file"
                }
              }
            },
            "trimmed_fastqc": {
              "type": "object",
              "required": true,
              "additionalProperties": false,
              "title": "Generates quality control metrics for the trimmed FASTQ files using FastQC.",
              "description": "Generates quality control metrics for the trimmed [FASTQ](https://en.wikipedia.org/wiki/FASTQ_format) files using FastQC.",
              "properties": {
                "readCounts": {
                  "title": "Read counts of the FASTQ files analyzed.",
                  "description": "Read counts of the [FASTQ](https://en.wikipedia.org/wiki/FASTQ_format) files analyzed.",
                  "type": "array",
                  "required": true,
                  "items": {
                    "type": "object",
                    "title": "Read counts.",
                    "properties": {
                      "sampleName": {
                        "type": "string",
                        "required": true,
                        "title": "Sample name."
                      },
                      "readCount": {
                        "type": "number",
                        "required": true,
                        "title": "Number of reads in FASTQ.",
                        "description": "Number of reads in [FASTQ](https://en.wikipedia.org/wiki/FASTQ_format)."
                      }
                    }
                  }
                },
                "htmlFiles": {
                  "title": "FastQC HTML results compressed as zip files.",
                  "required": true,
                  "type": "string",
                  "format": "file"
                }
              }
            }
          }
        }
      }
    }
  },
  "pipeline": {
    "steps": [
      "batchx@bioinformatics/fastqc:4.0.1",
      "batchx@bioinformatics/trimmomatic:1.3.3"
    ]
  },
  "runtime": {
    "minMem": 1000,
    "defaultVcpus": 1
  },
  "changeLog": "Basic nextflow tutorial pipeline.",
  "author": "batchx@suppal",
  "version": "0.0.6"
}
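To make the contract more concrete, below is a sketch of an input.json that conforms to this schema. The project name, resource values, and the FASTQ path are hypothetical placeholders; the FASTQ path should point to a file in your BatchX filesystem.

# Write a hypothetical input.json matching the manifest's input schema
cat > input.json <<'EOF'
{
  "sample": {
    "projectName": "demo_project",
    "fastq": "PATH/TO/FASTQ/IN/YOUR/BATCHX/FILESYSTEM.fastq.gz"
  },
  "global": {
    "timeout": 120
  },
  "tools": {
    "fastqc": { "vcpus": 2, "memory": 4000 },
    "trimmomatic": { "vcpus": 2, "memory": 4000 }
  }
}
EOF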

nextfile.nf

The nextfile.nf file is the heart of the pipeline. It orchestrates everything, detailing the execution of FastQC and Trimmomatic on the reads, followed by a final FastQC step on the trimmed reads. This file showcases Nextflow's power in automating and structuring complex data processes: Nextflow resolves the execution order automatically from the dependencies between processes.

params.input = "/batchx/input/input.json"
params.output = '/batchx/output/output.json'

process run_fastqc {
    output:
    stdout

    script:
    """
    #!/opt/conda/bin/python
    import json
    import subprocess
    import sys
    import os
    sys.path.append(os.path.dirname('/batchx/parseJson.py'))
    from parseJson import parseJsonFile
    parsedJson = parseJsonFile(\"${params.input}\")
    args = {
        "fastqs": [parsedJson["sample"]["fastq"]],
        "outputPrefix": parsedJson["sample"]["projectName"]
    }
    json_args = json.dumps(args)
    vcpus = parsedJson["tools"]["fastqc"]["vcpus"]
    memory = parsedJson["tools"]["fastqc"]["memory"]
    timeout = int(parsedJson["global"]["timeout"]) * 60
    run = subprocess.call(f"bx run -t {timeout} -v {vcpus} -m {memory} -l batchx@bioinformatics/fastqc:4.0.1 '{json_args}'", shell=True)
    if run != 0:
        raise ValueError(run)
    """
}
process run_trimmomatic {
    output:
    stdout

    script:
    """
    #!/opt/conda/bin/python
    import json
    import subprocess
    import sys
    import os
    sys.path.append(os.path.dirname('/batchx/parseJson.py'))
    from parseJson import parseJsonFile
    parsedJson = parseJsonFile(\"${params.input}\")
    args = {
        "fastqFileR1": parsedJson["sample"]["fastq"],
        "outputPrefix": parsedJson["sample"]["projectName"]
    }
    json_args = json.dumps(args)
    vcpus = parsedJson["tools"]["trimmomatic"]["vcpus"]
    memory = parsedJson["tools"]["trimmomatic"]["memory"]
    timeout = int(parsedJson["global"]["timeout"]) * 60
    run = subprocess.call(f"bx run -t {timeout} -v {vcpus} -m {memory} -l batchx@bioinformatics/trimmomatic:1.3.3 '{json_args}'", shell=True)
    if run != 0:
        raise ValueError(run)
    """
}
process run_trimmed_fastqc {
    input:
    val trimmomatic

    output:
    stdout

    script:
    """
    #!/opt/conda/bin/python
    import json
    import subprocess
    import sys
    import os
    sys.path.append(os.path.dirname('/batchx/parseJson.py'))
    from parseJson import parseJsonFile
    parsedJson = parseJsonFile(\"${params.input}\")
    args = {
        "fastqs": [${trimmomatic}.get("trimmedFastqR1")],
        "outputPrefix": parsedJson["sample"]["projectName"] + "_trimmed_fastqc"
    }
    json_args = json.dumps(args)
    vcpus = parsedJson["tools"]["fastqc"]["vcpus"]
    memory = parsedJson["tools"]["fastqc"]["memory"]
    timeout = int(parsedJson["global"]["timeout"]) * 60
    run = subprocess.call(f"bx run -t {timeout} -v {vcpus} -m {memory} -l batchx@bioinformatics/fastqc:4.0.1 '{json_args}'", shell=True)
    if run != 0:
        raise ValueError(run)
    """
}
process writeOutput {
    input:
    val fastqc
    val trimmomatic
    val fastqctrim

    output:
    stdout

    script:
    """
    #!/opt/conda/bin/python
    import json
    import subprocess
    outputJson = {
        "dag": "/batchx/output/dag.svg",
        "tools": {
            "fastqc": {
                "readCounts": ${fastqc}.get("readCounts"),
                "htmlFiles": ${fastqc}.get("htmls")
            },
            "trimmomatic": {
                "trimmedFastq": ${trimmomatic}.get("trimmedFastqR1")
            },
            "trimmed_fastqc": {
                "readCounts": ${fastqctrim}.get("readCounts"),
                "htmlFiles": ${fastqctrim}.get("htmls")
            }
        }
    }
    with open(\'${params.output}\', 'w+') as json_file:
        json.dump(outputJson, json_file)
    """
}
workflow {
    fastqcOutput = run_fastqc()
    trimmomaticOutput = run_trimmomatic()
    fastqctrimOutput = run_trimmed_fastqc(trimmomaticOutput)
    writeOutput(fastqcOutput, trimmomaticOutput, fastqctrimOutput)
}

You will notice that we are importing a function called parseJsonFile from the file parseJson.py. This function parses a JSON file into a Python dictionary. You could easily implement it inside each of the processes that use it; however, importing it from a separate script keeps things more modular. Here are the contents of parseJson.py.

import json

def parseJsonFile(file):
    # Read the given JSON file and return its contents as a Python object
    with open(file, "r") as jsonFile:
        readJsonFile = jsonFile.read()
    return json.loads(readJsonFile)
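If you want to sanity-check the helper from a shell inside the container, a quick one-liner along these lines (assuming an input JSON is present at the default location used by the pipeline) should print the parsed contents:

# Print the parsed input JSON using the shared helper
python -c "import sys; sys.path.append('/batchx'); from parseJson import parseJsonFile; print(parseJsonFile('/batchx/input/input.json'))"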

Next steps

Once the files required to implement the pipeline are stored in the structure defined above, you can follow these steps to run the pipeline locally as well as on the BatchX platform; a sketch of the final submission command is shown after the list. Links are provided for additional guidance.

  1. Build the image using the docker build command.
  2. Clone the tools the pipeline orchestrates into your account. Use these links to clone them from the web UI: fastQC and trimmomatic. Click on the yellow clone button in the upper right corner.
  3. Upload a small FASTQ file to your BatchX filesystem. You can either use the cp command or drag and drop the FASTQ file in the Files section of the web UI.
  4. Test the pipeline locally using the run-local command. Use the path of the uploaded FASTQ file as the pipeline input, not the path of the local file on your machine.
  5. Once the execution runs successfully on your machine, you can import the pipeline into your account using the import command.
  6. Finally, once imported, you can test the pipeline in the BatchX platform using the run command or submit the job from the web UI.
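As a very rough sketch of how the final submission (step 6) can look from a terminal, assuming the image built earlier has been imported and that input.json is the example input sketched after the manifest: the environment prefix below is a placeholder that depends on your account, and the command simply mirrors the bx run pattern used inside nextfile.nf. The exact arguments for the cp, run-local, import, and run commands are described in the linked documentation.

# Submit the pipeline as a platform job; <your-environment> is a placeholder
# and input.json is the hypothetical example input written earlier
bx run <your-environment>@tutorial/nextflow:0.0.6 "$(cat input.json)"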

Further resources