"""Chromosight's contact_map submodule contains classes to keep track of the
different aspects of Hi-C data and automate standard pre-processing steps. The
ContactMap class holds the whole genome informations and metadata (chromosome
sizes, resolution, etc) without loading the actual contact map. Upon calling
its "make_sub_matrices" method, it will generate a collection of ContactMap
instances accessible via the sub_mats attribute. Each instance corresponds to
an inter- or intra-chromosomal matrix and the Hi-C matrix of each chromosome is
loaded and preprocessed upon instantiation.
"""
from __future__ import absolute_import
from . import io as cio
import multiprocessing as mp
import cooler
from . import preprocessing as preproc
import os
import sys
from pathlib import Path
import numpy as np
import pandas as pd
import scipy.sparse as sp
[docs]class DumpMatrix:
"""
This class is used as a decorator to wrap ContactMap's methods and generate
dump files of the "matrix" atribute. The matrix should be a scipy.sparse
matrix and will be saved in npy format. The full dump path will be:
inst.dump / os.path.basename(inst.name) + self.dump_name + ".npy"
Where inst is the ContactMap instance of the wrapped method and self is the
DumpMatrix instance. If the inst has no dump attribute, no action is
performed.
Parameters
----------
dump_name : str, os.PathLike object or None
The basename of the file where to save the dump. If None, no action is
performed.
"""
def __init__(self, dump_name):
"""Executed only at method definition when used as method wrapper"""
self.dump_name = dump_name
def __call__(self, fn, *args, **kwargs):
def decorated_fn(*args, **kwargs):
"""
Executed at run time of the wrapped method.
Executes the input function with its arguments, then dumps the
matrix to target path. Note args[0] will always denote the instance
of the wrapped method.
"""
# Execute wrap function and store result
res = fn(*args, **kwargs)
# Instance of the wrapped method
inst = args[0]
# Define dump path based on instance's attributes
if (
hasattr(inst, "matrix")
and inst.dump is not None
and self.dump_name is not None
):
if hasattr(inst, "name"):
dump_path = (
Path(inst.dump) / f"{inst.name}_{self.dump_name}"
)
else:
dump_path = Path(inst.dump) / f"{self.dump_name}"
print(
f"Dumping matrix to {dump_path}"
f" after executing {fn.__name__}"
)
# Save updated matrix to dump path
sp.save_npz(dump_path, inst.matrix)
return res
return decorated_fn
[docs]class HicGenome:
"""
Class used to manage relationships between whole genome and intra- or
inter- chromosomal Hi-C sub matrices. Also handles reading and writing
data.
Attributes:
-----------
clr : cooler.Cooler
Cooler object containing Hi-C data and related informations for
the whole genome
sub_mats : pandas.DataFrame
Table containing intra- and optionally inter-chromosomal matrices.
detectable_bins : array of ints
Array containing indices of detectable rows and columns.
bins : pandas.DataFrame
Table containing bin related informations.
inter : bool
Whether interchromosomal matrices should be stored.
kernel_config : dict
Kernel configuration associated with the Hi-C genome
max_dist : int
Maximum scanning distance for convolution during pattern detection.
dump : str
Base path where dump files will be generated. None means no dump.
smooth : bool
Whether isotonic regression should be used to smooth the signal for
detrending intrachromosomal sub matrices. This will reduce noise at
long ranges but assumes contacts can only decrease with distance from
the diagonal. Do not use with circular chromosomes.
sample : int, float or None
Proportion of contacts to sample from the data if between 0 and 1.
Number of contacts to keep if above 1. Keep all if None.
"""
def __init__(
self,
path,
inter=False,
kernel_config=None,
dump=None,
smooth=False,
sample=None,
):
# Load Hi-C matrix and associated metadata
try:
self.dump = Path(dump)
os.makedirs(self.dump, exist_ok=True)
except TypeError:
self.dump = None
self.clr = cooler.Cooler(path)
self.bins = self.clr.bins()[:]
self.smooth = smooth
self.kernel_config = kernel_config
self.sub_mats = None
self.detectable_bins = np.array(range(self.clr.shape[0]))
self.inter = inter
self.compute_max_dist()
# Whether normalized or raw matrices should be used
self.use_norm = True
if sample is not None:
sample = float(sample)
try:
_ = self.clr.info['sum']
except KeyError:
raise IOError(
"sum info missing from cool file. Please fix the file."
)
try:
if sample > self.clr.info["sum"]:
print(
"sample value is higher than total contacts,"
"skipping subsampling."
)
self.sample = None
elif sample > 1:
self.sample = sample / self.clr.info["sum"]
elif sample > 0:
self.sample = sample
else:
raise ValueError("Sample must be a positive value or None")
except TypeError:
sys.stderr.write('sample must be a positive float or integer')
raise
else:
self.sample = sample
[docs] def compute_max_dist(self):
"""Use the kernel config to compute max_dist"""
# Convert maximum scanning distance to bins using resolution
try:
self.max_dist = max(
self.kernel_config["max_dist"] // self.clr.binsize, 1
)
# Get the size of the largest convolution kernel
self.largest_kernel = max(
[s.shape[0] for s in self.kernel_config["kernels"]]
)
except (ValueError, TypeError):
self.max_dist = None
self.largest_kernel = 3
[docs] def normalize(self, norm='auto', n_mads=5, threads=1):
"""
If the instance's cooler is not balanced, finds detectable bins and
applies ICE normalisation to the whole genome matrix. Newly computed
biases are stored in the input file. If it is already balanced,
detectable bins and weights will be extracted from the file.
Parameters
----------
force_norm : str
Normalization behaviour. If 'auto', existing weights are reused and
matrix is balanced only in the absence of weights. If 'raw', raw
contact values are used. If 'force', weights are recomputed and the
underlying cool file is overwritten.
n_mads : float
Maximum number of median absoluted deviations (MADs) below the
median of the distribution of logged bin sums to consider a bin
detectable.
threads : int
Number of parallel threads to use for normalization.
"""
if norm not in ['auto', 'raw', 'force']:
raise ValueError("norm must be one of: auto, raw, force")
if "weight" in self.bins.columns and norm != 'force':
sys.stderr.write("Matrix already balanced, reusing weights\n")
else:
pool = mp.Pool(threads)
cooler.balance_cooler(
self.clr,
mad_max=n_mads,
cis_only=not self.inter,
store=True,
map=pool.imap_unordered,
ignore_diags=2,
max_iters=200,
min_nnz=10,
chunksize=10000000,
)
pool.close()
print("Whole genome matrix balanced")
# Reload bins attribute to include the weight column
self.bins = self.clr.bins()[:]
if norm == 'raw':
self.use_norm = False
else:
self.use_norm = True
# Bins with NaN weight are missing, matrix already balanced
self.detectable_bins = np.flatnonzero(np.isfinite(self.bins.weight))
print(
f"Found {len(self.detectable_bins)} / {self.clr.shape[0]}"
" detectable bins"
)
[docs] def make_sub_matrices(self):
"""
Generates a table of Hi-C sub matrices. Each sub matrix is either intra
or interchromosomal. The table has 3 columns: chr1, chr2 and
contact_map. The contact_map column contains instances of the
ContactMap class.
Returns
-------
pandas.DataFrame:
The table of sub matrices which will contain n_chrom rows if the
inter attribute is set to false, or (n_chrom^2) / 2 + n_chroms / 2
if inter is True (that is, the upper triangle matrix).
"""
# Create an empty dataframe to store sub matrix info
sub_cols = ["chr1", "chr2", "contact_map"]
n_chroms = len(self.clr.chromnames)
if self.inter:
sub_mats = pd.DataFrame(
np.zeros(
(int(n_chroms ** 2 / 2 + n_chroms / 2), 3), dtype=str
),
columns=sub_cols,
)
else:
sub_mats = pd.DataFrame(
np.zeros((n_chroms, 3), dtype=str), columns=sub_cols
)
d = self.detectable_bins
# Loop over all possible combinations of chromosomes
# in the upper triangle matrix
sys.stderr.write("Preprocessing sub-matrices...\n")
if self.sample is not None:
sys.stderr.write(
f"{np.round(100 * self.sample)}% contacts will be sampled \n"
)
sub_mat_idx = 0
for i1, chr1 in enumerate(self.clr.chromnames):
for i2, chr2 in enumerate(self.clr.chromnames):
s1, e1 = self.clr.extent(chr1)
s2, e2 = self.clr.extent(chr2)
if i1 == i2 or (i1 < i2 and self.inter):
cio.progress(
sub_mat_idx, sub_mats.shape[0], f"{chr1}-{chr2}"
)
# Subset intra/inter sub_mat and matching detectable bins
sub_mat_detectable_bins = (
d[(d >= s1) & (d < e1)] - s1,
d[(d >= s2) & (d < e2)] - s2,
)
map_kwargs = {
'smooth': self.smooth,
'sample': self.sample,
'dump': self.dump,
'use_norm': self.use_norm,
'extent': [(s1, e1), (s2, e2)],
'detectable_bins': sub_mat_detectable_bins,
'name': f"{chr1}-{chr2}",
}
if i1 == i2:
sub_mats.contact_map[sub_mat_idx] = ContactMap(
self.clr,
inter=False,
max_dist=self.max_dist,
largest_kernel=self.largest_kernel,
**map_kwargs,
)
else:
sub_mats.contact_map[sub_mat_idx] = ContactMap(
self.clr,
inter=True,
**map_kwargs,
)
sub_mats.chr1[sub_mat_idx] = chr1
sub_mats.chr2[sub_mat_idx] = chr2
sub_mat_idx += 1
cio.progress(
sub_mat_idx,
sub_mats.shape[0],
(
f"{sub_mats.loc[sub_mat_idx-1, 'chr1']}-"
f"{sub_mats.loc[sub_mat_idx-1, 'chr2']}\n"
)
)
self.sub_mats = sub_mats
print("Sub matrices extracted")
[docs] def gather_sub_matrices(self):
"""Gathers all processed sub_matrices into a whole genome matrix """
gathered = sp.csr_matrix(self.clr.shape)
# Fill empty whole genome matrix with processed submatrices
for i1, r1 in self.sub_mats.iterrows():
s1, e1 = self.clr.extent(r1.chr1)
s2, e2 = self.clr.extent(r1.chr2)
# Use each chrom pair's sub matrix to fill the whole genome matrix
gathered[s1:e1, s2:e2] = r1.contact_map.matrix
gathered = sp.triu(gathered)
return gathered
[docs] def get_full_mat_pattern(self, chr1, chr2, patterns):
"""
Converts bin indices of a list of patterns from an submatrix into their
value in the original full-genome matrix.
Parameters
----------
chr1 : str
Name of the first chromosome of the sub matrix (rows).
chr2 : str
Name of the second chromosome of the sub matrix (cols).
pattern : pandas.DataFrame
A dataframme of pattern coordinates. Each row is a pattern and
columns should be bin1 and bin2, for row and column coordinates in
the Hi-C matrix, respectively.
Returns
-------
full_patterns : pandas.DataFrame
A dataframe similar to the input, but with bins shifted to
represent coordinates in the whole genome matrix.
"""
full_patterns = patterns.copy()
# Get start bin for chromosomes of interest
start1, _ = self.clr.extent(chr1)
start2, _ = self.clr.extent(chr2)
# Shift index by start bin of chromosomes
full_patterns.bin1 += start1
full_patterns.bin2 += start2
return full_patterns
[docs] def get_sub_mat_pattern(self, chr1, chr2, patterns):
"""
Converts bin indices of a list of patterns from the whole genome matrix
into their value in the desired intra- or inter-chromosomal sub-matrix.
Parameters
----------
chr1 : str
Name of the first chromosome of the sub matrix (rows).
chr2 : str
Name of the second chromosome of the sub matrix (cols).
pattern : pandas.DataFrame
A dataframme of pattern coordinates. Each row is a pattern and
columns should be bin1 and bin2, for row and column coordinates in
the Hi-C matrix, respectively.
Returns
-------
full_patterns : pandas.DataFrame
A dataframe similar to the input, but with bins shifted to
represent coordinates in the target sub-matrix.
"""
sub_patterns = patterns.copy()
# Get start bin for chromosomes of interest
start1, _ = self.clr.extent(chr1)
start2, _ = self.clr.extent(chr2)
# Shift index by start bin of chromosomes
sub_patterns.bin1 -= start1
sub_patterns.bin2 -= start2
return sub_patterns
[docs] def bins_to_coords(self, bin_idx):
"""
Converts a list of bin IDs to genomic coordinates based on the whole
genome contact map.
Parameters
----------
bin_idx : numpy.array of ints
A list of bin numbers corresponding to rows or columns of the whole
genome matrix.
Returns
-------
pandas.DataFrame :
A subset of the bins dataframe, with columns chrom, start, end
where chrom is the chromosome name (str), and start and end are the
genomic coordinates of the bin (int).
"""
return self.bins.iloc[bin_idx, :]
[docs] def coords_to_bins(self, coords):
"""
Converts genomic coordinates to a list of bin ids based on the whole
genome contact map.
Parameters
----------
coords : pandas.DataFrame
Table of genomic coordinates, with columns chrom, pos.
Returns
-------
numpy.array of ints :
Indices in the whole genome matrix contact map.
"""
coords.pos = (coords.pos // self.clr.binsize) * self.clr.binsize
# Coordinates are merged with bins, both indices are kept in memory so
# that the indices of matching bins can be returned in the order of the
# input coordinates
idx = (
self.bins.reset_index()
.rename(columns={"index": "bin_idx"})
.merge(
coords.reset_index().rename(columns={"index": "coord_idx"}),
left_on=["chrom", "start"],
right_on=["chrom", "pos"],
how="right",
)
.set_index("bin_idx")
.sort_values("coord_idx")
.index.values
)
return idx