Source code for ngs_toolkit.utils

#!/usr/bin/env python

import os

import numpy as np
import pandas as pd

class Unbuffered(object):
    def __init__(self, stream): = stream

    def write(self, data):

    def writelines(self, datas):

    def __getattr__(self, attr):
        return getattr(, attr)

def have_unbuffered_output():
    import sys

    sys.stdout = Unbuffered(sys.stdout)

[docs]def is_running_inside_ipython(): """ Check whether code is running inside an IPython session. """ try: cfg = get_ipython().config if cfg['IPKernelApp']['parent_appname'] == 'ipython-notebook': return True else: return False except NameError: return False
def get_timestamp(fmt='%Y-%m-%d-%H:%M:%S'): from datetime import datetime return def remove_timestamp_if_existing(file): import re return re.sub(r"\d{4}-\d{2}-\d{2}-\d{2}:\d{2}:\d{2}\.", "", file) def get_this_file_or_timestamped(file, permissive=True): from glob import glob import re from ngs_toolkit.utils import sorted_nicely from ngs_toolkit import _LOGGER split = file.split(".") body = ".".join(split[:-1]) end = split[-1] res = sorted_nicely(glob(body + "*" + end)) res = [x for x in res if + r"\.\d{4}-\d{2}-\d{2}-\d{2}:\d{2}:\d{2}\.", x)] if len(res) == 0: return file else: try: # get newest file _LOGGER.warning( "Could not get uniquivocous timestamped file for '{}'.".format(file) + " Returning '{}'.".format(res[-1])) return res[-1] except IndexError: if permissive: return file else: msg = "Could not remove timestamp from file path." msg += " Probabably it does not exist." _LOGGER.debug(msg) raise IndexError(msg) def is_analysis_descendent(exclude_functions=[]): import inspect from ngs_toolkit import Analysis for s in inspect.stack(): if s.function in exclude_functions: return False if 'self' not in s.frame.f_locals: continue # # Get Analysis object if not isinstance(s.frame.f_locals['self'], Analysis): continue else: a = s.frame.f_locals['self'] break return "a" in locals()
[docs]def record_analysis_output(file_name, report=True, permissive=False): """ Register a file that is an output of the Analysis. The file will be associated with the function that produced it and saved in the attribute ``output_files``. Parameters ---------- file_name : :obj:`str` File name of analysis output to record. report : :obj:`bool` Whether to write an html report with all current records. Default is :obj:`True` Attributes ---------- output_files : :obj:`collections.OrderedDict OrderedDict with keys being the function that produced the file and a list of file(s) as values. Raises ---------- KeyError If function (or parents) that calls this is not part of an Analysis object. """ # TODO: separate stack workflow to get analysis to its own function import inspect from ngs_toolkit import Analysis, _LOGGER # Let's get the object that called the function previous to this one # # the use case is often: # # Analysis().do_work() <- do work will produce a plot and save it using savefig. # # If savefig(track=True), this function will be called and we can trace which Analysis object did so # Go up the stack until an Analysis object is found: msg = "`record_analysis_output` was called by a function not belonging to a Analysis object" for s in inspect.stack(): if 'self' not in s.frame.f_locals: continue # # Get Analysis object if not isinstance(s.frame.f_locals['self'], Analysis): continue else: a = s.frame.f_locals['self'] break if "a" not in locals(): if permissive: _LOGGER.debug(msg) return else: _LOGGER.error(msg) raise KeyError(msg) # # Get function name name = s.function # # Get parameters # loc.frame.f_locals a.record_output_file(file_name, name)
[docs]def submit_job( code, job_file, log_file=None, computing_configuration=None, dry_run=False, limited_number=False, total_job_lim=500, refresh_time=10, in_between_time=5, **kwargs): """ Submit a job to be run. Uses divvy to allow running on a local computer or distributed computing resources. Parameters ---------- code : :obj:`str` String of command(s) to be run. job_file : :obj:`str` File to write job ``code`` to. log_file : :obj:`str` Log file to write job output to. Defaults to `job_file` with ".log" ending. computing_configuration : :obj:`str` Name of `divvy` computing configuration to use. Defaults to 'default' which is to run job in localhost. dry_run: :obj:`bool` Whether not to actually run job. Defaults to False. limited_number: :obj:`bool` Whether to restrict jobs to a maximum number. Currently only possible if using "slurm". Defaults to False. total_job_lim : :obj:`int` Maximum number of jobs to restrict to. Defaults to 500. refresh_time : :obj:`int` Time in between checking number of jobs in seconds. Defaults to 10. in_between_time : :obj:`int` Time in between job submission in seconds. Defaults to 5. **kwargs : :obj:`dict` Additional keyword arguments will be passed to the chosen submission template according to `computing_configuration`. Pass for example: jobname="job", cores=2, mem=8000, partition="longq". """ import time import subprocess import divvy from ngs_toolkit import _CONFIG, _LOGGER # reduce level of logging from divvy # only for divvy <=0. if "logging" in divvy.__dict__.keys(): divvy.logging.getLogger("divvy").setLevel("ERROR") def count_jobs_running(check_cmd="squeue", sep="\n"): """ Count running jobs on a cluster by invoquing a command that lists the jobs. """ return subprocess.check_output(check_cmd).split(sep).__len__() def submit_job_if_possible(cmd, check_cmd="squeue", total_job_lim=800, refresh_time=10, in_between_time=5): submit = count_jobs_running(check_cmd) < total_job_lim while not submit: time.sleep(refresh_time) submit = count_jobs_running(check_cmd) < total_job_lim time.sleep(in_between_time) if log_file is None: log_file = ".".join(job_file.split(".")[:-1]) + ".log" # Get computing configuration from config if computing_configuration is None: try: computing_configuration = \ _CONFIG['preferences']['computing_configuration'] except KeyError: msg = "'computing_configuration' was not given" msg += " and default could not be get from config." hint = " Pass a value or add one to the section" hint += " preferences:computing_configuration'" hint += " in the ngs_toolkit config file." _LOGGER.error(msg + hint) raise dcc = divvy.ComputingConfiguration() if computing_configuration is not None: dcc.activate_package(computing_configuration) # Generate job script d = {"code": code, "logfile": log_file} d.update(kwargs) dcc.write_script(job_file, d) # Submit job if not dry_run: scmd = dcc['compute']['submission_command'] cmd = scmd.split(" ") + [job_file] # simply submit if not limiting submission to the number of already running jobs if not limited_number: else: # otherwise, submit only after `total_job_lim` is less than number of runnning jobs # this is only possible for slurm now though if scmd != "slurm": else: submit_job_if_possible(cmd, check_cmd="slurm")
[docs]def chunks(l, n): """ Partition iterable in chunks of size `n`. Parameters ---------- l : iterable Iterable (e.g. list or numpy array). n : :obj:`int` Size of chunks to generate. """ n = max(1, n) return list(l[i: i + n] for i in range(0, len(l), n))
[docs]def sorted_nicely(l): """ Sort an iterable in the way that humans expect. Parameters ---------- l : iterable Iterable to be sorted Returns ------- iterable Sorted iterable """ import re def convert(text): return int(text) if text.isdigit() else text def alphanum_key(key): return [convert(c) for c in re.split("([0-9]+)", key)] return sorted(l, key=alphanum_key)
[docs]def standard_score(x): """ Compute a standard score, defined as (x - min(x)) / (max(x) - min(x)). Parameters ---------- x : :class:`numpy.ndarray` Numeric array. Returns ------- :class:`numpy.ndarray` Transformed array. """ return (x - x.min()) / (x.max() - x.min())
[docs]def z_score(x): """ Compute a Z-score, defined as (x - mean(x)) / std(x). Parameters ---------- x : :class:`numpy.ndarray` Numeric array. Returns ------- :class:`numpy.ndarray` Transformed array. """ return (x - x.mean()) / x.std()
[docs]def logit(x): """ Compute the logit of x, defined as log(x / (1 - x)). Parameters ---------- x : :class:`numpy.ndarray` Numeric array. Returns ------- :class:`numpy.ndarray` Transformed array. """ return np.log(x / (1 - x))
[docs]def count_dataframe_values(x): """ Count number of non-null values in a dataframe. Parameters ---------- x : :class:`pandas.DataFrame` Pandas DataFrame Returns ------- int Number of non-null values. """ return np.multiply(*x.shape) - x.isnull().sum().sum()
[docs]def location_index_to_bed(index): """ Get a pandas DataFrame with columns "chrom", "start", "end" from an pandas Index of strings in form "chrom:start-end". Parameters ---------- index : :class:`pandas.Index` Index strings of the form "chrom:start-end". Returns ------- :class:`pandas.DataFrame` Pandas dataframe. """ bed = pd.DataFrame(index=index) index = index.to_series(name="region") bed["chrom"] = index.str.split(":").str[0] index2 = index.str.split(":").str[1] bed["start"] = index2.str.split("-").str[0] bed["end"] = index2.str.split("-").str[1] return bed
[docs]def bed_to_index(df): """ Get an index of the form chrom:start-end from a a dataframe with such columns. Parameters ---------- df : :class:`pandas.DataFrame` DataFrame with columns "chrom", "start" and "end". Returns ------- :class:`pandas.Index` Pandas index. """ cols = ["chrom", "start", "end"] if not all([x in df.columns for x in cols]): raise AttributeError( "DataFrame does not have '{}' columns.".format("', '".join(cols)) ) index = ( df["chrom"] + ":" + df["start"].astype(int).astype(str) + "-" + df["end"].astype(int).astype(str) ) return pd.Index(index, name="region")
[docs]def timedelta_to_years(x): """ Convert a timedelta to years. Parameters ---------- x : :class:`pandas.Timedelta` A Timedelta object. Returns ------- float Years. """ return x / np.timedelta64(60 * 60 * 24 * 365, "s")
[docs]def signed_max(x, f=0.66, axis=0): """ Return maximum or minimum of array `x` depending on the sign of the majority of values. If there isn't a clear majority (at least `f` fraction in one side), return mean of values. If given a pandas DataFrame or 2D numpy array, will apply this across rows (columns-wise, axis=0) or across columns (row-wise, axis=1). Will return NaN for non-numeric values. Parameters ---------- x : {:class:`numpy.ndarray`, :class:`pandas.DataFrame`, :class:`pandas.Series`} Input values to reduce f : :obj:`float` Threshold fraction of majority agreement. Default is 0.66. axis : :obj:`int` Whether to apply across rows (0, column-wise) or across columns (1, row-wise). Default is 0. Returns ------- :class:`pandas.Series` Pandas Series with values reduced to the signed maximum. """ if axis not in [0, 1]: raise ValueError("Axis must be one of 0 (columns) or 1 (rows).") if len(x.shape) == 1: # Return nan if not numeric if x.dtype not in [np.float_, np.int_]: return np.nan types = [type(i) for i in x] if not all(types): return np.nan else: if types[0] not in [np.float_, float, np.int_, int]: return np.nan ll = float(len(x)) neg = sum(x < 0) pos = sum(x > 0) obs_f = max(neg / ll, pos / ll) if obs_f >= f: if neg > pos: return min(x) else: return max(x) else: return np.mean(x) else: if not isinstance(x, pd.DataFrame): x = pd.DataFrame(x) if axis == 1: x = x.T res = pd.Series(np.empty(x.shape[1]), index=x.columns) for v in x.columns: res[v] = signed_max(x.loc[:, v]) return res
[docs]def log_pvalues(x, f=0.1): """ Calculate -log10(p-value) replacing infinite values with: ``max(x) + max(x) * f`` (`f` % more than the maximum) Parameters ---------- x : pandas.Series Series with numeric values f : float Fraction to augment the maximum value by if ``x`` contains infinite values. Defaults to 0.1. Returns ------- :class:`pandas.Series` Transformed values """ ll = -np.log10(x) rmax = ll[ll != np.inf].max() return ll.replace(np.inf, rmax + rmax * f)
[docs]def collect_md5_sums(df): """ Given a dataframe with columns with paths to md5sum files ending in '_md5sum', replace the paths to the md5sum files with the actual checksums. Useful to use in combination with :func:`~ngs_toolkit.general.project_to_geo`. Parameters ---------- df : :class:`pandas.DataFrame` A dataframe with columns ending in '_md5sum'. Returns ------- :class:`pandas.DataFrame` DataFrame with md5sum columns replaced with the actual md5sums. """ cols = df.columns[df.columns.str.endswith("_md5sum")] for col in cols: for i, path in df.loc[:, col].iteritems(): if not pd.isnull(path): cont = open(path, "r").read().strip() if any([x.isspace() for x in cont]): cont = cont.split(" ")[0] df.loc[i, col] = cont return df
[docs]def decompress_file(file, output_file=None): """ Decompress a gzip-compressed file out-of-memory. """ """ # test: file = "file.bed.gz" """ import shutil import gzip from ngs_toolkit import _LOGGER if output_file is None: if not file.endswith(".gz"): msg = "`output_file` not given and input_file does not end in '.gz'." _LOGGER.error(msg) raise ValueError(msg) output_file = file.replace(".gz", "") # decompress with, "rb") as _in: with open(output_file, "wb") as _out: shutil.copyfileobj(_in, _out)
[docs]def compress_file(file, output_file=None): """ Compress a gzip-compressed file out-of-memory. """ """ # test: file = "file.bed.gz" """ import shutil import gzip if output_file is None: output_file = file + ".gz" # compress with open(file, "rb") as _in: with, "wb") as _out: shutil.copyfileobj(_in, _out)
[docs]def download_file(url, output_file, chunk_size=1024): """ Download a file and write to disk in chunks (not in memory). Parameters ---------- url : :obj:`str` URL to download from. output_file : :obj:`str` Path to file as output. chunk_size : :obj:`int` Size in bytes of chunk to write to disk at a time. """ """ # test: url = '' url += '/ChmmModels/coreMarks/jointModel/final/E001_15_coreMarks_dense.bed.gz' output_file = "file.bed.gz" chunk_size = 1024 download_file(url, output_file, chunk_size) """ import requests response = requests.get(url, stream=True) with open(output_file, "wb") as outfile: outfile.writelines(response.iter_content(chunk_size=chunk_size))
def download_gzip_file(url, output_file): if not output_file.endswith(".gz"): output_file += ".gz" download_file(url, output_file) decompress_file(output_file) if os.path.exists(output_file) and os.path.exists(output_file.replace(".gz", "")): os.remove(output_file)
[docs]def series_matrix2csv(matrix_url, prefix=None): """ matrix_url: gziped URL with GEO series matrix. """ import gzip import subprocess"wget {}".format(matrix_url).split(" ")) filename = matrix_url.split("/")[-1] with, "rb") as f: file_content = # separate lines with only one field (project-related) # from lines with >2 fields (sample-related) prj_lines = dict() sample_lines = dict() for line in file_content.decode("utf-8").strip().split("\n"): line = line.strip().split("\t") if len(line) == 2: prj_lines[line[0].replace('"', "")] = line[1].replace('"', "") elif len(line) > 2: sample_lines[line[0].replace('"', "")] = [ x.replace('"', "") for x in line[1:] ] prj = pd.Series(prj_lines) prj.index = prj.index.str.replace("!Series_", "") samples = pd.DataFrame(sample_lines) samples.columns = samples.columns.str.replace("!Sample_", "") if prefix is not None: prj.to_csv(os.path.join(prefix + ".project_annotation.csv"), index=True) samples.to_csv(os.path.join(prefix + ".sample_annotation.csv"), index=False) return prj, samples
[docs]def deseq_results_to_bed_file( deseq_result_file, bed_file, sort=True, ascending=False, normalize=False, significant_only=False, alpha=0.05, abs_fold_change=1.0, ): """ Write BED file with fold changes from DESeq2 as score value. """ from ngs_toolkit import _LOGGER from sklearn.preprocessing import MinMaxScaler df = pd.read_csv(deseq_result_file, index_col=0) msg = "DESeq2 results do not have a 'log2FoldChange' column." if not ("log2FoldChange" in df.columns.tolist()): _LOGGER.error(msg) raise AssertionError(msg) if sort is True: df = df.sort_values("log2FoldChange", ascending=ascending) if significant_only is True: df = df.loc[ (df["padj"] < alpha) & (df["log2FoldChange"].abs() > abs_fold_change), : ] # decompose index string (chrom:start-end) into columns df["chrom"] = map(lambda x: x[0], df.index.str.split(":")) r = pd.Series(map(lambda x: x[1], df.index.str.split(":"))) df["start"] = map(lambda x: x[0], r.str.split("-")) df["end"] = map(lambda x: x[1], r.str.split("-")) df["name"] = df.index if normalize: MinMaxScaler(feature_range=(0, 1000)).fit_transform(df["log2FoldChange"]) df["score"] = df["log2FoldChange"] df[["chrom", "start", "end", "name", "score"]].to_csv( bed_file, sep="\t", header=False, index=False )
[docs]def homer_peaks_to_bed(homer_peaks, output_bed): """ Convert HOMER peak calls to BED format. The fifth column (score) is the -log10(p-value) of the peak. Parameters ---------- homer_peaks : :obj:`str` HOMER output with peak calls. output_bed : :obj:`str` Output BED file. """ df = pd.read_csv(homer_peaks, sep="\t", comment="#", header=None) df["-log_pvalue"] = (-np.log10(df.iloc[:, -2])).replace(, 1000) df["name"] = df[1] + ":" + df[2].astype(str) + "-" + df[3].astype(str) ( df[[1, 2, 3, "name", "-log_pvalue"]] .sort_values([1, 2, 3], ascending=True) .to_csv(output_bed, header=False, index=False, sep="\t") )
[docs]def macs2_call_chipseq_peak( signal_samples, control_samples, output_dir, name, distributed=True ): """ Call ChIP-seq peaks with MACS2 in a slurm job. Parameters ---------- signal_samples : :obj:`list` Signal Sample objects. control_samples : :obj:`list` Background Sample objects. output_dir : :obj:`list` Parent directory where MACS2 outputs will be stored. name : :obj:`str` Name of the MACS2 comparison being performed. distributed: :obj:`bool` Whether to submit a SLURM job or to return a string with the runnable. """ output_path = os.path.join(output_dir, name) if not os.path.exists(output_path): os.mkdir(output_path) runnable = """date\nmacs2 callpeak -t {0} -c {1} -n {2} --outdir {3}\ndate""".format( " ".join([s.aligned_filtered_bam for s in signal_samples]), " ".join([s.aligned_filtered_bam for s in control_samples]), name, output_path, ) if distributed: job_name = "macs2_{}".format(name) job_file = os.path.join(output_path, name + "") submit_job(runnable, job_file, cores=4, jobname=job_name) else: return runnable
[docs]def homer_call_chipseq_peak_job( signal_samples, control_samples, output_dir, name, distributed=True ): """ Call ChIP-seq peaks with MACS2 in a slurm job. Parameters ---------- signal_samples : :obj:`list` Signal Sample objects. control_samples : :obj:`list` Background Sample objects. output_dir : :obj:`list` Parent directory where MACS2 outputs will be stored. name : :obj:`str` Name of the MACS2 comparison being performed. """ output_path = os.path.join(output_dir, name) if not os.path.exists(output_path): os.mkdir(output_path) # make tag directory for the signal and background samples separately signal_tag_directory = os.path.join(output_dir, "homer_tag_dir_" + name + "_signal") fs = " ".join([s.aligned_filtered_bam for s in signal_samples]) runnable = """date\nmakeTagDirectory {0} {1}\n""".format(signal_tag_directory, fs) background_tag_directory = os.path.join( output_dir, "homer_tag_dir_" + name + "_background" ) fs = " ".join([s.aligned_filtered_bam for s in control_samples]) runnable += """makeTagDirectory {0} {1}\n""".format(background_tag_directory, fs) # call peaks output_file = os.path.join( output_dir, name, name + "_homer_peaks.factor.narrowPeak" ) runnable += """findPeaks {signal} -style factor -o {output_file} -i {background}\n""".format( output_file=output_file, background=background_tag_directory, signal=signal_tag_directory, ) output_file = os.path.join( output_dir, name, name + "_homer_peaks.histone.narrowPeak" ) runnable += """findPeaks {signal} -style histone -o {output_file} -i {background}\ndate\n""".format( output_file=output_file, background=background_tag_directory, signal=signal_tag_directory, ) if distributed: job_name = "homer_findPeaks_{}".format(name) job_file = os.path.join(output_path, name + "") submit_job(runnable, job_file, cores=4, jobname=job_name) else: return runnable
[docs]def bed_to_fasta(input_bed, output_fasta, genome_file): """ Retrieves DNA sequence underlying specific region. Names of FASTA entries will be of form "chr:start-end". Parameters ---------- input_bed : :obj:`str` Path to input BED file. output_fasta : :obj:`str` Path to resulting FASTA file. genome_file : :obj:`str` Path to genome file in either 2bit or FASTA format. Will be guessed based on file ending. Raises ---------- ValueError If `genome_file` format cannot be guessed or is not supported. """ if genome_file.endswith(".2bit"): bed_to_fasta_through_2bit(input_bed, output_fasta, genome_file) elif genome_file.endswith(".fa") or genome_file.endswith(".fasta"): bed_to_fasta_through_fasta(input_bed, output_fasta, genome_file) else: msg = "Format of `genome_file` must be one of FASTA or 2bit, " msg += "with file ending in either '.fa', '.fasta' or '.2bit'" raise ValueError(msg)
[docs]def bed_to_fasta_through_2bit(input_bed, output_fasta, genome_2bit): """ Retrieves DNA sequence underlying specific region. Requires the `twoBitToFa` command from UCSC kent tools. Names of FASTA entries will be of form "chr:start-end". Parameters ---------- input_bed : :obj:`str` Path to input BED file. output_fasta : :obj:`str` Path to resulting FASTA file. genome_2bit : :obj:`str` Path to genome 2bit file. """ import subprocess tmp_bed = input_bed + ".tmp.bed" # write name column bed = pd.read_csv(input_bed, sep="\t", header=None) bed["name"] = bed[0] + ":" + bed[1].astype(str) + "-" + bed[2].astype(str) bed[1] = bed[1].astype(int) bed[2] = bed[2].astype(int) bed.to_csv(tmp_bed, sep="\t", header=None, index=False) cmd = "twoBitToFa {0} -bed={1} {2}".format(genome_2bit, tmp_bed, output_fasta)" ")) os.remove(tmp_bed)
[docs]def bed_to_fasta_through_fasta(input_bed, output_fasta, genome_fasta): """ Retrieves DNA sequence underlying specific region. Uses bedtools getfasta (internally through pybedtools.BedTool.sequence). Names of FASTA entries will be of form "chr:start-end". Parameters ---------- input_bed : :obj:`str` Path to input BED file. output_fasta : :obj:`str` Path to resulting FASTA file. genome_fasta : :obj:`str` Path to genome FASTA file. """ import pybedtools bed = pd.read_csv(input_bed, sep="\t", header=None) bed["name"] = bed[0] + ":" + bed[1].astype(str) + "-" + bed[2].astype(str) bed[1] = bed[1].astype(int) bed[2] = bed[2].astype(int) bed = pybedtools.BedTool.from_dataframe(bed.iloc[:, [0, 1, 2]]) bed.sequence(fi=genome_fasta, fo=output_fasta, name=True)
[docs]def count_reads_in_intervals(bam, intervals): """ Count total number of reads in a iterable holding strings representing genomic intervals of the form ``"chrom:start-end"``. Parameters ---------- bam : :obj:`str` Path to BAM file. intervals : :obj:`list` List of strings with genomic coordinates in format ``"chrom:start-end"``. Returns ------- :obj:`dict` Dict of read counts for each interval. """ import pysam counts = dict() bam = pysam.AlignmentFile(bam, mode="rb") chroms = ["chr" + str(x) for x in range(1, 23)] + ["chrX", "chrY"] for interval in intervals: if interval.split(":")[0] not in chroms: continue counts[interval] = bam.count(region=interval.split("|")[0]) bam.close() return counts
[docs]def normalize_quantiles_r(array): """ Quantile normalization with a R implementation. Requires the "rpy2" library and the R library "preprocessCore". Requires the R package "cqn" to be installed: >>> source('') >>> biocLite('preprocessCore') Parameters ---------- array : :class:`numpy.ndarray` Numeric array to normalize. Returns ------- :class:`numpy.ndarray` Normalized numeric array. """ import rpy2.robjects as robjects import rpy2.robjects.numpy2ri import warnings from rpy2.rinterface import RRuntimeWarning warnings.filterwarnings("ignore", category=RRuntimeWarning) rpy2.robjects.numpy2ri.activate() robjects.r('require("preprocessCore")') normq = robjects.r("normalize.quantiles") return np.array(normq(array))
[docs]def normalize_quantiles_p(df_input): """ Quantile normalization with a ure Python implementation. Code from Parameters ---------- df_input : :class:`pandas.DataFrame` Dataframe to normalize. Returns ------- :class:`numpy.ndarray` Normalized numeric array. """ df = df_input.copy() # compute rank dic = {} for col in df: dic.update({col: sorted(df[col])}) sorted_df = pd.DataFrame(dic) rank = sorted_df.mean(axis=1).tolist() # sort for col in df: t = np.searchsorted(np.sort(df[col]), df[col]) df[col] = [rank[i] for i in t] return df
def count_bam_file_length(bam_file): import pysam return pysam.AlignmentFile(bam_file).count() def count_lines(file): with open(file) as f: for i, _ in enumerate(f): pass return i + 1 def get_total_region_area(bed_file): peaks = pd.read_csv(bed_file, sep="\t", header=None) return (peaks.iloc[:, 2] - peaks.iloc[:, 1]).sum() def get_region_lengths(bed_file): peaks = pd.read_csv(bed_file, sep="\t", header=None) return peaks.iloc[:, 2] - peaks.iloc[:, 1] def get_regions_per_chromosomes(bed_file): peaks = pd.read_csv(bed_file, sep="\t", header=None) return peaks.iloc[:, 0].value_counts()