#!/usr/bin/env python
import os
import numpy as np
import pandas as pd
from ngs_toolkit import _LOGGER
from ngs_toolkit.analysis import Analysis
from ngs_toolkit.decorators import check_has_attributes
[docs]class CNVAnalysis(Analysis):
"""
Class to model analysis of CNV data.
Inherits from the :class:`~ngs_toolkit.analysis.Analysis` class.
Parameters
----------
name : :obj:`str`, optional
Name of the analysis.
Defaults to "analysis".
from_pep : :obj:`str`, optional
PEP configuration file to initialize analysis from.
The analysis will adopt as much attributes from the PEP as possible
but keyword arguments passed at initialization will still have priority.
Defaults to :obj:`None` (no PEP used).
from_pickle : :obj:`str`, optional
Pickle file of an existing serialized analysis object
from which the analysis should be loaded.
Defaults to :obj:`None` (will not load from pickle).
root_dir : :obj:`str`, optional
Base directory for the project.
Defaults to current directory or to what is specified in PEP if :attr:`~ngs_toolkit.analysis.Analysis.from_pep`.
data_dir : :obj:`str`, optional
Directory containing processed data (e.g. by looper) that will
be input to the analysis. This is in principle not required.
Defaults to "data".
results_dir : :obj:`str`, optional
Directory to contain outputs produced by the analysis.
Defaults to "results".
prj : :class:`peppy.Project`, optional
A :class:`peppy.Project` object that this analysis is tied to.
Defaults to :obj:`None`.
samples : :obj:`list`, optional
List of :class:`peppy.Sample` objects that this analysis is tied to.
Defaults to :obj:`None`.
kwargs : :obj:`dict`, optional
Additional keyword arguments will be passed to parent class :class:`~ngs_toolkit.analysis.Analysis`.
Examples
--------
>>> from ngs_toolkit.cnv import CNVAnalysis
This is an example of a CNV analysis:
>>> pep = "metadata/project_config.yaml"
>>> a = CNVAnalysis(from_pep=pep)
>>> # Get consensus peak set from all samples
>>> a.get_cnv_data()
>>> # Normalize
>>> a.normalize(method="median")
>>> # Segmentation
>>> a.segment_genome()
>>> # General plots
>>> a.plot_all_data()
>>> a.plot_segmentation_stats()
>>> # Unsupervised analysis
>>> a.unsupervised_analysis()
>>> # Save object
>>> a.to_pickle()
"""
_data_type = "CNV"
def __init__(
self,
name=None,
from_pep=False,
from_pickle=False,
root_dir=None,
data_dir="data",
results_dir="results",
prj=None,
samples=None,
**kwargs
):
# The check for existance is to make sure other classes can inherit from this
default_args = {
"data_type": "CNV",
"__data_type__": "CNV",
"var_unit_name": "bin",
"quantity": "copy_number",
"norm_units": "log2(ratio)"}
for k, v in default_args.items():
if not hasattr(self, k):
setattr(self, k, v)
super(CNVAnalysis, self).__init__(
name=name,
from_pep=from_pep,
from_pickle=from_pickle,
root_dir=root_dir,
data_dir=data_dir,
results_dir=results_dir,
prj=prj,
samples=samples,
**kwargs
)
if not hasattr(self, "resolutions"):
self.resolutions = (["1000kb", "100kb", "20kb", "10kb"],)
[docs] def load_data(
self,
output_map=None,
only_these_keys=None,
resolutions=None,
prefix="{results_dir}/{name}",
permissive=True,
):
"""
Load the output files of the major functions of the Analysis.
Parameters
----------
output_map : :obj:`dict`
Dictionary with {attribute_name: (file_path, kwargs)} to load the files.
The kwargs in the tuple will be passed to :meth:`pandas.read_csv`.
Defaults to the required to read the keys in ``only_these_keys``.
only_these_keys : :obj:`list`, optional
Iterable of analysis attributes to load up.
Possible attributes:
* "matrix_raw"
* "matrix_norm"
* "matrix_features"
* "differential_results"
Defaults to all of the above.
resolutions: :obj:`list`
List of resolution strings to get data for.
Defaults to value of ``resolutions`` attribute of Analysis.
prefix : :obj:`str`, optional
String prefix of files to load.
Variables in curly braces will be formated with attributes of analysis.
Defaults to "{results_dir}/{name}".
permissive : :obj:`bool`, optional
Whether an error should be ignored if reading a file causes IOError.
Default is :obj:`True`.
Attributes
----------
pandas.DataFrame
Dataframes holding the respective data, available as attributes described
in the `only_these_keys` parameter.
Raises
----------
IOError
If not permissive and a file is not found
"""
from ngs_toolkit.utils import fix_dataframe_header
prefix = self._format_string_with_attributes(prefix)
if output_map is None:
kwargs = {"index_col": 0}
output_map = {
"matrix_raw": (
os.path.join(self.results_dir, self.name + ".{}.matrix_raw.csv"),
kwargs,
),
"matrix_norm": (
os.path.join(self.results_dir, self.name + ".{}.matrix_norm.csv"),
kwargs,
),
"segmentation": (
os.path.join(self.results_dir, self.name + ".{}.segmentation.csv"),
{},
),
"segmentation_annot": (
os.path.join(
self.results_dir, self.name + ".{}.segmentation.annotated.csv"
),
{},
),
}
if only_these_keys is None:
only_these_keys = list(output_map.keys())
output_map = {k: v for k, v in output_map.items() if k in only_these_keys}
if resolutions is None:
resolutions = self.resolutions
for resolution in resolutions:
for name, (file, kwargs) in output_map.items():
file = file.format(resolution)
_LOGGER.info("Loading '{}' analysis attribute.".format(name))
if not hasattr(self, name):
setattr(self, name, {})
try:
getattr(self, name)[resolution] = pd.read_csv(file, **kwargs)
# Fix possible multiindex for matrix_norm
if name == "matrix_norm":
getattr(self, name)[resolution] = fix_dataframe_header(
getattr(self, name)[resolution]
)
except IOError as e:
if not permissive:
raise e
else:
_LOGGER.warning(e)
def _copy_cnv_profile_plots(
self, output_dir="{results_dir}/cnv_profiles",
output_prefix="log2_profile",
resolutions=None, samples=None, permissive=False
):
"""
Load CNV data from ATAC-seq CNV pipeline and create CNV matrix at various resolutions.
Parameters
----------
resolutions : :obj:`list`, optional
Resolutions of analysis.
Defaults to resolutions in Analysis object.
samples : :obj:`list`, optional
Samples to restrict analysis to.
Defaults to samples in Analysis object.
save: :obj:`bool`, optional
Whether results should be saved to disc.
Defaults to True
assign: :obj:`bool`, optional
Whether results should be assigned to an attribute in the Analsyis object.
Defaults to True
permissive: :obj:`bool`, optional
Whether missing files should be allowed.
Defaults to False
"""
from tqdm import tqdm
from glob import glob
from shutil import copyfile
if resolutions is None:
resolutions = self.resolutions
if samples is None:
samples = self.samples
output_dir = self._format_string_with_attributes(output_dir)
if not os.path.exists(output_dir):
os.makedirs(output_dir)
for resolution in tqdm(resolutions, total=len(resolutions), desc="Resolution"):
for sample in tqdm(samples, total=len(samples), desc="Sample"):
# Read log2 file
if not hasattr(sample, "log2_read_counts"):
sample.log2_read_counts = os.path.join(
self.data_dir,
sample.paths.sample_root, sample.name + "_{resolution}",
"CNAprofiles",
"log2_read_counts.igv")
if "{resolution}" in sample.log2_read_counts:
input_file = sample.log2_read_counts.format(resolution=resolution)
f = glob(input_file)
if len(f) == 1:
f = f[0]
else:
msg = "Sample '{}' does not have a PDF file!".format(sample.name)
if permissive:
_LOGGER.warning(msg)
continue
else:
raise OSError(msg)
d = os.path.join(output_dir, sample.name + "." + resolution + "." + output_prefix + ".pdf")
try:
copyfile(f, d)
except OSError:
msg = "Could not copy file '{}' to '{}'!".format(f, d)
if permissive:
_LOGGER.warning(msg)
else:
raise OSError(msg)
[docs] def get_cnv_data(
self, resolutions=None, samples=None, save=True, assign=True, permissive=False
):
"""
Load CNV data from ATAC-seq CNV pipeline and create CNV matrix at various resolutions.
Parameters
----------
resolutions : :obj:`list`, optional
Resolutions of analysis.
Defaults to resolutions in Analysis object.
samples : :obj:`list`, optional
Samples to restrict analysis to.
Defaults to samples in Analysis object.
save: :obj:`bool`, optional
Whether results should be saved to disc.
Defaults to True
assign: :obj:`bool`, optional
Whether results should be assigned to an attribute in the Analsyis object.
Defaults to True
permissive: :obj:`bool`, optional
Whether missing files should be allowed.
Defaults to False
Returns
-------
dict
Dictionary with CNV matrices for each resolution.
Raises
-------
IOError
If not permissive and input files can't be read.
Attributes
----------
matrix : :obj:`dict`
Sets a `matrix` dictionary with CNV matrices for each resolution.
"""
# TODO: figure out a way of having the input file specified before hand
from tqdm import tqdm
if resolutions is None:
resolutions = self.resolutions
if samples is None:
samples = self.samples
matrix_raw = dict()
for resolution in tqdm(resolutions, total=len(resolutions), desc="Resolution"):
matrix_raw[resolution] = pd.DataFrame()
for sample in tqdm(samples, total=len(samples), desc="Sample"):
# Read log2 file
if not hasattr(sample, "log2_read_counts"):
sample.log2_read_counts = os.path.join(
self.data_dir,
sample.paths.sample_root, sample.name + "_{resolution}",
"CNAprofiles",
"log2_read_counts.igv")
if "{resolution}" in sample.log2_read_counts:
input_file = sample.log2_read_counts.format(resolution=resolution)
try:
cov = pd.read_csv(
input_file, sep="\t", comment="#"
).set_index("Feature")
except IOError as e:
e = "Sample {} does not have a 'log2_read_counts' file: '{}'.".format(
sample.name, input_file
)
if permissive:
print(e)
continue
else:
raise IOError(e)
cov.columns = (
cov.columns.str.replace("log2.", "")
.str.replace(".trimmed.bowtie2.filtered.bam", "")
.str.replace(".merged.sorted.subsample.bam", "")
)
# normalize signal to control
matrix_raw[resolution][sample.name] = np.log2(
(
(0.1 + (2 ** cov.loc[:, sample.name]))
/ (0.1 + (2 ** cov.iloc[:, -1]))
)
)
if "cov" not in locals():
msg = "None of the samples had a valid 'log2_read_counts' file."
_LOGGER.error(msg)
raise ValueError(msg)
matrix_raw[resolution].index = (
cov["Chromosome"]
+ ":"
+ cov["Start"].astype(int).astype(str)
+ "-"
+ cov["End"].astype(int).astype(str)
)
matrix_raw[resolution].index.name = "index"
if save:
matrix_raw[resolution].to_csv(
os.path.join(
self.results_dir,
self.name + ".{}.matrix_raw.csv".format(resolution),
),
index=True,
)
if assign:
self.matrix_raw = matrix_raw
return matrix_raw
[docs] def normalize(
self,
method="median",
matrix="matrix_raw",
samples=None,
save=True,
assign=True,
**kwargs
):
"""
Normalization of dictionary of matrices with (n_features, n_samples).
Parameters
----------
resolutions : :obj:`list`, optional
Resolutions of analysis.
Defaults to resolutions in Analysis object.
method : :obj:`str`
Normalization method to apply.
Defaults to "median".
matrix : :obj:`str`, optional
Attribute name of dictionary of matrices to normalize.
Defaults to `matrix_raw`.
samples : :obj:`list`
Iterable of peppy.Sample objects to restrict matrix to.
Default is all in analysis.
save: :obj:`bool`, optional
Whether results should be saved to disc.
Defaults to True
assign: :obj:`bool`, optional
Whether results should be assigned to an attribute in the Analsyis object.
Defaults to True
kwargs : :obj:`dict`, optional
Additional kwargs are passed to the respective normalization method.
Returns
-------
dict
Dictionary with normalized CNV matrices for each resolution.
Attributes
----------
matrix_norm : :obj:`dict`
Sets a `matrix_norm` dictionary with CNV matrices for each resolution.
"""
matrix_norm = dict()
if "resolutions" not in kwargs:
resolutions = self.resolutions
else:
resolutions = kwargs['resolutions']
if matrix is None:
matrix = self.matrix_raw
elif isinstance(matrix, str):
matrix = getattr(self, matrix)
for resolution in resolutions:
if method == "median":
matrix_norm[resolution] = self.normalize_median(
matrix=matrix[resolution], samples=samples, save=False, assign=False
)
elif method == "pca":
if "pc" not in kwargs:
raise ValueError(
"If method is 'pca', the value of 'pc' must be given."
)
matrix_norm[resolution] = self.normalize_pca(
matrix=matrix[resolution],
samples=samples,
pc=kwargs["pc"],
save=False,
assign=False,
)
else:
raise ValueError("Requested method '{}' is not known.".format(method))
if save:
matrix_norm[resolution].to_csv(
os.path.join(
self.results_dir, self.name + ".{}.matrix_norm.csv".format(resolution)
),
index=True,
)
if assign:
self.matrix_norm = matrix_norm
return matrix_norm
[docs] def plot_all_data(
self,
matrix="matrix_norm",
resolutions=None,
samples=None,
output_dir=None,
output_prefix="{analysis_name}.all_data",
robust=True,
vmin=None,
vmax=None,
rasterized=True,
dpi=300,
sample_labels=True,
):
"""
Visualize CNV data genome-wide using heatmaps.
Will be done independently for each specified resolution.
Parameters
----------
matrix : :obj:`str`, optional
Attribute name of dictionary of matrices to normalize.
Defaults to `matrix_norm`.
resolutions : :obj:`list`, optional
Resolutions of analysis.
Defaults to resolutions in Analysis object.
samples : :obj:`list`, optional
Samples to restrict analysis to.
Defaults to samples in Analysis object.
output_dir : :obj:`str`, optional
Output directory.
Defaults to Analysis results directory.
output_prefix : :obj:`str`, optional
Prefix to add to plots.
Defaults to "{analysis_name}.all_data"
robust: :obj:`bool`, optional
Whether to scale the color scale robustly (to quantiles rather than extremes).
Defaults to True
vmin : float, optional
Minimum value of color scale.
vmax : float, optional
Maximum value of color scale.
Defaults to None
rasterized: :obj:`bool`, optional
Whether to rasterize main heatmap.
Defaults to True
dpi : :obj:`int`, optional
DPI resolution of rasterized image.
Defaults to 300
sample_labels: :obj:`bool`, optional
Whether to label samples with their name.
Defaults to True
"""
# TODO: add support for group colours
from tqdm import tqdm
import matplotlib.pyplot as plt
import seaborn as sns
from ngs_toolkit.graphics import savefig
matrix = self.get_matrix(matrix)
if resolutions is None:
resolutions = self.resolutions
if samples is None:
samples = self.samples
if output_dir is None:
output_dir = self.results_dir
if "{analysis_name}" in output_prefix:
output_prefix = output_prefix.format(analysis_name=self.name)
names = [s.name for s in samples]
# Plot mean and variationper chromosome
for resolution in tqdm(resolutions, desc="Resolution"):
r_names = [n for n in names if n in matrix[resolution].columns]
# Plot all data
fig, axis = plt.subplots(1, 1, figsize=(4 * 2, 4 * 1))
sns.heatmap(
matrix[resolution].T,
cmap="RdBu_r",
center=0,
robust=robust,
ax=axis,
vmin=vmin,
vmax=vmax,
rasterized=rasterized,
xticklabels=False,
yticklabels=r_names if sample_labels else False,
cbar_kws={"label": "log2(change)"},
)
axis.set_xlabel("Chromosome position")
axis.set_ylabel("Sample")
savefig(
fig,
os.path.join(
self.results_dir,
self.name
+ ".{}.{}.full_data.heatmap.svg".format(resolution, output_prefix)))
grid = sns.clustermap(
matrix[resolution].fillna(0).T,
metric="correlation",
cmap="RdBu_r",
center=0,
col_cluster=False,
robust=robust,
vmin=vmin,
vmax=vmax,
rasterized=rasterized,
xticklabels=False,
yticklabels=r_names if sample_labels else False,
cbar_kws={"label": "log2(change)"},
figsize=(4 * 2, 4 * 1),
)
grid.ax_heatmap.set_xlabel("Chromosome position")
grid.ax_heatmap.set_ylabel("Sample")
savefig(
grid.fig,
os.path.join(
self.results_dir,
self.name
+ ".{}.{}.full_data.fillna.clustermap.svg".format(
resolution, output_prefix
),
))
[docs] def plot_stats_per_chromosome(
self,
matrix="matrix_norm",
resolutions=None,
samples=None,
output_dir="{results_dir}",
output_prefix="{analysis_name}.all_data",
robust=True,
rasterized=True,
dpi=300,
sample_labels=True,
):
"""
Visualize mean and variation of CNV data for each chromosome using heatmaps.
Will be done independently for each specified resolution.
Will also be done once for all chromosomes and another time without sex chromosomes.
Parameters
----------
matrix : :obj:`str`, optional
Attribute name of dictionary of matrices to normalize.
Defaults to `matrix_norm`.
resolutions : :obj:`list`, optional
Resolutions of analysis.
Defaults to resolutions in Analysis object.
samples : :obj:`list`, optional
Samples to restrict analysis to.
Defaults to samples in Analysis object.
output_dir : :obj:`str`, optional
Output directory.
Defaults to Analysis results directory.
output_prefix : :obj:`str`, optional
Prefix to add to plots.
Defaults to "{analysis_name}.all_data"
robust: :obj:`bool`, optional
Whether to scale the color scale robustly (to quantiles rather than extremes).
Defaults to True
rasterized: :obj:`bool`, optional
Whether to rasterize main heatmap.
Defaults to True
dpi : :obj:`int`, optional
DPI resolution of rasterized image.
Defaults to 300
sample_labels: :obj:`bool`, optional
Whether to label samples with their name.
Defaults to True
"""
import seaborn as sns
from tqdm import tqdm
from ngs_toolkit.graphics import (
clustermap_fix_label_orientation, savefig)
matrix = self.get_matrix(matrix)
if resolutions is None:
resolutions = self.resolutions
if samples is None:
samples = self.samples
output_dir = self._format_string_with_attributes(output_dir)
if "{analysis_name}" in output_prefix:
output_prefix = output_prefix.format(analysis_name=self.name)
# Plot mean and variationper chromosome
for resolution in tqdm(resolutions, desc="Resolution"):
for label, function in tqdm(
[("variation", np.std), ("mean", np.mean)], desc="metric"
):
to_plot = matrix[resolution].copy()
names = [s.name for s in samples if s.name in to_plot.columns]
to_plot = to_plot.loc[:, names]
to_plot["chr"] = map(lambda x: x[0], to_plot.index.str.split(":"))
p = to_plot[names + ["chr"]].groupby("chr").apply(function)
grid = sns.clustermap(
p + p.min().min(),
cbar_kws={"label": label},
cmap="Greens",
xticklabels=sample_labels,
yticklabels=True,
rasterized=rasterized)
clustermap_fix_label_orientation(grid)
savefig(
grid.fig,
os.path.join(
output_dir,
self.name
+ ".{}.".format(resolution)
+ output_prefix
+ ".{}_per_chrom.svg".format(label)))
p = (
to_plot.loc[~to_plot["chr"].str.contains("X|Y"), names + ["chr"]]
.groupby("chr")
.apply(function))
kwargs = {
"xticklabels": sample_labels,
"yticklabels": True,
"rasterized": rasterized,
"robust": robust}
grid = sns.clustermap(
p + abs(p.min().min()),
cbar_kws={"label": label},
cmap="Greens", **kwargs)
clustermap_fix_label_orientation(grid)
savefig(
grid.fig,
os.path.join(
output_dir,
self.name
+ ".{}.".format(resolution)
+ output_prefix
+ ".{}_per_chrom.no_sex_chroms.svg".format(label)))
grid = sns.clustermap(
p,
cbar_kws={"label": label + " (Z-score)"},
cmap="RdBu_r",
center=0,
z_score=1, **kwargs)
clustermap_fix_label_orientation(grid)
savefig(
grid.fig,
os.path.join(
output_dir,
self.name
+ ".{}.".format(resolution)
+ output_prefix
+ ".{}_per_chrom.no_sex_chroms.zscore.svg".format(label)))
[docs] def segment_genome(
self,
matrix="matrix_norm",
resolutions=None,
samples=None,
save=True,
assign=True,
):
"""
Segment CNV data to create calls of significant deviations.
Will be done independently for each specified resolution.
Requires the R package "DNAcopy" to be installed:
>>> source('http://bioconductor.org/biocLite.R')
>>> biocLite('DNAcopy')
Parameters
----------
matrix : :obj:`str`, optional
Attribute name of dictionary of matrices to segment.
Defaults to `matrix_norm`.
resolutions : :obj:`list`, optional
Resolutions of analysis.
Defaults to resolutions in Analysis object.
samples : :obj:`list`, optional
Samples to restrict analysis to.
Defaults to samples in Analysis object.
save: :obj:`bool`, optional
Whether results should be saved to disc.
Defaults to True
assign: :obj:`bool`, optional
Whether results should be assigned to an attribute in the Analsyis object.
Defaults to True
Returns
-------
dict
Dictionary with segmentation for each resolution.
Attributes
----------
segmentation : :obj:`dict`
Dictionary with CNV matrices for each resolution.
"""
# TODO: implement distributed mode
import warnings
from tqdm import tqdm
import rpy2
from rpy2.rinterface import RRuntimeWarning
from rpy2.robjects import numpy2ri, pandas2ri
from rpy2.robjects.packages import STAP
warnings.filterwarnings("ignore", category=RRuntimeWarning)
numpy2ri.activate()
pandas2ri.activate()
DNAcopy = rpy2.robjects.packages.importr("DNAcopy")
if matrix is None:
matrix = self.matrix_norm
elif isinstance(matrix, str):
matrix = getattr(self, matrix)
if resolutions is None:
resolutions = self.resolutions
if samples is None:
samples = self.samples
segmentation = dict()
for resolution in tqdm(resolutions, desc="Resolution"):
chrom = np.array(
list(map(lambda x: x[0], matrix[resolution].index.str.split(":")))
)
start = np.array(
list(map(
lambda x: int(x[1].split("-")[0]),
matrix[resolution].index.str.split(":"),
))
)
names = [s.name for s in samples if s.name in matrix[resolution].columns]
df = matrix[resolution].reindex(names, axis=1)
code = """
run = function(df, chrom, start, names){
return(
DNAcopy::segment(
DNAcopy::CNA(
df, chrom=chrom, maploc=start, sampleid=names)))
}
"""
run = STAP(code, name="run").run
res = run(df, chrom, start, names)
seg = DNAcopy.segments_p(res)
summary = DNAcopy.segments_summary(res)
segmentation[resolution] = seg.merge(summary)
segmentation[resolution].columns = [
"sample_name",
"chrom",
"start",
"end",
"bin_size",
"segment_mean",
"B_stat",
"p_value",
"lcl",
"ucl",
"segment_std",
"segment_median",
"segment_mad",
]
segmentation[resolution]["segment_length"] = (
segmentation[resolution]["end"] - segmentation[resolution]["start"]
) + int(resolution.replace("kb", "")) * 1000
if save:
segmentation[resolution].to_csv(
os.path.join(
self.results_dir,
self.name + ".{}.segmentation.csv".format(resolution),
),
index=False,
)
if assign:
self.segmentation = segmentation
return segmentation
[docs] @check_has_attributes(['organism', 'genome'])
def annotate_with_chrom_bands(
self, segmentation=None, resolutions=None, save=True, assign=True
):
"""
Annotate segmentation with chromosome bands and overlaping genes.
Will be done independently for each specified resolution.
Parameters
----------
segmentation : :obj:`str`, optional
Attribute name of dictionary of segmentation results.
Defaults to `segmentation`.
resolutions : :obj:`list`, optional
Resolutions of analysis.
Defaults to resolutions in Analysis object.
samples : :obj:`list`, optional
Samples to restrict analysis to.
Defaults to samples in Analysis object.
save: :obj:`bool`, optional
Whether results should be saved to disc.
Defaults to True
assign: :obj:`bool`, optional
Whether results should be assigned to an attribute in the Analsyis object.
Defaults to True
Returns
-------
dict
Dictionary with annotated segmentation for each resolution.
Attributes
----------
segmentation_annot : :obj:`dict`
Dictionary with CNV matrices for each resolution.
"""
from ngs_toolkit.general import query_biomart
import pybedtools
if resolutions is None:
resolutions = self.resolutions
if segmentation is None:
segmentation = self.segmentation
organisms = {
"human": {"species": "hsapiens", "ensembl_version": "grch37"},
"mouse": {"species": "mmusculus", "ensembl_version": "grcm38"},
"yeast": {"species": "scerevisiae", "ensembl_version": "R64"}}
annotation = query_biomart(
attributes=[
"chromosome_name",
"start_position",
"end_position",
"external_gene_name",
"band"],
species=organisms[self.organism]["species"],
ensembl_version=organisms[self.organism]["ensembl_version"])
annotation["chromosome_name"] = "chr" + annotation["chromosome_name"]
annotation = annotation[~annotation["chromosome_name"].str.contains("_")]
annotation["band"] = annotation["chromosome_name"] + "_" + annotation["band"]
annotation_bed = pybedtools.BedTool.from_dataframe(annotation.fillna("NA"))
segmentation_annot = dict()
for resolution in resolutions:
seg = segmentation[resolution]
seg_bed = pybedtools.BedTool.from_dataframe(seg[["chrom", "start", "end"]])
inter = seg_bed.intersect(annotation_bed, wa=True, wb=True).to_dataframe()
annot = inter.groupby(["chrom", "start", "end"])["thickStart"].apply(
lambda x: str.join(' ', set(x))
)
annot_band = inter.groupby(["chrom", "start", "end"])["thickEnd"].apply(
lambda x: str.join(' ', set(x))
)
annot_band.name = "chromosome_band"
annot = annot.to_frame(name="gene").join(annot_band)
seg_annot = (
seg.set_index(["chrom", "start", "end"]).join(annot).reset_index()
)
segmentation_annot[resolution] = seg_annot.reindex(
seg.columns.tolist() + ["chromosome_band", "gene"], axis=1
)
if save:
segmentation_annot[resolution].to_csv(
os.path.join(
self.results_dir,
self.name + ".{}.segmentation.annotated.csv".format(resolution),
),
index=False,
)
if assign:
self.segmentation_annot = segmentation_annot
[docs] def plot_segmentation_stats(
self,
segmentation=None,
resolutions=None,
per_sample=False,
output_dir="{results_dir}/segmentation",
output_prefix="{resolution}.segmentation_metrics",
):
"""
Visualize distribution of statistics of CNV data segmentation.
Will be done independently for each specified resolution.
Parameters
----------
segmentation : :obj:`str`, optional
Dictionary of segmentation results.
Defaults to `segmentation`.
resolutions : :obj:`list`, optional
Resolutions of analysis. Defaults to resolutions in Analysis object.
per_sample: :obj:`bool`, optional
Whether plots should be made for each sample too.
Defaults to False
output_dir : :obj:`str`, optional
Output directory.
output_prefix : :obj:`str`, optional
Prefix to add to plots.
Defaults to "{resolution}.segmentation_metrics"
"""
# TODO: plot stats per sample too
from ngs_toolkit.utils import log_p_value
import seaborn as sns
from ngs_toolkit.graphics import savefig
if segmentation is None:
segmentation = self.segmentation
if resolutions is None:
resolutions = self.resolutions
output_dir = self._format_string_with_attributes(output_dir)
metric_vars = [
"bin_size",
"segment_mean",
"B_stat",
"p_value",
"lcl",
"ucl",
"segment_std",
"segment_median",
"segment_mad",
"segment_length",
]
for resolution in resolutions:
segments = segmentation[resolution]
segments["log_p_value"] = log_p_value(segments["p_value"])
grid = sns.pairplot(
segments[metric_vars + ["log_p_value"]].dropna(),
vars=metric_vars + ["log_p_value"],
plot_kws=dict(s=3, alpha=0.2, linewidth=0, rasterized=True),
diag_kws=dict(rasterized=True),
)
if "{resolution}" in output_prefix:
op = output_prefix.format(resolution=resolution)
savefig(
grid.fig,
os.path.join(output_dir, op + ".all_samples.svg"))
# if per_sample:
[docs]def all_to_igv(matrix, output_prefix, **kwargs):
"""
Convert dictionary of DataFrame with CNV data in several resolutions to IGV format.
Parameters
----------
matrix : :obj:`pandas.DataFrame`
DataFrame with CNV data to convert.
output_prefix : str
Prefix to add to plots.
**kwargs : :obj:`dict`, optional
Additional parameters will be passed to ngs_toolkit.cnv.to_igv
Returns
-------
dict
Dictionary of CNV data in IGV format for each resolution.
"""
from tqdm import tqdm
igvs = dict()
resolutions = matrix.keys()
for resolution in tqdm(resolutions, total=len(resolutions), desc="Resolution"):
_LOGGER.info("Making IGV visualization for resolution '{}'.".format(resolution))
igvs[resolution] = to_igv(
matrix[resolution],
output_file="{}_{}.igv".format(output_prefix, resolution),
**kwargs
)
return igvs
[docs]def to_igv(matrix, output_file=None, save=True, view_limits=(-2, 2)):
"""
Convert DataFrame with CNV data to IGV format.
Parameters
----------
matrix : :obj:`pandas.DataFrame`
DataFrame with CNV data to convert.
output_file : str, optional
Output file.
Required if `save` is True.
save: :obj:`bool`, optional
Whether results should be saved to disc.
Defaults to :obj:`True`.
view_limits : tuple, optional
Extreme values (min, max) of color scale used to visualize in IGV.
Defaults to (-2, 2).
Returns
-------
pandas.DataFrame
CNV data in IGV format.
Raises
-------
ValueError:
If `save` is True but `output_file` is None.
"""
_LOGGER.info("Making IGV visualization")
# as IGV file
igv = pd.DataFrame(index=matrix.index)
igv.loc[:, "Chromosome"] = list(map(lambda x: x[0], matrix.index.str.split(":")))
igv.loc[:, "Start"] = list(map(lambda x: int(x[1].split("-")[0]), matrix.index.str.split(":")))
igv.loc[:, "End"] = list(map(lambda x: int(x[1].split("-")[1]), matrix.index.str.split(":")))
igv.loc[:, "Name"] = igv.index
igv = igv.join(matrix).reset_index(drop=True)
if save:
if output_file is None:
raise ValueError(
"If the 'save' option is specified, 'output_file' must also be!"
)
open(output_file, "w")
output_handle = open(output_file, "a")
output_handle.write(
"#track viewLimits=-{}:{} graphType=heatmap color=255,0,0\n".format(
*view_limits
)
)
igv.to_csv(output_handle, sep="\t", index=False)
output_handle.close()
return igv