Source code for forward.genotype

# This file is part of forward.
#
# This work is licensed under the Creative Commons Attribution-NonCommercial
# 4.0 International License. To view a copy of this license, visit
# http://creativecommons.org/licenses/by-nc/4.0/ or send a letter to Creative
# Commons, PO Box 1866, Mountain View, CA 94042, USA.

from __future__ import division

"""
This module provides utilities to handle genotype data.
"""

import os
import logging
logger = logging.getLogger(__name__)

from gepyto.formats.impute2 import Impute2File
import numpy as np
import pandas as pd
from sqlalchemy import Column, String, Integer, Float
from sqlalchemy.ext.hybrid import hybrid_property

from . import SQLAlchemyBase
from .utils import abstract, dispatch_methods, expand

try:  # pragma: no cover
    import pyplink
    HAS_PYPLINK = True
except ImportError:
    HAS_PYPLINK = False


__all__ = ["MemoryImpute2Geno", "PlinkGenotypeDatabase"]


class FrozenDatabaseError(Exception):
    def __str__(self):
        return ("Once initialized, genotype databases are immutable. Further "
                "filtering needs to be done at the Task level.")


[docs]class Variant(SQLAlchemyBase): """ORM Variant object. +----------------+------------------------------------------+------------+ | Column | Description | Type | +================+==========================================+============+ | name | The variant's name (`e.g.` rs12356) | String(25) | +----------------+------------------------------------------+------------+ | chrom | The chromosome (`e.g.` '3' or 'X') | String(15) | +----------------+------------------------------------------+------------+ | pos | The variant's position on the chromosome | Integer | +----------------+------------------------------------------+------------+ | mac | Minor allele count (`e.g.` 125) | Integer | +----------------+------------------------------------------+------------+ | minor | Least common allele (`e.g.` 'T') | String(10) | +----------------+------------------------------------------+------------+ | major | Most common allele (`e.g.` 'C') | String(10) | +----------------+------------------------------------------+------------+ | n_missing | Number of missing genotypes for this | Integer | | | variant | | +----------------+------------------------------------------+------------+ | n_non_missing | Number of non-missing genotypes for this | Integer | | | variant | | +----------------+------------------------------------------+------------+ Computed fields: - ``maf``: mac / (2 :math:`\cdot` n non missing) - ``completion_rate``: n non missing / (n non missing + n missing) """ __tablename__ = "variants" name = Column(String(25), primary_key=True) chrom = Column(String(15)) pos = Column(Integer) mac = Column(Float) # Minor allele count can be approximated using dosage. minor = Column(String(10)) major = Column(String(10)) n_missing = Column(Integer) n_non_missing = Column(Integer) # The maf = mac / (2 * n_non_missing) @hybrid_property def maf(self): return self.mac / (2 * self.n_non_missing) # The completion rate = n_non_missing / (n_non_missing + n_missing) @hybrid_property def completion_rate(self): return self.n_non_missing / (self.n_non_missing + self.n_missing)
@abstract
[docs]class AbstractGenotypeDatabase(object): """Abstract genotype container. This class defines the standardized methods to organize the genotype access procedures used by Forward. """ def __init__(self, **kwargs): dispatch_methods(self, kwargs) # Sample management interface.
[docs] def get_sample_order(self): """Return a list of the (ordered) samples as represented in the database. The experiment will pass the results of this method to the phenotype container's ``set_sample_order`` method. """ return self.samples
[docs] def query_variants(self, session, fields=None): """Return a query object for variants. :param session: A session object to interface with the Variant table. :type session: :py:class:`sqlalchemy.orm.session.Session` :param fields: A list of attributes to query. They should correspond to columns of the :py:class:`Variant` table. :type fields: list If fields are given, they are queried. Alternatively a query for the Variant objects is returned. Variant data is stored in a SQLAlchemy database. This method provides a shortcut to query it in a more pythonic way. """ if fields: args = [] if not type(fields) in (tuple, list): fields = [fields] for field in fields: if hasattr(Variant, field): args.append(getattr(Variant, field)) else: raise ValueError( "No column {} for Variants.".format(field) ) return session.query(*args) return session.query(Variant) # Get a numpy vector of variants.
[docs] def get_genotypes(self, variant_name): """Get a vector of encoded genotypes for the variant. This is the core functionality of the Genotype Databases. It should be as fast as possible as it will be called repeatedly by the tasks. If the structure is in memory, using a hashmap or a pandas DataFrame is recommended. If the underlying structure is on disk, this should use very good indexing and potentially caching. """ raise NotImplementedError() # Experiment initalization including filling up the database and filtering # variants.
[docs] def experiment_init(self, experiment): """Experiment specific initialization. This method has two main roles. Building a database of :py:class:`Variant` objects and doing the db-level filtering of the variants. It should also take care of loading the file in memory or of indexing if needed. """ # Create the tables corresponding to the SQLAlchemyBase subclasses. Variant.__table__.create(experiment.engine, checkfirst=True) # Filtering methods.
[docs] def filter_name(self, variant_list): """Filtering by variant id. The argument can be either a path to a file or a list of names. """ raise NotImplementedError()
def filter_maf(self, maf): raise NotImplementedError() def filter_completion(self, rate): raise NotImplementedError() # Static utilities. @staticmethod
[docs] def load_samples(filename): """Read a list of samples from a single column file.""" logger.info("Loading samples from {}".format(filename)) with open(filename, "r") as f: samples = [i.rstrip() for i in f] return np.array(samples, dtype=str)
[docs]class MemoryImpute2Geno(AbstractGenotypeDatabase): """Container for small(ish) IMPUTE2 files. :param filename: The filename for the IMPUTE2 file. :type filename: str :param samples: A list containing a single column and no header. The rows are the ordered sample IDs. :type sample: str :param filter_probability: A cutoff for imputation probability. Only genotypes with an imputation probability above this threshold will be used for the analysis. :type filter_probability: float .. warning:: This implementation load the whole file in memory (hence the name). Be careful and make sure that you have enough RAM to hold everything. It would be fairly easy to subclass this and support lazily reading genotype data from the disk. Feel free to contribute this feature if you need it. """ def __init__(self, filename, samples, filter_probability=0, **kwargs): self.filename = expand(filename) self.samples = self.load_samples(expand(samples)) self.impute2file = Impute2File(self.filename, "dosage", prob_threshold=filter_probability) # Filters (init). self.thresh_completion = 0 self.thresh_maf = 0 self.names = set() self.samples_mask = None # The initial filtering is done when the file is parsed. We won't allow # the user to apply the filtering methods again. self._frozen = False super(MemoryImpute2Geno, self).__init__(**kwargs)
[docs] def experiment_init(self, experiment, batch_insert_n=100000): """Experiment specific initialization. This takes care of initializing the database and filtering variants. It is automatically called by the Experiment. """ # Call the parent constructor (this will create the db). super(MemoryImpute2Geno, self).experiment_init(experiment) names = [] # Used to set the index. self._mat = [] # The actual dosage vectors. db_buffer = [] # List of dicts to bulk insert in the database. num_inserts = 0 con = experiment.engine.connect() # We also get a connection object. for dosage, info in self.impute2file: name = info["name"] # Go through the filters. # Name if self.names and name not in self.names: continue # Completion n_missing = np.sum(np.isnan(dosage)) n_non_missing = dosage.shape[0] - n_missing if self.thresh_completion != 0: completion = n_non_missing / dosage.shape[0] if completion < self.thresh_completion: continue # MAF if info["maf"] < self.thresh_maf: continue # Remove samples if needed. if self.samples_mask is not None: dosage = dosage[self.samples_mask] # Note that probability is already filtered by gepyto. # Add to the matrix. names.append(name) self._mat.append(dosage) # Add the variant information to the database. db_buffer.append( dict(name=name, chrom=info["chrom"], pos=int(info["pos"]), mac=float(info["minor_allele_count"]), minor=info["minor"], major=info["major"], n_missing=int(n_missing), n_non_missing=int(n_non_missing)) ) # We use sqlalchemy core to insert faster. # When we have more than 100,000 variants, we bulk insert them. if len(db_buffer) >= batch_insert_n: con.execute(Variant.__table__.insert(), db_buffer) num_inserts += len(db_buffer) db_buffer = [] # Bulk insert the remainder. if db_buffer: con.execute(Variant.__table__.insert(), db_buffer) num_inserts += len(db_buffer) del db_buffer logger.info("Built the variant database ({} entries).".format( num_inserts )) # Set the names as the index for the variant dataframe. self._mat = pd.DataFrame(self._mat) self._mat.index = names # Close the connection. con.close() # Freeze the database. self._frozen = True
[docs] def get_genotypes(self, variant_name): """Get a vector of genotypes for a variant. :param variant_name: The variant name (e.g. rs123456) :type variant_name: str :returns: A vector of genotypes (g = 0, 1 or 2; the number of non-reference alleles). :rtype: np.nadarray """ # We want to be able to get genotypes even before experiment # initialization, mainly for testing. To support this, we will look for # the variant in the impute2 file and reset the file. if not hasattr(self, "_mat"): # Check if it was init. logger.warning("This should only be logged during testing. If you " "see this during normal execution, please report " "it on Github.") # Try to find the variant in the impute2file. self._mat, info = self.impute2file.as_matrix() self._mat = pd.DataFrame(self._mat.T) self._mat.index = info["name"] try: vect = self._mat.loc[variant_name, :].values return vect except KeyError: msg = "Variant {} not found in genotype database.".format( variant_name ) raise ValueError(msg)
[docs] def filter_completion(self, rate): """Apply a filter on completion rate. :param rate: The minimum completion rate for inclusion. :type rate: float This is a configuration option. """ if self._frozen: raise FrozenDatabaseError() logger.info("Setting the completion threshold to {}".format(rate)) self.thresh_completion = rate
[docs] def filter_maf(self, maf): """Apply a filter on minor allele frequency. :param rate: The minimum maf for inclusion. :type rate: float This is a configuration option. """ if self._frozen: raise FrozenDatabaseError() logger.info("Setting the MAF threshold to {}".format(maf)) self.thresh_maf = maf
[docs] def filter_name(self, names_list): """Only includes variants in a list. :param names_list: Either a list of variant names or the path to a file containing a single column of variant names. :type names_list: str This is a configuration option. """ # TODO. By modifying gepyto, we could to better filtering on variant # names. We would simply not compute the dosage vector for variants # we want to ignore. if self._frozen: raise FrozenDatabaseError() # A list of IDs. if type(names_list) in (list, tuple): self.names = names_list # A file of IDs. else: logger.info("Keeping only variants with IDs in file: '{}'".format( names_list )) if not os.path.isfile(names_list): names_list = expand(names_list) with open(names_list, "r") as f: self.names = set(f.read().splitlines())
[docs] def exclude_samples(self, samples_list): """Exclude samples in the list. :param samples_list: A list of samples to exclude. :type samples_list: list The returned genotype vectors will not have genotypes for excluded samples (i.e. they will be n elements shorter, n = len(samples_list)) This is a configuration option. """ if self._frozen: raise FrozenDatabaseError() # Build a mask. self.samples_mask = np.ones(len(self.samples), dtype=bool) # Find the index of the samples to mask. for i in samples_list: idx = np.where(self.samples == i)[0] if len(idx) == 0: raise ValueError("Can't remove samples that are not in the " "genotype database ('{}')".format(i)) if idx.shape[0] > 1: raise ValueError("Samples are not unique ('{}').".format(i)) idx = idx[0] self.samples_mask[idx] = False # Also remove from the list of samples. self.samples = self.samples[self.samples_mask]
def close(self): self.impute2file.close()
[docs]class PlinkGenotypeDatabase(AbstractGenotypeDatabase): """Container for binary PLINK files. :param prefix: The prefix of the PLINK bed, bim, fam files. :type prefix: str This container relies on `pyplink <http://github.org/lemieuxl/pyplink>`_. """ def __init__(self, prefix, **kwargs): if not HAS_PYPLINK: raise Exception("Install pyplink to use the '{}' class.".format( self.__class__.__name__, )) self.ped = pyplink.PyPlink(expand(prefix)) self.fam = self.ped.get_fam() self.bim = self.ped.get_bim() # Filters. self.min_maf = 0 self.min_completion = 0 self.good_names = [] self._frozen = False super(PlinkGenotypeDatabase, self).__init__(**kwargs)
[docs] def get_sample_order(self): """Return a list of the (ordered) samples as represented in the database. """ return list(self.fam["iid"].values)
[docs] def get_genotypes(self, variant_name): """Returns a genotype vector for the given variant.""" return self.ped.get_geno_marker(variant_name)
[docs] def experiment_init(self, experiment): """Initialization method called by the Experiment. Applies filtering, creates and fills the database. """ # Create the table. super(PlinkGenotypeDatabase, self).experiment_init(experiment) # Filter and fill the database. con = experiment.engine.connect() db_variants = [] for name, geno in self.ped: info = self.bim.loc[name, :] # Name filtering. if self.good_names: if name not in self.good_names: continue # maf filtering. mac = np.nansum(geno) maf = mac / (2 * geno.shape[0]) if maf < self.min_maf: continue # completion filtering. n_missing = np.sum(np.isnan(geno)) n_non_missing = np.sum(~np.isnan(geno)) completion = n_non_missing / geno.shape[0] if completion < self.min_completion: continue # Everything passed, we can add to the db. db_variants.append( dict(name=name, chrom=info.chrom, pos=int(info.pos), mac=float(mac), n_missing=int(n_missing), n_non_missing=int(n_non_missing), minor=info["a1"], major=info["a2"]) ) con.execute(Variant.__table__.insert(), db_variants) logger.info("Built the variant database ({} entries).".format( len(db_variants) )) self._frozen = True # Filtering methods.
[docs] def filter_name(self, variant_list): """Filter by variant name. This is a configuration option. """ if self._frozen: raise FrozenDatabaseError() if type(variant_list) in (tuple, list): self.good_names = variant_list else: with open(variant_list, "r") as f: self.good_names = f.read().split()
[docs] def filter_maf(self, maf): """Filters variants by allele frequency (MAF). This is a configuration option. """ if self._frozen: raise FrozenDatabaseError() self.min_maf = maf
[docs] def filter_completion(self, rate): """Filters variants by completion rate (rate of no-calls). This is a configuration option. """ if self._frozen: raise FrozenDatabaseError() self.min_rate = rate