Source code for ngs_toolkit.demo.data_generator

#!/usr/bin/env python

"""
A module dedicated to the generation of Analysis, Projects and their data.
"""

import os
import string
import tempfile

from ngs_toolkit.general import query_biomart
from ngs_toolkit.utils import location_index_to_bed
from ngs_toolkit.project_manager import create_project as init_proj
import numpy as np
import pandas as pd
import pybedtools
import yaml


REGION_BASED_DATA_TYPES = ['ATAC-seq', 'ChIP-seq']  # CNV technically is too but it does not need a `sites` attr
DEFAULT_CNV_RESOLUTIONS = ["1000kb", "100kb", "10kb"]


[docs]def generate_count_matrix( n_factors=1, n_replicates=4, n_features=1000, intercept_mean=4, intercept_std=2, coefficient_stds=0.4, size_factors=None, size_factors_std=0.1, dispersion_function=None, ): """ Generate count matrix for groups of samples by sampling from a negative binomial distribution. """ import patsy if isinstance(coefficient_stds, (int, float)): coefficient_stds = [coefficient_stds] * n_factors if dispersion_function is None: dispersion_function = _disp # Build sample vs factors table dcat = pd.DataFrame( patsy.demo_data( *(list(string.ascii_lowercase[:n_factors])))) dcat.columns = dcat.columns.str.upper() for col in dcat.columns: dcat[col] = dcat[col].str.upper() if n_replicates > 1: dcat = pd.concat( [dcat for _ in range(int(np.ceil(n_replicates / 2)))] ).sort_values(dcat.columns.tolist()).reset_index(drop=True) dcat.index = ["S{}_{}".format(str(i + 1).zfill(2), dcat.loc[i, :].sum()) for i in dcat.index] m_samples = dcat.shape[0] # make model design table design = np.asarray( patsy.dmatrix("~ 1 + " + " + ".join(string.ascii_uppercase[:n_factors]), dcat)) # get means beta = np.asarray( [np.random.normal(intercept_mean, intercept_std, n_features)] + [np.random.normal(0, std, n_features) for std in coefficient_stds]).T if size_factors is None: size_factors = np.random.normal(1, size_factors_std, (m_samples, 1)) mean = (2 ** (design @ beta.T) * size_factors).T # now sample counts dispersion = (1 / dispersion_function(2 ** (beta[:, 1:]))).mean(1).reshape(-1, 1) dnum = pd.DataFrame( np.random.negative_binomial( n=mean, p=dispersion, size=mean.shape), columns=dcat.index) dcat.index.name = dnum.columns.name = "sample_name" return dnum, dcat
[docs]def generate_data( n_factors=1, n_replicates=4, n_features=1000, coefficient_stds=0.4, data_type="ATAC-seq", genome_assembly="hg38", **kwargs): """ Creates real-looking data dependent on the data type. Parameters ---------- n_factors : :obj:`int`, optional Number of factors influencing variance between groups. For each factor there will be two groups of samples. Defaults to 1. n_replicates : :obj:`int`, optional Number of replicates per group. Defaults to 4. n_features : :obj:`int`, optional Number of features (i.e. genes, regions) in matrix. Defaults to 1000. coefficient_stds : {:obj:`int`, :obj:`list`}, optional Standard deviation of the coefficients between groups. If a list, must match the number of ``n_factors``. Defaults to 1. data_type : :obj:`bool`, optional Data type of the project. Must be one of the ``ngs_toolkit`` classes. Default is "ATAC-seq" genome_assembly : :obj:`bool`, optional Genome assembly of the project. Default is "hg38" **kwargs : :obj:`dict` Additional keyword arguments will be passed to :func:`ngs_toolkit.demo.data_generator.generate_count_matrix`. Returns ------- :obj:`tuple` A tuple of :class:`pandas.DataFrame` objects with numeric and categorical data respectively. """ dnum, dcat = generate_count_matrix( n_factors=n_factors, n_features=n_features, coefficient_stds=coefficient_stds, **kwargs) # add random location indexes if data_type in ["ATAC-seq", "ChIP-seq"]: dnum.index = get_random_genomic_locations( n_features, genome_assembly=genome_assembly ) elif data_type in ["RNA-seq"]: dnum.index = get_random_genes( n_features, genome_assembly=genome_assembly ) elif data_type in ["CNV"]: # choose last value of last factor as "background" factor = dcat.columns[-1] value = dcat[factor].unique()[-1] background_samples = dcat[factor] == value # get log2 ratios dnum += 1 background = dnum.loc[:, background_samples].median(1) signal = dnum.loc[:, ~background_samples].median(1) dnum = np.log2(dnum.T / background).T # now sort feature by group difference dnum = dnum.reindex((signal - background).sort_values().index) # and assign contiguous locations in the genome dnum_res = dict() for resolution in DEFAULT_CNV_RESOLUTIONS: x = dnum.copy() x.index = get_genomic_bins( n_features, genome_assembly=genome_assembly, resolution=resolution ) dnum_res[resolution] = x dnum = dnum_res elif data_type in ["CRISPR"]: dnum.index = get_random_grnas( n_features, genome_assembly=genome_assembly ) return dnum, dcat
[docs]def generate_project( output_dir=None, project_name="test_project", organism="human", genome_assembly="hg38", data_type="ATAC-seq", n_factors=1, only_metadata=False, sample_input_files=False, initialize=True, **kwargs): """ Creates a real-looking PEP-based project with respective input files and quantification matrix. Parameters ---------- output_dir : :obj:`str`, optional Directory to write files to. Defaults to a temporary location in the user's ``${TMPDIR}``. project_name : :obj:`bool`, optional Name for the project. Default is "test_project". organism : :obj:`bool`, optional Organism of the project. Default is "human" genome_assembly : :obj:`bool`, optional Genome assembly of the project. Default is "hg38" data_type : :obj:`bool`, optional Data type of the project. Must be one of the ``ngs_toolkit`` classes. Default is "ATAC-seq" only_metadata : obj:`bool`, optional Whether to only generate metadata for the project or input files in addition. Default is :obj:`False`. sample_input_files : obj:`bool`, optional Whether the input files for the respective data type should be produced. This would be BAM and peak files for ATAC-seq or BAM files for RNA-seq. Default is :obj:`True`. initialize : obj:`bool`, optional Whether the project should be initialized into an Analysis object for the respective ``data_type`` or simply return the path to a PEP configuration file. Default is :obj:`True`. **kwargs : :obj:`dict` Additional keyword arguments will be passed to :func:`ngs_toolkit.demo.data_generator.generate_data`. Returns ------- {:class:`ngs_toolkit.analysis.Analysis`, :obj:`str`} The Analysis object for the project or a path to its PEP configuration file. """ from ngs_toolkit import _LOGGER if output_dir is None: output_dir = tempfile.mkdtemp() # Create project with projectmanager init_proj( project_name, genome_assemblies={organism: genome_assembly}, overwrite=True, root_projects_dir=output_dir, ) # Generate random data dnum, dcat = generate_data( n_factors=n_factors, genome_assembly=genome_assembly, data_type=data_type, **kwargs) # add additional sample info dcat["protocol"] = data_type dcat["organism"] = organism # now save it dcat.to_csv( os.path.join(output_dir, project_name, "metadata", "annotation.csv")) # Make comparison table comp_table_file = os.path.join( output_dir, project_name, "metadata", "comparison_table.csv" ) ct = pd.DataFrame() factors = list(string.ascii_uppercase[: n_factors]) for factor in factors: for side, f in [(1, "2"), (0, "1")]: ct2 = dcat.query( "{} == '{}'".format(factor, factor + f) ).index.to_frame() ct2["comparison_side"] = side ct2["comparison_name"] = "Factor_" + factor + "_" + "2vs1" ct2["sample_group"] = "Factor_" + factor + f ct = ct.append(ct2) ct["comparison_type"] = "differential" ct["data_type"] = data_type ct["comparison_genome"] = genome_assembly ct.to_csv(comp_table_file, index=False) # add the sample_attributes and group_attributes depending on the number of factors config_file = os.path.join( output_dir, project_name, "metadata", "project_config.yaml") config = yaml.safe_load(open(config_file, "r")) factors = list(string.ascii_uppercase[: n_factors]) config["sample_attributes"] = ["sample_name"] + factors config["group_attributes"] = factors config["metadata"]["comparison_table"] = comp_table_file yaml.safe_dump(config, open(config_file, "w")) # prepare dirs dirs = [os.path.join(output_dir, project_name, "results")] for d in dirs: if not os.path.exists(d): os.makedirs(d) if not only_metadata: # Add consensus region set if data_type in REGION_BASED_DATA_TYPES: bed = location_index_to_bed(dnum.index) bed.to_csv( os.path.join( output_dir, project_name, "results", project_name + ".peak_set.bed" ), index=False, sep="\t", header=False, ) if isinstance(dnum, pd.DataFrame): dnum.to_csv( os.path.join( output_dir, project_name, "results", project_name + ".matrix_raw.csv" ) ) elif isinstance(dnum, dict): for res, d in dnum.items(): d.to_csv( os.path.join( output_dir, project_name, "results", project_name + "." + res + ".matrix_raw.csv" ) ) # Here we are raising the logger level to omit messages during # sample file creation which would have been too much clutter, # especially since I make use of `load_data` with permissive=True. # In dev, I still want to see them. # if DEV: # _LOGGER.debug("Shutting logger for now") prev_level = _LOGGER.getEffectiveLevel() _LOGGER.setLevel("ERROR") an = initialize_analysis_of_data_type(data_type, config_file) an.load_data(permissive=True) if sample_input_files: generate_sample_input_files(an, dnum) # if DEV: _LOGGER.setLevel(prev_level) # _LOGGER.debug("Reactivated logger") if not initialize: return config_file return an
[docs]def generate_projects( output_path=None, project_prefix_name="demo-project", data_types=["ATAC-seq", "ChIP-seq", "CNV", "RNA-seq"], organisms=["human", "mouse"], genome_assemblies=["hg38", "mm10"], n_factors=[1, 2, 5], n_features=[100, 1000, 10000], n_replicates=[1, 3, 5], **kwargs): """ Create a list of Projects given ranges of parameters, which will be passed to :func:`ngs_toolkit.demo.data_generator.generate_project`. """ ans = list() for data_type in data_types: for organism, genome_assembly in zip(organisms, genome_assemblies): for factors in n_factors: for features in n_features: for replicates in n_replicates: project_name = "_".join( str(x) for x in [ project_prefix_name, data_type, genome_assembly, factors, features, replicates]) an = generate_project( output_dir=output_path, project_name=project_name, organism=organism, genome_assembly=genome_assembly, data_type=data_type, n_factors=factors, n_replicates=replicates, n_features=features, **kwargs) ans.append(an) return ans
[docs]def generate_bam_file(count_vector, output_bam, genome_assembly="hg38", chrom_sizes_file=None, index=True): """Generate BAM file containing reads matching the counts in a vector of features""" s = location_index_to_bed(count_vector.index) # get reads per region i = [i for i, c in count_vector.iteritems() for _ in range(c)] s = s.reindex(i) # shorten/enlarge by a random fraction; name reads d = s['end'] - s['start'] s = s.assign( start=(s['start'] + d * np.random.uniform(-0.2, 0.2, s.shape[0])).astype(int), end=(s['end'] + d * np.random.uniform(-0.2, 0.2, s.shape[0])).astype(int), name=["{}_read_{}".format(count_vector.name, i) for i in range(s.shape[0])]) s = pybedtools.BedTool.from_dataframe(s).truncate_to_chrom(genome=genome_assembly).sort() # get a file with chromosome sizes (usually not needed but only for bedToBam) if chrom_sizes_file is None: chrom_sizes_file = tempfile.NamedTemporaryFile().name pybedtools.get_chromsizes_from_ucsc(genome=genome_assembly, saveas=chrom_sizes_file) s.to_bam(g=chrom_sizes_file).saveas(output_bam) if index: import pysam pysam.index(output_bam)
[docs]def generate_peak_file(peak_set, output_peak, genome_assembly="hg38", summits=False): """Generate peak files containing regions from a fraction of a given set of features""" if not isinstance(peak_set, pybedtools.BedTool): peak_set = pybedtools.BedTool(peak_set) s = peak_set.to_dataframe() # choose a random but non-empty fraction of sites to keep while True: s2 = s.sample(frac=np.random.uniform()) if not s2.empty: break s = pybedtools.BedTool.from_dataframe(s2) # shorten/enlarge sites by a random fraction s = s.slop( l=np.random.uniform(-0.2, 0.2), r=np.random.uniform(-0.2, 0.2), pct=True, genome=genome_assembly) if summits: # get middle basepair s = s.to_dataframe() mid = ((s['end'] - s['start']) / 2).astype(int) s.loc[:, 'start'] += mid s.loc[:, 'end'] -= mid s = pybedtools.BedTool.from_dataframe(s) s = s.sort() s.saveas(output_peak)
[docs]def generate_log2_profiles(sample_vector, background_vector, output_file): """Generate a file with read count profile of CNVs for the sample and background""" # raise NotImplementedError s = location_index_to_bed(sample_vector.index) s["log2." + sample_vector.name + ".trimmed.bowtie2.filtered.bam"] = sample_vector s["log2.background.trimmed.bowtie2.filtered.bam"] = background_vector s.index.name = "Feature" with open(output_file, 'w') as handle: handle.write("# This is a comment made by the CNV caller program\n") handle.write("# This is a another\n") s.to_csv(handle, sep="\t")
[docs]def generate_sample_input_files(analysis, matrix): """Generate input files (BAM, peaks) for a sample depending on its data type.""" if analysis.data_type in REGION_BASED_DATA_TYPES: chrom_sizes_file = tempfile.NamedTemporaryFile().name pybedtools.get_chromsizes_from_ucsc(genome=analysis.genome, saveas=chrom_sizes_file) if not hasattr(analysis, "sites"): analysis.load_data(only_these_keys=['sites'], permissive=True) if not hasattr(analysis, "sites"): raise AttributeError("Need a consensus peak set to generate sample input files.") for sample in analysis.samples: if hasattr(sample, "aligned_filtered_bam"): if sample.aligned_filtered_bam is not None: d = os.path.dirname(sample.aligned_filtered_bam) os.makedirs(d, exist_ok=True) generate_bam_file( matrix.loc[:, sample.name], sample.aligned_filtered_bam, genome_assembly=analysis.genome, chrom_sizes_file=chrom_sizes_file) if hasattr(sample, "peaks"): if sample.peaks is not None: d = os.path.dirname(sample.peaks) os.makedirs(d, exist_ok=True) generate_peak_file( analysis.sites, sample.peaks, summits=False, genome_assembly=analysis.genome) if hasattr(sample, "summits"): if sample.summits is not None: d = os.path.dirname(sample.summits) os.makedirs(d, exist_ok=True) generate_peak_file( analysis.sites, sample.summits, summits=True, genome_assembly=analysis.genome) if hasattr(sample, "log2_read_counts"): if sample.log2_read_counts is not None: for res, file in sample.log2_read_counts.items(): os.makedirs(os.path.dirname(file), exist_ok=True) generate_log2_profiles( (2 ** matrix[res].loc[:, sample.name]).astype(int), (2 ** matrix[res].loc[:, sample.name]).astype(int), # this should be the background vector file)
[docs]def initialize_analysis_of_data_type(data_type, pep_config, *args, **kwargs): """Initialize an Analysis object from a PEP config with the appropriate ``data_type``.""" from ngs_toolkit import Analysis sub = Analysis.__subclasses__() sub += [sc for x in sub for sc in x.__subclasses__()] m = {t._data_type: t for t in sub} return m[data_type](from_pep=pep_config, *args, **kwargs)
[docs]def get_random_genomic_locations( n_regions, width_mean=500, width_std=400, min_width=300, genome_assembly="hg38"): """Get `n_regions`` number of random genomic locations respecting the boundaries of the ``genome_assembly``""" from ngs_toolkit.utils import bed_to_index # weight chroms by their size, excluding others csizes = {k: v[-1] for k, v in dict(pybedtools.chromsizes(genome_assembly)).items() if "_" not in k} gsize = sum(csizes.values()) csizes = {k: v / gsize for k, v in csizes.items()} chrom = pd.Series(np.random.choice(a=list(csizes.keys()), size=n_regions, p=list(csizes.values()))) start = np.array([0] * n_regions) end = np.absolute(np.random.normal(width_mean, width_std, n_regions)).astype(int) df = pd.DataFrame([chrom.tolist(), start.tolist(), end.tolist()]).T df.loc[(df[2] - df[1]) < min_width, 2] += min_width bed = ( pybedtools.BedTool.from_dataframe(df) .shuffle(genome=genome_assembly, chromFirst=True, noOverlapping=True, chrom=True) .sort() .to_dataframe() ) return bed_to_index(bed)
[docs]def get_random_genes(n_genes, genome_assembly="hg38"): """Get ``n_genes`` number of random genes from the set of genes of the ``genome_assembly``""" m = {"hg19": "grch37", "hg38": "grch38", "mm10": "grcm38"} o = {"hg19": "hsapiens", "hg38": "hsapiens", "mm10": "mmusculus"} g = ( query_biomart( attributes=["external_gene_name"], ensembl_version=m[genome_assembly], species=o[genome_assembly], ) .squeeze() .drop_duplicates() ) return g.sample(n=n_genes, replace=False).sort_values()
[docs]def get_random_grnas(n_genes, genome_assembly="hg38", n_grnas_per_gene=4): """Get ``n_genes`` number of random genes from the set of genes of the ``genome_assembly``""" import itertools gs = int(n_genes / n_grnas_per_gene) genes = get_random_genes(gs, genome_assembly=genome_assembly) grna_ns = itertools.chain(*[range(n_grnas_per_gene) for _ in range(gs)]) return [ a + "__" + str(b) for a, b in zip(genes.repeat(n_grnas_per_gene), list(grna_ns))]
[docs]def get_genomic_bins(n_bins, genome_assembly="hg38", resolution=None): """Get a ``size`` number of random genomic bins respecting the boundaries of the ``genome_assembly``""" from ngs_toolkit.utils import bed_to_index bed = pybedtools.BedTool.from_dataframe( pd.DataFrame(dict(pybedtools.chromsizes(genome_assembly))).T.reset_index() ) w = bed.makewindows( genome=genome_assembly, w=sum([i.length for i in bed]) / n_bins ).to_dataframe() if resolution is not None: if isinstance(resolution, str): resolution = int(resolution.replace("kb", "000")) w["end"] = w["start"] + resolution return bed_to_index(w.head(n_bins))
def _disp(x): return 4 / x + .1