This notebook is a version of the PDC_clustergram.ipynb notebook adapted to use Nextflow. It assumes that you already have a basic level of familiarity with Nextflow; if this is your first time seeing Nextflow, you may find it useful to review the canine_datacommons_fastq_reader_nextflow.ipynb notebook before this one.
As in the original Proteome Data Commons (PDC) analysis, this notebook demonstrates how to do the following:
However, this notebook also has the following additional goals:
This notebook separates the steps of the analysis into separate Python script files, which are executed by separate Nextflow processes. For clarification of any of the steps in the Python analysis itself, refer back to the original PDC_clustergram tutorial; the comments and explanations in this notebook focus on the modifications made to the original tutorial for use with Nextflow. The clustergrams and processed data from the analysis are saved to a results folder in the base directory for later reference.
Most of the code for the Python scripts in this section is taken directly from the original notebook; the only differences arise from reading in command line arguments (which pass shared variables between the processes) and saving the results of each section to files (which are fed as inputs to later processes).
The next three sections of the notebook write the Python files for the three main processes in the Nextflow workflow.
%%writefile query_data.py
import argparse
import requests
import json

# Define the function to make the GraphQL query
def query_pdc(query):
    url = 'https://pdc.cancer.gov/graphql'
    # Extended timeout to prevent the request from timing out
    response = requests.post(url, json={'query': query}, timeout=30)
    if response.ok:
        return response.json()
    else:
        return response.raise_for_status()

def main(pdc_study_id, data_type):
    # Set up the queries as in the original notebook
    quant_data_query = '''
    {
        quantDataMatrix(
            pdc_study_id: "''' + pdc_study_id + '''" data_type: "''' + data_type + '''" acceptDUA: true
        )
    }'''

    metadata_query = '''
    {
        clinicalMetadata(pdc_study_id: "''' + pdc_study_id + '''" acceptDUA: true) {
            aliquot_submitter_id
            morphology
            primary_diagnosis
            tumor_grade
            tumor_stage
        }
    }
    '''

    # Query the API
    expression_data = query_pdc(quant_data_query)
    clinical_metadata = query_pdc(metadata_query)

    # Write the data to JSON files to be fed as input to subsequent scripts
    with open("expression_data.json", "w") as f:
        json.dump(expression_data, f)
    with open("clinical_metadata.json", "w") as f:
        json.dump(clinical_metadata, f)

if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Query PDC API for specific study ID and data type.')
    parser.add_argument('pdc_study_id', type=str, help='The PDC Study ID')
    parser.add_argument('data_type', type=str, help='The type of data being requested')
    args = parser.parse_args()
    main(args.pdc_study_id, args.data_type)
Writing query_data.py
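As an optional check, the script can be run on its own before being wired into Nextflow. The minimal sketch below assumes network access to pdc.cancer.gov and the default study ID and data type used later in the workflow; it inspects the quantDataMatrix payload, whose first row holds the 'Gene/Aliquot' header and aliquot IDs.
import subprocess
import json

# Run the query script by hand with the same arguments the workflow will pass it
subprocess.run(["python3", "query_data.py", "PDC000127", "log2_ratio"], check=True)

# The first row of the matrix is the header; the remaining rows are genes
with open("expression_data.json") as f:
    matrix = json.load(f)["data"]["quantDataMatrix"]
print(len(matrix) - 1, "genes across", len(matrix[0]) - 1, "aliquots")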
%%writefile process_data.py
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
from scipy.stats import zscore
import json
import argparse

def main(mask_na):
    with open('expression_data.json', 'r') as f:
        decoded = json.load(f)
    matrix = decoded['data']['quantDataMatrix']

    ga = pd.DataFrame(matrix[1:], columns=matrix[0]).set_index('Gene/Aliquot')
    oldnames = list(ga.columns)
    newnames = [x.split(':')[1] for x in oldnames]
    ga.rename(columns=dict(zip(oldnames, newnames)), inplace=True)
    ga = ga.sort_index(axis=1)

    for col in ga.keys():
        ga[col] = pd.to_numeric(ga[col], errors='coerce')
    ga = ga.fillna(mask_na)

    # The processed data is saved to be fed into later steps in the workflow
    ga.to_csv('processed_data.csv')

    zdf = ga.T.apply(zscore, ddof=len(ga.columns) - 1)
    zdf = zdf.T
    # A separate file is written with the data after z-score transformation
    zdf.to_csv('transformed_data.csv')

if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Process and transform data.')
    parser.add_argument('mask_na', type=float, help='The value to use as a mask in the processing')
    args = parser.parse_args()
    main(args.mask_na)
Writing process_data.py
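To make the coercion-and-mask step above concrete, here is a toy illustration with made-up values: pd.to_numeric with errors='coerce' turns any non-numeric entry into NaN, and fillna then replaces those NaNs with the sentinel mask value so they can be masked out of the clustermap later.
import pandas as pd

# Made-up miniature expression table with one non-numeric entry
df = pd.DataFrame({"CPT01": ["1.5", "n/a"], "CPT02": ["-0.2", "0.7"]},
                  index=["A1BG", "A1CF"])
for col in df.keys():
    df[col] = pd.to_numeric(df[col], errors="coerce")  # "n/a" becomes NaN
print(df.fillna(0.000666))  # NaN is replaced by the mask_na sentinel value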
%%writefile generate_clustermap.py
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
import json
import argparse

def get_colors(df, name, color) -> pd.Series:
    s = df[name]
    su = s.unique()
    colors = sns.light_palette(color, len(su))
    lut = dict(zip(su, colors))
    return s.map(lut)

def main(input_file, output_file, mask_na):
    with open('clinical_metadata.json', 'r') as f:
        decoded = json.load(f)
    clin_matrix = decoded['data']['clinicalMetadata']
    metadata = pd.DataFrame(clin_matrix, columns=clin_matrix[0]).set_index('aliquot_submitter_id')
    metadata = metadata.sort_index(axis=0)

    stage_col_colors = get_colors(metadata, 'tumor_stage', 'red')
    diagnosis_col_colors = get_colors(metadata, 'primary_diagnosis', 'green')

    # Load the data from the input file
    ga = pd.read_csv(input_file)
    # The index must be set again manually since it is converted into a column when saved
    ga = ga.set_index("Gene/Aliquot")

    sns.clustermap(ga, metric='euclidean', method='complete', cmap='seismic', mask=ga == mask_na, center=0.,
                   col_colors=[stage_col_colors, diagnosis_col_colors], figsize=(12.5, 50))
    # Save the cluster map to the output file
    plt.savefig(output_file)

if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Generate cluster map from processed data.')
    parser.add_argument('input_file', type=str, help='Path to the input file')
    parser.add_argument('output_file', type=str, help='Path to the output file')
    parser.add_argument('mask_na', type=float, help='The value used as a mask in the processing')
    args = parser.parse_args()
    main(args.input_file, args.output_file, args.mask_na)
Writing generate_clustermap.py
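The get_colors() helper above builds a lookup table (lut) mapping each unique clinical value to a shade of a base color, which seaborn then draws as an annotation strip along the clustermap columns. A small standalone example with invented stage labels:
import pandas as pd
import seaborn as sns

# Invented clinical column: two unique stages get two shades of red
s = pd.Series(["Stage I", "Stage II", "Stage I"], name="tumor_stage")
lut = dict(zip(s.unique(), sns.light_palette("red", len(s.unique()))))
print(s.map(lut))  # one RGB tuple per sample, usable as clustermap col_colors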
The code block below provides a Nextflow pipeline script consisting of three processes: QueryData, ProcessData, and GenerateClustermap.

QueryData: This process queries data from the PDC API and produces two output files, expression_data.json and clinical_metadata.json. It uses the Python script query_data.py to retrieve the data.

ProcessData: This process processes the queried data by renaming the columns and ensuring all data values are numeric. It accepts as input the files produced by QueryData and generates a processed_data.csv file and a transformed_data.csv file. These are each separately fed into the following GenerateClustermap process.

GenerateClustermap: This process generates a clustermap from the given input file. It outputs an image file of the form clustermap*.png (the exact image name depends on the name of the input file) and uses the Python script generate_clustermap.py to generate the clustermap.

The main workflow itself calls the QueryData process, formats the results of those queries in ProcessData (outputting both z-score transformed and untransformed processed data), and finally creates clustermaps from both of these datasets. The .flatten() call ensures that the two CSV files output by ProcessData are fed individually into the GenerateClustermap process.
%%writefile main.nf
#!/usr/bin/env nextflow

// Define a process called 'QueryData'
process QueryData {
    // QueryData returns the expression_data.json and clinical_metadata.json
    // files from the API.
    // The initial_task label is used in the nextflow.config file to
    // specify that the task should use 4 CPUs (the max currently
    // available in the workspace when this tutorial was created),
    // as this process must complete before any further processes in
    // the workflow can be run
    label 'initial_task'

    // Define the outputs of the process
    output:
    tuple path("expression_data.json"), path("clinical_metadata.json")

    // Define the script that will be run in this process.
    // The script takes two command line arguments, which are given by
    // parameters specified in the nextflow.config file
    script:
    """
    python3 ${baseDir}/query_data.py ${params.pdc_study_id} ${params.data_type}
    """
}

process ProcessData {
    // ProcessData converts the names of the columns and forces the
    // data values to be numeric.
    // Copy the outputs of ProcessData to the results directory in
    // the base workflow directory
    publishDir "${baseDir}/results", mode: 'copy'
    label 'initial_task'

    input:
    tuple path(expression_data), path(clinical_metadata)

    output:
    path("*.csv")

    script:
    """
    python3 ${baseDir}/process_data.py ${params.mask_na}
    """
}

process GenerateClustermap {
    // GenerateClustermap creates a clustermap from the given input file.
    // Copy the outputs of GenerateClustermap to the results directory in
    // the base workflow directory
    publishDir "${baseDir}/results", mode: 'copy'
    label 'main_task'

    input:
    path(input_data)
    tuple path(expression_data), path(clinical_metadata)

    output:
    path("clustermap*.png")

    script:
    """
    python3 ${baseDir}/generate_clustermap.py ${input_data} clustermap_${input_data.baseName}.png ${params.mask_na}
    """
}

workflow {
    data = QueryData()
    processed = ProcessData(data)
    GenerateClustermap(processed.flatten(), data)
}
Overwriting main.nf
The environment.yml file specifies the dependencies needed for the Python scripts in a Conda environment. The nextflow.config file specifies use of the PDC_clustergram_env Conda environment for execution of the Python scripts in the workflow.
%%writefile environment.yml
name: PDC_clustergram_env
channels:
  - defaults
dependencies:
  - matplotlib
  - seaborn
  - scipy
  - pandas
  - requests
Writing environment.yml
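As a quick sanity check (this runs in the notebook kernel, not in the Nextflow-managed environment), the listed dependencies can be imported and their versions printed:
import matplotlib, pandas, requests, scipy, seaborn

# Confirm each dependency is importable and report its version
for module in (matplotlib, pandas, requests, scipy, seaborn):
    print(module.__name__, module.__version__)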
The code block below defines a nextflow.config file for the Nextflow pipeline.

Process Section: There are specifications for two kinds of processes, identified by the labels initial_task and main_task, which dictate how many CPUs are assigned to each kind of process. initial_task processes are allocated 4 CPUs and main_task processes are given 2 CPUs. If these weren't explicitly specified, Nextflow would assign CPUs to the separate processes adaptively; these labels force the specified CPU use. This section also contains configuration for a Conda environment named PDC_clustergram_env using an environment file (environment.yml). If the specified Conda environment doesn't already exist, Nextflow will create it.

Parameters Section: This defines default values for the pdc_study_id, data_type, and mask_na parameters, which can be overridden at runtime if different values are supplied. These parameters are used in the scripts invoked by the pipeline.

Executor: The executor directive is set to 'local', meaning the tasks will be executed in the local environment.
%%writefile nextflow.config
process {
    // Allows execution of different types of tasks with different numbers of CPUs
    withLabel: initial_task {
        cpus = 4
    }
    withLabel: main_task {
        cpus = 2
    }
    conda {
        envCreate = true
        envName = 'PDC_clustergram_env'
        // If the environment does not exist, Nextflow will create it using the specified YAML file
        envFile = 'environment.yml'
    }
}

params {
    // Define parameters that can be passed to the script at runtime
    pdc_study_id = "PDC000127"
    data_type = "log2_ratio"
    mask_na = "0.000666"
}

process {
    executor = 'local'
}

// Sets the directory in which to store the working files
workDir = './work'
Overwriting nextflow.config
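Because the values in the params block are only defaults, they can be overridden at runtime with double-dash command line options, following the same notebook shell convention used below. For example, a run against a different study (the study ID here is purely illustrative) would look like:
!nextflow run main.nf --pdc_study_id PDC000125 --data_type log2_ratio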
Nextflow workflows are usually run from the command line, but Jupyter notebooks allow use of the command line by putting an exclamation mark in front of the command. If running the workflow multiple times, the -resume option can be added after main.nf to reuse the outputs of completed processes from previous runs that were not subsequently modified.
!nextflow run main.nf
N E X T F L O W ~ version 22.10.6
Launching `main.nf` [modest_noyce] DSL2 - revision: 699652ed70
executor > local (4)
[b4/80a7f3] process > QueryData [100%] 1 of 1 ✔
[6b/61244b] process > ProcessData [100%] 1 of 1 ✔
[f7/2b1d9d] process > GenerateClustermap (1) [100%] 2 of 2 ✔
Completed at: 30-Jul-2023 00:11:40
Duration : 1m 1s
CPU hours : 0.1
Succeeded : 4
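A quick way to confirm that publishDir copied everything is to list the results folder, which should contain the two CSV files from ProcessData and the two clustermap images from GenerateClustermap:
import os

# List the files published by the workflow
print(sorted(os.listdir("results")))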
Load in the clustermap of the processed data:
from IPython.display import Image
Image(filename='results/clustermap_processed_data.png')
Load in the clustermap for the standardized data:
from IPython.display import Image
Image(filename='results/clustermap_transformed_data.png')
Load in the dataframe of processed data produced by the ProcessData process:
import pandas as pd
processed_data = pd.read_csv("results/processed_data.csv")
processed_data.head()
| | Gene/Aliquot | CPT0000640003 | CPT0000660001 | CPT0000780007 | CPT0000790001 | CPT0000870016 | CPT0000890001 | CPT0001180009 | CPT0001190001 | CPT0001220008 | ... | NCI7-4 | NCI7-5 | QC1 | QC2 | QC3 | QC4 | QC5 | QC6 | QC7 | QC8 |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | A1BG | 1.0401 | -0.8370 | 0.5803 | -0.6064 | 0.1041 | -0.1866 | -0.7143 | -0.3100 | 0.6129 | ... | -1.8485 | -1.7370 | -1.1657 | -1.0042 | -1.1258 | -1.0642 | -1.0423 | -1.0204 | -1.0951 | -0.9996 |
| 1 | A1CF | -1.0913 | 0.0556 | -0.4647 | 0.5539 | 0.3555 | 0.4798 | -0.3470 | -0.1472 | 0.3500 | ... | -1.3741 | -1.1999 | -1.9917 | -1.6901 | -1.7759 | -2.1722 | -2.1297 | -1.5970 | -1.6895 | -2.1075 |
| 2 | A2M | 0.8438 | -1.3295 | -0.8262 | -0.6975 | 1.2140 | -0.4180 | -0.5647 | -0.5371 | 1.1097 | ... | -2.8584 | -2.6337 | -0.7650 | -0.8239 | -0.7353 | -0.7420 | -0.7303 | -0.7643 | -0.7210 | -0.8130 |
| 3 | AAAS | -0.1501 | -0.1593 | 0.0268 | 0.0378 | 0.0444 | 0.1004 | 0.2691 | 0.0066 | 0.0845 | ... | 1.7525 | 1.3627 | -0.0927 | -0.1887 | 0.0349 | -0.2330 | 0.0259 | -0.0173 | -0.1095 | -0.0528 |
| 4 | AACS | -0.8881 | -0.1487 | -0.5417 | 0.2910 | -0.3251 | -0.0668 | 0.3244 | 0.3472 | -0.3794 | ... | 0.6680 | 0.9196 | 0.2545 | 0.2514 | 0.2308 | 0.2361 | 0.1962 | 0.2062 | 0.2574 | 0.2334 |
5 rows × 208 columns
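As an aside, the manual set_index step used in generate_clustermap.py can be avoided when reading these files interactively by passing index_col to read_csv:
import pandas as pd

# Restore the gene index directly at read time instead of resetting it afterwards
ga = pd.read_csv("results/processed_data.csv", index_col="Gene/Aliquot")
ga.head()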
Load in the dataframe of standardized data produced by the z-score transformation in the ProcessData process:
import pandas as pd
transformed_data = pd.read_csv("results/transformed_data.csv")
transformed_data.head()
| | Gene/Aliquot | CPT0000640003 | CPT0000660001 | CPT0000780007 | CPT0000790001 | CPT0000870016 | CPT0000890001 | CPT0001180009 | CPT0001190001 | CPT0001220008 | ... | NCI7-4 | NCI7-5 | QC1 | QC2 | QC3 | QC4 | QC5 | QC6 | QC7 | QC8 |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | A1BG | 0.130247 | -0.088179 | 0.076743 | -0.061345 | 0.021331 | -0.012496 | -0.073901 | -0.026855 | 0.080537 | ... | -0.205880 | -0.192906 | -0.126427 | -0.107635 | -0.121784 | -0.114616 | -0.112068 | -0.109520 | -0.118212 | -0.107099 |
| 1 | A1CF | -0.079223 | 0.025629 | -0.021938 | 0.071185 | 0.053047 | 0.064411 | -0.011177 | 0.007089 | 0.052544 | ... | -0.105077 | -0.089152 | -0.161540 | -0.133967 | -0.141811 | -0.178042 | -0.174156 | -0.125456 | -0.133912 | -0.172127 |
| 2 | A2M | 0.089455 | -0.104303 | -0.059431 | -0.047957 | 0.122460 | -0.023039 | -0.036118 | -0.033657 | 0.113162 | ... | -0.240610 | -0.220577 | -0.053975 | -0.059226 | -0.051327 | -0.051925 | -0.050882 | -0.053913 | -0.050052 | -0.058255 |
| 3 | AAAS | -0.058840 | -0.061151 | -0.014412 | -0.011649 | -0.009992 | 0.004073 | 0.046442 | -0.019485 | 0.000079 | ... | 0.418997 | 0.321099 | -0.044424 | -0.068535 | -0.012378 | -0.079661 | -0.014638 | -0.025488 | -0.048644 | -0.034403 |
| 4 | AACS | -0.148612 | -0.026274 | -0.091298 | 0.046476 | -0.055461 | -0.012724 | 0.052002 | 0.055775 | -0.064445 | ... | 0.108852 | 0.150481 | 0.040437 | 0.039924 | 0.036516 | 0.037393 | 0.030791 | 0.032445 | 0.040917 | 0.036946 |
5 rows × 208 columns