Source code for gunc.get_scores

#!/usr/bin/env python3
import os
import sys
import scipy
import scipy.stats
import scipy.special
import numpy as np
import pandas as pd
from collections import OrderedDict
from pkg_resources import resource_filename
import warnings

warnings.simplefilter(action="ignore", category=FutureWarning)


def read_diamond_output(file_path):
    """Read in Diamond output.

    Args:
        file_path (str): Full path to diamond output.

    Returns:
        pandas.DataFrame: diamond output
    """
    df = pd.read_csv(
        file_path,
        sep="\t",
        header=None,
        usecols=[0, 1, 2],
        names=["query", "genome", "id"],
    )
    if len(df) == 0:
        return df
    else:
        df["contig"] = df["query"].str.rpartition("_")[[0]]
        return df


def get_n_effective_surplus_clades(counts):
    """Calculate Inverse Simpson Index.

    Inverse Simpson Index of the distribution of all clade labels, minus 1
    (as 1 genome is expected).

    Arguments:
        counts (pandas.Series): Counts of taxons in a taxlevel.

    Returns:
        float: Score describing the extent of chimerism, i.e. the effective
            number of surplus clades represented at a taxlevel.
    """
    if len(counts) == 0:
        return 0
    else:
        denom = sum(counts) ** 2
        return 1 / sum([x**2 / denom for x in counts]) - 1

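# Illustrative examples (not part of the original module), using hypothetical
# gene counts per clade:
#     get_n_effective_surplus_clades(pd.Series([10]))    -> 0.0
#         (all genes in a single clade: no surplus clades)
#     get_n_effective_surplus_clades(pd.Series([5, 5]))  -> 1.0
#         (an even split over two clades: one effective surplus clade)
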
def expected_entropy_estimate(probabilities, sample_count):
    """Compute the expected entropy estimate when sampling N elements from
    underlying probabilities `p`.

    Arguments:
        probabilities (numpy.ndarray): probabilities (p) (assumed to sum to 1.0)
        sample_count (int): number of samples (N)

    Returns:
        entropy (float): expected entropy
    """
    entropy = 0.0
    for probability in probabilities:
        sample_array = np.arange(1, sample_count)
        entropy += np.sum(
            scipy.special.comb(sample_count, sample_array)
            * np.power(probability, sample_array)
            * np.power(1 - probability, sample_count - sample_array)
            # should be sample_array/sample_count
            # but we moved the 1/sample_count out:
            * sample_array
            * np.log(sample_array / sample_count)
        )
    return -entropy / sample_count

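# Background note (not part of the original module): the loop above evaluates
# the expectation of the plug-in entropy estimator. If n_i ~ Binomial(N, p_i)
# is the number of sampled genes falling into clade i, the estimator
#     H_hat = -sum_i (n_i / N) * ln(n_i / N)
# has expectation
#     E[H_hat] = -(1/N) * sum_i sum_{k=1..N-1} C(N, k) * p_i**k
#                * (1 - p_i)**(N - k) * k * ln(k / N),
# which is exactly the nested sum computed here (the k=0 and k=N terms vanish).
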
def calc_expected_conditional_entropy(contigs, taxons):
    """Compute the expected measured conditional entropy under the null
    hypothesis that there is no relationship between contig membership
    and taxonomy.

    When a bucket is large enough, its estimate is expected to be close enough
    to the global estimate that it is no longer computed via the more costly
    `expected_entropy_estimate` function.

    Arguments:
        contigs (pandas.Series): contig names in data
        taxons (pandas.Series): taxons in data

    Returns:
        float: expected measured conditional entropy
    """
    MAX_BUCKET_SIZE = 500
    counts = taxons.value_counts()
    taxon_entropy = scipy.stats.entropy(counts.values)
    if taxon_entropy == 0:
        return 0.0
    bucket_sizes = contigs.value_counts().value_counts()
    nr_elements = bucket_sizes * bucket_sizes.index
    contribution = nr_elements / nr_elements.sum()
    taxon_probability = counts / counts.sum()
    taxon_probability = taxon_probability.values
    total_entropy = 0.0
    for bucket_size, contig_count in contribution.items():
        if bucket_size <= MAX_BUCKET_SIZE:
            total_entropy += contig_count * expected_entropy_estimate(
                taxon_probability, bucket_size
            )
        else:
            total_entropy += contig_count * taxon_entropy
    return total_entropy

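# Background note (not part of the original module): under the null hypothesis,
# H(T|C) = sum_c P(c) * H(T | C=c), where P(c) is the fraction of mapped genes
# on contig c. Every contig carrying s genes contributes the same expected
# per-contig entropy E[H_hat_s] (from expected_entropy_estimate), so the loop
# weights E[H_hat_s] by the fraction of genes sitting on contigs of size s.
# For s > MAX_BUCKET_SIZE that estimate is close to the full taxon entropy,
# which is then used directly.
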
def read_genome2taxonomy_reference(db):
    """Read in genome2taxonomy reference data.

    This file translates the genomes in the diamond reference file to taxonomy
    with seven taxonomic levels from kingdom to species.

    Arguments:
        db (str): Which db to use: progenomes or gtdb

    Returns:
        pandas.DataFrame: genome2taxonomy reference
    """
    if db == "progenomes_2.1":
        genome2taxonomy = resource_filename(__name__, "data/genome2taxonomy_ref.tsv")
    elif db == "gtdb_95":
        genome2taxonomy = resource_filename(
            __name__, "data/genome2taxonomy_gtdbref.tsv"
        )
    else:
        sys.exit(f"[ERROR] {db} unknown. Allowed: progenomes_2.1, gtdb_95")
    return pd.read_csv(genome2taxonomy, sep="\t")


def create_base_data(diamond_df, db):
    """Assign taxonomies to diamond output.

    Arguments:
        diamond_df (pandas.DataFrame): diamond output
        db (str): Which db to use: progenomes or gtdb

    Returns:
        pandas.DataFrame: merged dataframe
    """
    genome2taxonomy_df = read_genome2taxonomy_reference(db)
    return pd.merge(diamond_df, genome2taxonomy_df, on="genome", how="inner")


def get_stats(diamond_df):
    """Get summary statistics of diamond mapping output.

    Arguments:
        diamond_df (pandas.DataFrame): diamond output

    Returns:
        tuple: (number of genes mapped to GUNC reference DB,
                number of contigs containing mapped genes)
    """
    if len(diamond_df) == 0:
        return (0, 0)
    genes_mapped = len(diamond_df)
    contig_count = diamond_df["contig"].nunique()
    return (genes_mapped, contig_count)

def get_abundant_lineages_cutoff(sensitive, genes_mapped):
    """Determine cutoff for abundant lineages.

    Removal of all genes coming from clades consisting of <2% of all mapped
    genes is intended to reduce noise introduced by genes mapping to a wide
    range of clades due to their poor representation in the reference.
    In sensitive mode that value is reduced to just 10 genes.

    Arguments:
        sensitive (bool): sets cutoff to 10 if true
        genes_mapped (int): total number of genes mapped by diamond to GUNC DB

    Returns:
        float: cutoff used to determine an abundant lineage
    """
    if sensitive:
        return 10
    else:
        return 0.02 * genes_mapped

def calc_contamination_portion(counts):
    """Calculate contamination portion.

    Arguments:
        counts (pandas.Series): Counts of taxons in a taxlevel.

    Returns:
        float: portion of genes assigned to all clades except the one
            with the most genes.
    """
    return 1 - counts.max() / counts.sum()

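# Illustrative example (not part of the original module), hypothetical counts:
#     calc_contamination_portion(pd.Series([90, 6, 4]))  -> ~0.1
#         (10% of the genes assign to clades other than the dominant one)
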
def mean(list_of_numbers):
    """Get mean of a list of numbers.

    Arguments:
        list_of_numbers (list): numbers to get mean of

    Returns:
        float: mean of numbers
    """
    return sum(list_of_numbers) / len(list_of_numbers)

def calc_mean_hit_identity(identity_scores):
    """Calculate mean hit identity score.

    Calculates the mean identity with which genes in abundant lineages (>2%)
    hit genes in the reference.

    Arguments:
        identity_scores (list): list of identity scores

    Returns:
        float: the mean of identity scores
    """
    if len(identity_scores) == 0:
        return 0
    return mean(identity_scores) / 100

def calc_conditional_entropy(contigs, taxons):
    """Compute conditional entropy.

    Arguments:
        contigs (pandas.Series): IDs of contigs
        taxons (pandas.Series): IDs of taxonomic clade assignments

    Returns:
        float: measured conditional entropy
    """
    cross = pd.crosstab(contigs, taxons)
    MI = scipy.stats.entropy(
        cross.values.ravel(), np.outer(cross.sum(1), cross.sum()).ravel()
    )
    H_t = scipy.stats.entropy(taxons.value_counts())
    return H_t - MI

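# Background note (not part of the original module): scipy.stats.entropy(pk, qk)
# returns the Kullback-Leibler divergence D(pk || qk) after normalising both
# arguments. With pk the joint contig-by-taxon counts and qk the outer product
# of the two marginals, that divergence equals the mutual information I(C; T),
# so the value returned is H(T|C) = H(T) - I(C; T).
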
def calc_clade_separation_score(
    contamination_portion, conditional_entropy, expected_conditional_entropy
):
    """Get clade separation score (CSS).

    CSS = 0, if contamination_portion = 0 or H(T|C) >= H(T|R)
    CSS = 1 - H(T|C)/H(T|R), else

    Arguments:
        contamination_portion (float): GUNC contamination portion; a value of 0
            means all genes map to the same taxonomic clade.
        conditional_entropy (float): H(T|C), the entropy of taxonomic clade
            labels given their contig assignment.
        expected_conditional_entropy (float): H(T|R), the expected value of
            H(T|C) given an identical contig size distribution and no
            relationship between taxonomic clade and contig labels.

    Returns:
        float: GUNC CSS
    """
    if contamination_portion == 0:
        return 0
    elif np.isnan(contamination_portion):
        return np.nan
    elif expected_conditional_entropy == 0:
        return np.nan
    else:
        return (
            1 - conditional_entropy / expected_conditional_entropy
            if conditional_entropy <= expected_conditional_entropy
            else 0
        )

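# Illustrative examples (not part of the original module), hypothetical values:
#     calc_clade_separation_score(0.3, 0.2, 0.8)  -> 0.75
#         (clades separate onto contigs far more cleanly than expected by chance)
#     calc_clade_separation_score(0.3, 0.9, 0.8)  -> 0
#         (no more structure than expected under the null hypothesis)
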
def determine_adjustment(genes_retained_index):
    """Determine if adjustment is necessary.

    Adjustment of the GUNC CSS score is done by setting it to 0 when <40% of
    all called genes are retained in abundant lineages/clades representing
    >2% of all mapped genes.

    Arguments:
        genes_retained_index (float): proportion of all called genes retained
            in abundant lineages (>2%).

    Returns:
        int: 1 or 0. If 1, keeps CSS as it was. If 0, sets CSS to zero.
    """
    if genes_retained_index > 0.4:
        return 1
    else:
        return 0

def is_chimeric(clade_separation_score_adjusted):
    """Determine if chimeric.

    The cutoff of 0.45 was identified using benchmarks and is used to call a
    genome chimeric/contaminated if CSS is higher than this cutoff.

    Arguments:
        clade_separation_score_adjusted (float): score

    Returns:
        bool: is genome chimeric
    """
    if clade_separation_score_adjusted > 0.45:
        return True
    else:
        return False

def get_scores_for_taxlevel(
    base_data,
    tax_level,
    abundant_lineages_cutoff,
    genome_name,
    genes_called,
    genes_mapped,
    contig_count,
    min_mapped_genes,
):
    """Run chimerism check.

    Calculates the various scores needed to determine if a genome is chimeric.

    Arguments:
        base_data (pandas.DataFrame): Diamond output merged with taxonomy table
        tax_level (str): tax level to run
        abundant_lineages_cutoff (float): Cutoff value for abundant lineages
        genome_name (str): Name of input genome
        genes_called (int): Number of genes called by prodigal and used by
            diamond for mapping to GUNC DB
        genes_mapped (int): Number of genes mapped to GUNC DB by diamond
        contig_count (int): Count of contigs
        min_mapped_genes (int): Minimum number of mapped genes at which to
            calculate scores

    Returns:
        OrderedDict: scores for chosen taxlevel
    """
    tax_data = base_data.groupby(tax_level).filter(
        lambda x: len(x) > abundant_lineages_cutoff
    )
    if len(tax_data) < min_mapped_genes:
        return OrderedDict(
            {
                "genome": genome_name,
                "n_genes_called": genes_called,
                "n_genes_mapped": genes_mapped,
                "n_contigs": contig_count,
                "taxonomic_level": tax_level,
                "proportion_genes_retained_in_major_clades": np.nan,
                "genes_retained_index": np.nan,
                "clade_separation_score": np.nan,
                "contamination_portion": np.nan,
                "n_effective_surplus_clades": np.nan,
                "mean_hit_identity": np.nan,
                "reference_representation_score": np.nan,
                "pass.GUNC": np.nan,
            }
        )
    counts = tax_data[tax_level].value_counts()
    contigs = tax_data["contig"]
    taxons = tax_data[tax_level]
    n_effective_surplus_clades = get_n_effective_surplus_clades(counts)
    contamination_portion = calc_contamination_portion(counts)
    mean_hit_identity = calc_mean_hit_identity(tax_data["id"].tolist())
    expected_conditional_entropy = calc_expected_conditional_entropy(contigs, taxons)
    conditional_entropy = calc_conditional_entropy(contigs, taxons)
    clade_separation_score = calc_clade_separation_score(
        contamination_portion, conditional_entropy, expected_conditional_entropy
    )
    portion_genes_retained = len(tax_data) / genes_mapped
    genes_retained_index = genes_mapped / genes_called * portion_genes_retained
    reference_representation_score = genes_retained_index * mean_hit_identity
    adjustment = determine_adjustment(genes_retained_index)
    clade_separation_score_adjusted = clade_separation_score * adjustment
    chimeric = is_chimeric(clade_separation_score_adjusted)
    passGUNC = not chimeric
    return OrderedDict(
        {
            "genome": genome_name,
            "n_genes_called": genes_called,
            "n_genes_mapped": genes_mapped,
            "n_contigs": contig_count,
            "taxonomic_level": tax_level,
            "proportion_genes_retained_in_major_clades": portion_genes_retained,
            "genes_retained_index": genes_retained_index,
            "clade_separation_score": clade_separation_score_adjusted,
            "contamination_portion": contamination_portion,
            "n_effective_surplus_clades": n_effective_surplus_clades,
            "mean_hit_identity": mean_hit_identity,
            "reference_representation_score": reference_representation_score,
            "pass.GUNC": passGUNC,
        }
    )

def chim_score(
    diamond_file_path,
    genes_called=0,
    sensitive=False,
    min_mapped_genes=11,
    use_species_level=False,
    db="progenomes_2.1",
    plot=False,
):
    """Get chimerism scores for a genome.

    Arguments:
        diamond_file_path (str): Full path to diamond output

    Keyword Arguments:
        genes_called (int): Number of genes called in the genome (default: (0))
        sensitive (bool): Run in sensitive mode (default: (False))
        min_mapped_genes (int): Minimum number of mapped genes at which to
            calculate scores (default: (11))
        use_species_level (bool): Allow species level to be selected for
            maxCSS (default: (False))
        plot (bool): Return data needed for plotting (default: (False))
        db (str): Which db to use: progenomes or gtdb
            (default: (progenomes_2.1))

    Returns:
        tuple: (GUNC scores for all taxlevels, scores of the taxlevel with
            the maximum CSS)
    """
    diamond_df = read_diamond_output(diamond_file_path)
    base_data = create_base_data(diamond_df, db)
    genes_mapped, contig_count = get_stats(diamond_df)
    genome_name = os.path.basename(diamond_file_path).split(".diamond.")[0]
    abundant_lineages_cutoff = get_abundant_lineages_cutoff(sensitive, genes_mapped)
    if plot:
        return base_data, genome_name, abundant_lineages_cutoff
    scores = []
    for tax_level in [
        "kingdom",
        "phylum",
        "class",
        "order",
        "family",
        "genus",
        "species",
    ]:
        scores.append(
            get_scores_for_taxlevel(
                base_data,
                tax_level,
                abundant_lineages_cutoff,
                genome_name,
                genes_called,
                genes_mapped,
                contig_count,
                min_mapped_genes,
            )
        )
    df = pd.DataFrame(scores).round(2)
    df["pass.GUNC"] = df["pass.GUNC"].astype(str)
    if df["clade_separation_score"].isnull().all():
        max_CSSidx = 0
    elif use_species_level:
        max_CSSidx = df["clade_separation_score"].idxmax()
    else:
        max_CSSidx = df[:-1]["clade_separation_score"].idxmax()
    max_CSS = df.iloc[[0] if pd.isna(max_CSSidx) else [max_CSSidx]]
    return df, max_CSS

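# Example usage (illustrative, not part of the original module); the file name
# and gene count below are hypothetical:
#
#     from gunc.get_scores import chim_score
#     all_levels, max_css = chim_score(
#         "sample.diamond.progenomes_2.1.out", genes_called=3000
#     )
#     print(max_css["pass.GUNC"])
#
# `all_levels` holds one row per taxonomic level; `max_css` is the row of the
# level with the highest clade separation score, which GUNC uses to decide
# whether the genome is called chimeric.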