"""Chromosight's detection submodule implements the bulk of chromosight
convolution engine, as well as functions to perform the different steps of the
detection algorithm (pre-processing, local maxima, post-processing...). The
pattern_detector function orchestrates all those different steps."""
from __future__ import absolute_import
import numpy as np
import pandas as pd
import scipy.sparse as sp
import pathlib
import warnings
import chromosight.utils.preprocessing as preproc
import chromosight.utils.stats as cus
warnings.filterwarnings("ignore")
def validate_patterns(
    coords,
    matrix,
    conv_mat,
    detectable_bins,
    kernel_matrix,
    drop=True,
    zero_tol=0.3,
    missing_tol=0.75,
):
    """
    Given a list of pattern coordinates and a contact map, drop or flag
    patterns in low detectability regions or too close to matrix boundaries.
    Also returns the surrounding window of Hi-C contacts around each detected
    pattern.

    Parameters
    ----------
    coords : numpy.array of ints
        Coordinates of all detected patterns in the sub matrix. One pattern
        per row, the first column is the matrix row, second column is the
        matrix col.
    matrix : scipy.sparse.csr_matrix
        Hi-C contact map of the sub matrix.
    conv_mat : scipy.sparse.csr_matrix
        Convolution product of the kernel with the Hi-C sub matrix.
    detectable_bins : list of numpy.array
        List of two 1D numpy arrays of ints representing ids of detectable
        rows and columns, respectively.
    kernel_matrix : numpy.array of float
        The kernel that was used for pattern detection on the Hi-C matrix.
    drop : bool
        Whether to discard pattern coordinates and windows from patterns which
        fall outside the matrix or do not pass validation. If those are kept,
        they will be given a score of np.nan and their windows will be filled
        with np.nan.
    zero_tol : float
        Proportion of zero pixels allowed in a pattern window to
        consider it valid.
    missing_tol : float
        Proportion of missing pixels allowed in a pattern window to
        consider it valid.

    Returns
    -------
    filtered_coords : pandas.DataFrame
        Table of coordinates that passed the filters. The dataframe has 3
        columns: bin1 (rows), bin2 (col) and score (the correlation
        coefficient).
    filtered_windows : numpy.array
        3D numpy array of signal windows around detected patterns. Each window
        spans axes 1 and 2, and they are stacked along axis 0.
    """
    matrix = matrix.tocsr()
    # Pre-compute height, width and half (radius) of the kernel window
    win_h, win_w = kernel_matrix.shape
    half_h, half_w = win_h // 2 + 1, win_w // 2 + 1
    # Positional indices of patterns to drop
    blacklist = []
    missing_rows = preproc.valid_to_missing(detectable_bins[0], matrix.shape[0])
    missing_cols = preproc.valid_to_missing(detectable_bins[1], matrix.shape[1])
    # Copy coords object and append column for scores
    validated_coords = pd.DataFrame(
        {
            "bin1": coords[:, 0],
            "bin2": coords[:, 1],
            "score": np.full(coords.shape[0], np.nan),
        }
    )
    # 3D stack containing the window (panel) around each detected pattern
    pattern_windows = np.full((coords.shape[0], win_h, win_w), np.nan)
    for i, l in enumerate(coords):
        p1 = int(l[0])
        p2 = int(l[1])
        # Window boundaries centered on the pattern pixel
        high, low = p1 - half_h + 1, p1 + half_h
        left, right = p2 - half_w + 1, p2 + half_w
        # Check for out of bounds errors
        if (
            high >= 0
            and low < matrix.shape[0]
            and left >= 0
            and right < matrix.shape[1]
        ):
            # Subset window from chrom matrix
            pattern_window = matrix[high:low, left:right].toarray()
            n_rows, n_cols = pattern_window.shape
            # Compute positions of missing bins relative to window
            rel_miss_rows = missing_rows - high
            rel_miss_rows = rel_miss_rows[
                (rel_miss_rows >= 0) & (rel_miss_rows < n_rows)
            ]
            rel_miss_cols = missing_cols - left
            rel_miss_cols = rel_miss_cols[
                (rel_miss_cols >= 0) & (rel_miss_cols < n_cols)
            ]
            # Set missing bins to nan
            pattern_window[rel_miss_rows, :] = np.nan
            pattern_window[:, rel_miss_cols] = np.nan
            tot_pixels = n_rows * n_cols
            # Number of uncovered or missing pixels
            tot_zero_pixels = np.sum(pattern_window == 0)
            tot_missing_pixels = np.sum(~np.isfinite(pattern_window))
            # The pattern should not contain more missing pixels that the max
            # value defined in kernel config. This includes both pixels from
            # undetectable bins and zero valued pixels in detectable bins.
            prop_undetected = tot_missing_pixels / tot_pixels
            prop_zero = tot_zero_pixels / (tot_pixels - tot_missing_pixels)
            # Only compute scores for patterns with at least 25% detectable
            # pixels (by default) and less than user-defined proportion of zero
            # pixels
            if (prop_undetected < missing_tol) and (prop_zero < zero_tol):
                # FIX: use .loc instead of chained indexing
                # (validated_coords.score[i] = ...), which triggers
                # SettingWithCopy behaviour and is a no-op under pandas
                # copy-on-write semantics.
                validated_coords.loc[i, "score"] = conv_mat[p1, p2]
                pattern_windows[i, :, :] = pattern_window
            else:
                # Current pattern will be dropped due to undetectable bins
                blacklist.append(i)
        else:
            # Current pattern will be dropped due to out of bound error
            blacklist.append(i)
    # Drop patterns that did not pass filters
    if drop:
        blacklist_mask = np.zeros(coords.shape[0], dtype=bool)
        if len(blacklist):
            blacklist_mask[np.array(blacklist)] = True
        filtered_coords = validated_coords.loc[~blacklist_mask, :]
        filtered_windows = pattern_windows[~blacklist_mask, :, :]
    else:
        filtered_coords = validated_coords
        filtered_windows = pattern_windows
    return filtered_coords, filtered_windows
def pileup_patterns(pattern_windows):
    """
    Generate a pileup (arithmetic mean) from a stack of pattern windows.

    Parameters
    ----------
    pattern_windows : numpy.array of floats
        3D numpy array of detected windows. Shape is (N, H, W)
        where N is the number of windows, H the height, and W
        the width of each window.

    Returns
    -------
    numpy.array of floats :
        2D numpy array containing the pileup (arithmetic mean) of
        input windows.
    """
    # nanmean over axis 0 is equivalent to applying np.nanmean along axis 0,
    # but runs vectorised instead of one Python call per (H, W) pixel.
    return np.nanmean(pattern_windows, axis=0)
def pattern_detector(
    contact_map,
    kernel_config,
    kernel_matrix,
    coords=None,
    dump=None,
    full=False,
    tsvd=None,
):
    """
    Detect patterns in a contact map by kernel matching, and extract windows
    around the detected patterns. If coordinates are provided, detection is
    skipped and windows are extracted around those coordinates.

    Parameters
    ----------
    contact_map : ContactMap object
        An object containing an inter- or intra-chromosomal Hi-C contact map
        and additional metadata.
    kernel_config : dict
        The kernel configuration, as documented in
        chromosight.utils.io.load_kernel_config
    kernel_matrix : numpy.array
        The kernel matrix to use for convolution as a 2D numpy array
    coords : numpy.array of ints or None
        A table with coordinates of patterns, with one pattern per row
        and 2 columns being the row and column number of the pattern in
        the input contact map. If this is provided, detection is skipped
        and quantification is performed on those coordinates.
    dump : str or None
        Folder in which dumps should be generated after each step of the
        detection process. If None, no dump is generated
    full : bool
        If True, the contact map is zero-padded and missing bins are masked
        so that patterns can be detected up to the matrix edges.
        Computationally more expensive.
    tsvd : float or None
        If a float between 0 and 1 is given, the input kernel is factorised
        using truncated SVD, keeping enough singular vectors to retain this
        proportion of information. Factorisation speeds up convolution at
        the cost of a loss of information. Disabled by default (None).

    Returns
    -------
    filtered_chrom_patterns : pandas.DataFrame
        A table of detected patterns with 4 columns: bin1, bin2, score, qvalue.
    chrom_pattern_windows : numpy array
        A 3D array containing the pile of windows around detected patterns.
    """
    # Kernel dimensions and kernel radius (half-height / half-width)
    km, kn = kernel_matrix.shape
    kh, kw = (km - 1) // 2, (kn - 1) // 2

    def save_dump(base, mat):
        """Define where to save the dump"""
        sp.save_npz(
            pathlib.Path(dump) / f"{contact_map.name}_{base}", mat
        )

    # Define type of analysis.
    run_mode = "detect" if coords is None else "quantify"
    # Do not attempt pattern detection unless matrix is larger than the kernel
    if min(contact_map.matrix.shape) <= max(kernel_matrix.shape):
        return None, None
    # If full is specified, missing bins are accounted for using a mask
    if full:
        missing_mask = preproc.make_missing_mask(
            contact_map.matrix.shape,
            valid_rows=contact_map.detectable_bins[0],
            valid_cols=contact_map.detectable_bins[1],
            max_dist=contact_map.max_dist,
            sym_upper=not contact_map.inter,
        )
    else:
        missing_mask = None
    # Pattern matching operates here
    mat_conv, mat_log10_pvals = normxcorr2(
        contact_map.matrix.tocsr(),
        kernel_matrix,
        max_dist=contact_map.max_dist,
        sym_upper=not contact_map.inter,
        full=full,
        missing_mask=missing_mask,
        tsvd=tsvd,
        pval=True,
        missing_tol=kernel_config["max_perc_undetected"] / 100,
    )
    if dump:
        save_dump("03_normxcorr2", mat_conv)
    # Clean potential missing values
    mat_conv.data[np.isnan(mat_conv.data)] = 0
    # Only keep corrcoefs in scannable range
    if not contact_map.inter:
        mat_conv = preproc.diag_trim(mat_conv.tocsr(), contact_map.max_dist)
        if dump:
            save_dump("04_diag_trim", mat_conv)
    mat_conv = mat_conv.tocoo()
    mat_conv.eliminate_zeros()
    # Only attempt detection if no input coordinates were given
    if run_mode == "detect":
        # Find foci of highly correlated pixels and pick local maxima
        # coords, foci_mat = pick_foci(np.abs(mat_log10_pvals), 5)
        coords, foci_mat = pick_foci(mat_conv, kernel_config["pearson"],)
        # If nothing was detected, no point in resuming
        if coords is None:
            return None, None
        if dump:
            save_dump("05_foci", foci_mat)
    mat = contact_map.matrix.copy()
    det = [d.copy() for d in contact_map.detectable_bins]
    # Zero pad contact and convolution maps and shift missing bins and detected
    # pattern coords before validation if in full mode
    if full:
        mat = mat.tocoo()
        mat = preproc.zero_pad_sparse(mat, kh, kw, fmt="csr")
        mat_conv = preproc.zero_pad_sparse(mat_conv, kh, kw, fmt="csr")
        det[0] += kh
        det[1] += kw
        coords[:, 0] += kh
        coords[:, 1] += kw
    if not contact_map.inter:
        # set the first kh / 2 diagonals in the lower triangle to NaN
        # so that pileups do not count them
        big_k = max(km, kn)
        mat = mat.tocsr()
        mat += sp.diags(
            np.full(big_k, np.nan),
            -np.arange(1, big_k + 1),
            shape=mat.shape,
            format="csr",
        )
        # When detecting 1D pattern, enforce coordinates on diagonal
        # coordinates can be shifted by 1 since we keep the two first
        # diagonals to allow formation of foci via 4-way adjacency
        if kernel_config["max_dist"] == 0:
            coords[:, 0] = coords[:, 1]
    # Extract windows around coordinates and assign a correlation
    # to each pattern. In detection mode, we drop invalid patterns
    # in quantification mode, all input patterns are returned.
    filtered_coords, filtered_windows = validate_patterns(
        coords,
        mat,
        mat_conv.tocsr(),
        det,
        kernel_matrix,
        zero_tol=kernel_config["max_perc_zero"] / 100,
        missing_tol=kernel_config["max_perc_undetected"] / 100,
        drop=True if run_mode == "detect" else False,
    )
    # Shift coordinates of detected patterns back if padding was added
    if full:
        filtered_coords.bin1 -= kh
        filtered_coords.bin2 -= kw
    # Fetch log10 p-values at pattern coordinates; .A1 flattens the
    # np.matrix produced by sparse fancy indexing into a 1D array.
    try:
        filtered_coords["pvalue"] = mat_log10_pvals[
            filtered_coords.bin1, filtered_coords.bin2
        ].A1
    # No coordinate passed the validation filters
    except AttributeError:
        filtered_coords["pvalue"] = None
    # Undo the log10 transform on p-values
    filtered_coords["pvalue"] = 10 ** filtered_coords["pvalue"]
    return filtered_coords, filtered_windows
def remove_neighbours(patterns, win_size=8):
    """
    Flag patterns overlapping a higher-scoring pattern so that they can be
    excluded. Among overlapping patterns, the one with the best score wins.

    Parameters
    ----------
    patterns : numpy.array of float
        2D Array of patterns, with 3 columns: bin1, bin2 and score.
    win_size : int
        The maximum number of pixels at which patterns are considered
        overlapping.

    Returns
    -------
    numpy.array of bool :
        Boolean array indicating which patterns are valid (True values) and
        which are overlapping neighbours (False values)
    """
    # Process patterns from best to worst score
    by_score = patterns.copy().sort_values("score", ascending=False)
    # Index labels of patterns discarded as overlapping neighbours
    discarded = set()
    for label, row in by_score.iterrows():
        if label in discarded:
            continue
        # Positions (in score order) of patterns within win_size on both axes
        overlap_pos = np.flatnonzero(
            (np.abs(by_score.bin1 - row.bin1) < win_size)
            & (np.abs(by_score.bin2 - row.bin2) < win_size)
        )
        # Translate positions to index labels and drop the current pattern
        overlap_labels = by_score.index.values[overlap_pos]
        discarded.update(lab for lab in overlap_labels if lab != label)
    # Boolean mask of surviving patterns
    keep_mask = np.ones(by_score.shape[0], dtype=bool)
    keep_mask[list(discarded)] = False
    return keep_mask
def pick_foci(mat_conv, pearson, min_size=2):
    """
    Pick coordinates of local maxima in a sparse 2D convolution heatmap.
    Values below the threshold derived from the pearson argument are zeroed
    out, remaining pixels are grouped into foci (contiguous regions of high
    values) and the coordinate of the maximum value of each focus is
    reported.

    Parameters
    ----------
    mat_conv : scipy.sparse.coo_matrix of floats
        A 2D sparse convolution heatmap.
    pearson : float
        Minimum correlation coefficient required to consider a pixel as
        candidate. Increasing this value reduces the amount of false
        positive patterns.
    min_size : int
        Minimum number of pixels required to keep a focus. Pixels belonging
        to smaller foci will be set to 0.

    Returns
    -------
    foci_coords : numpy.array of ints
        2D array of coordinates for identified patterns corresponding to
        foci maxima. None if no pattern was detected.
    labelled_mat : scipy.sparse.coo_matrix
        The map of detected foci. Pixels which were assigned to a focus are
        given an integer as their focus ID. Pixels not assigned to a focus
        are set to 0.
    """
    # Binarise the heatmap: 1 where the score passes the threshold, else 0
    candidates = mat_conv.copy()
    candidates = candidates.tocoo()
    candidates.data[candidates.data < pearson] = 0
    candidates.data[candidates.data != 0] = 1
    candidates.eliminate_zeros()
    # Nothing above threshold: no pattern to report
    if len(candidates.data) == 0:
        return None, None
    # Group candidate pixels into labelled foci, then discard small foci
    num_foci, labelled_mat = label_foci(candidates)
    num_foci, labelled_mat = filter_foci(labelled_mat, min_size=min_size)
    if num_foci == 0:
        return None, None
    conv_csr = mat_conv.tocsr()
    # One (row, col) pair per focus: position of its best pixel
    foci_coords = np.zeros([num_foci, 2], int)
    # NOTE: focus ids may be non-contiguous after small foci were removed,
    # hence the enumeration (rank) over the sorted unique labels.
    for rank, focus_label in enumerate(np.unique(labelled_mat.data)):
        # Positions, in the coo data vector, of this focus' pixels
        members = np.flatnonzero(labelled_mat.data == focus_label)
        member_rows = labelled_mat.row[members]
        member_cols = labelled_mat.col[members]
        # Within-focus index of the highest convolution score
        best_in_focus = np.argmax(conv_csr[member_rows, member_cols])
        # Map back to a position in the original coo data vector
        best = members[best_in_focus]
        foci_coords[rank, 0] = labelled_mat.row[best]
        foci_coords[rank, 1] = labelled_mat.col[best]
    return foci_coords, labelled_mat
def label_foci(matrix):
    """
    Given a sparse matrix of 1 and 0 values, find all foci of continuously
    neighbouring positive pixels and assign each focus a label. Horizontal
    and vertical (4-way) adjacency is considered.

    Parameters
    ----------
    matrix : scipy.sparse.coo_matrix of ints
        The input matrix where to label foci. Should be filled with 1
        and 0s.

    Returns
    -------
    num_foci : int
        Number of individual foci identified.
    foci_mat : scipy.sparse.coo_matrix:
        The matrix with values replaced by their respective foci
        labels.

    Example
    -------
    >>> M.todense()
    array([[1 0 0 0]
           [1 0 1 0]
           [1 0 1 1]
           [0 0 0 0]])
    >>> num_foci, foci_mat = label_foci(M)
    >>> num_foci
    2
    >>> foci_mat.todense()
    array([[1 0 0 0]
           [1 0 2 0]
           [1 0 2 2]
           [0 0 0 0]])
    """
    # Round-trip through CSR guarantees COO entries sorted by (row, col)
    mat_coo = sp.coo_matrix(sp.csr_matrix(matrix))
    n_rows, n_cols = mat_coo.shape
    n_nodes = mat_coo.nnz
    # Structured array of pixel coordinates, used for lexicographic sorting
    pix = np.zeros(n_nodes, dtype=[("row", int), ("col", int)])
    pix["row"] = mat_coo.row
    pix["col"] = mat_coo.col
    # Horizontal neighbours: sort by (row, col) and look for consecutive
    # entries on the same row with adjacent columns.
    order_rc = np.argsort(pix, order=["row", "col"])
    rows_rc = pix["row"][order_rc]
    cols_rc = pix["col"][order_rc]
    # The CSR round-trip above should make this sort a no-op
    if not (np.all(order_rc == np.arange(len(order_rc)))):
        raise ValueError("matrix_sp is not properly double sorted")
    right_pairs = np.flatnonzero(
        (np.diff(rows_rc) == 0) & (np.diff(cols_rc) == 1)
    )
    right_src = right_pairs
    right_dst = right_pairs + 1
    # Vertical neighbours: sort by (col, row) and look for consecutive
    # entries in the same column with adjacent rows.
    order_cr = np.argsort(pix, order=["col", "row"])
    rows_cr = pix["row"][order_cr]
    cols_cr = pix["col"][order_cr]
    lower_pairs = np.flatnonzero(
        (np.diff(rows_cr) == 1) & (np.diff(cols_cr) == 0)
    )
    # Map positions in (col, row) order back to node ids, which follow the
    # (row, col) order of the coo data vector
    cr_to_rc = np.arange(len(order_cr))[order_cr]
    lower_src = cr_to_rc[lower_pairs]
    lower_dst = cr_to_rc[lower_pairs + 1]
    # Build the adjacency matrix of the pixel graph: one node per nonzero
    # pixel, one edge per neighbouring pair.
    edge_src = np.concatenate([right_src, lower_src])
    edge_dst = np.concatenate([right_dst, lower_dst])
    adj_mat = sp.coo_matrix(
        (np.ones(len(edge_src), int), (edge_src, edge_dst)),
        shape=(n_nodes, n_nodes),
    )
    # Each connected component of the pixel graph is one focus
    num_foci, components = sp.csgraph.connected_components(
        adj_mat, directed=False
    )
    # Component labels are 0-based; shift by 1 so that 0 means "no focus"
    foci_mat = sp.coo_matrix(
        (components + 1, (mat_coo.row, mat_coo.col)), shape=(n_rows, n_cols)
    )
    return num_foci, foci_mat
def filter_foci(foci_mat, min_size=2):
    """
    Given an input matrix of labelled foci (continuous islands of equal nonzero
    values), filters out foci consisting of fewer pixels than min_size.

    Parameters
    ----------
    foci_mat : scipy.sparse.coo_matrix
        Input matrix of labelled foci. Pixels are numbered according to their
        respective foci. Pixels that are not assigned to a focus are 0.
        The input matrix is left unmodified.
    min_size : int
        Minimum number of pixels required to keep a focus. Pixels belonging to
        smaller foci will be set to 0.

    Returns
    -------
    num_filtered : int
        Number of foci remaining after filtering.
    filtered_mat : scipy.sparse.coo_matrix
        Matrix of filtered foci.
    """
    focus_ids, focus_sizes = np.unique(foci_mat.data, return_counts=True)
    # Labels of foci smaller than min_size
    small_foci = focus_ids[focus_sizes < min_size]
    # FIX: operate on a copy. The previous implementation zeroed values
    # directly in foci_mat.data, mutating the caller's matrix as a side
    # effect (and the returned copy shared that same data array).
    filtered_mat = foci_mat.copy()
    # Vectorised removal of all small foci at once (replaces the per-focus
    # loop, which also shadowed the focus_id array with its loop variable)
    filtered_mat.data[np.isin(filtered_mat.data, small_foci)] = 0
    # Force updating to make explicit zeros implicit
    filtered_mat.eliminate_zeros()
    num_filtered = int(np.sum(focus_sizes >= min_size))
    return num_filtered, filtered_mat
def xcorr2(signal, kernel, threshold=1e-4, tsvd=None):
    """
    Cross correlate a dense or sparse 2D signal with a dense 2D kernel.

    Parameters
    ----------
    signal: scipy.sparse.csr_matrix or numpy.array of floats
        A 2-dimensional numpy array Ms x Ns acting as the detrended Hi-C map.
    kernel: numpy.array of floats
        A 2-dimensional numpy array Mk x Nk acting as the pattern template.
    threshold : float
        Convolution score below which pixels will be set back to zero to save
        on time and memory.
    tsvd : float or None
        If a float between 0 and 1 is given, the input kernel is factorised
        using truncated SVD, keeping enough singular vectors to retain this
        proportion of information. Factorisation speeds up convolution at
        the cost of a loss of information. Disabled by default (None).

    Returns
    -------
    out: scipy.sparse.csr_matrix or numpy.array
        Convolution product of signal by kernel. Same type as the input signal.
    """
    # Optionally factorise the kernel into (left, right) rank-1 components
    if tsvd is not None:
        kernel = preproc.factorise_kernel(kernel, prop_info=tsvd)
    # Dispatch on the signal's storage type; the kernel is always dense
    if sp.issparse(signal):
        return _xcorr2_sparse(signal.tocsr(), kernel, threshold=threshold)
    return _xcorr2_dense(signal, kernel, threshold=threshold)
def _xcorr2_sparse(signal, kernel, threshold=1e-6):
    """
    Cross correlate a sparse 2D signal with a dense 2D kernel.

    Parameters
    ----------
    signal: scipy.sparse.csr_matrix
        A 2-dimensional numpy array Ms x Ns acting as the detrended Hi-C map.
    kernel: numpy.array of floats or tuple of numpy.arrays
        A 2-dimensional numpy array Mk x Nk acting as the pattern template. Can
        also be a factorised kernel.
    threshold : float
        Convolution score below which pixels will be set back to zero to save
        on time and memory.

    Returns
    -------
    out: scipy.sparse.csr_matrix
        Convolution product of signal by kernel.
    """
    sm, sn = signal.shape
    if type(kernel) is tuple:
        # Factorised kernel: each rank-1 factor is applied as two banded
        # matrix products (one per dimension) and results are accumulated.
        kernel_l, kernel_r = kernel
        km = kernel_l.shape[0]
        kn = kernel_r.shape[1]
        if kernel_l.shape[1] != kernel_r.shape[0]:
            raise ValueError("Kernel factorisation is invalid")
        n_factors = kernel_l.shape[1]
        for f in range(n_factors):
            # Each factor value becomes a constant diagonal of a banded
            # matrix, so one product applies the whole 1D filter.
            subkernel_l = sp.diags(
                kernel_l[:, f], np.arange(km), shape=(sm - km + 1, sm), format="dia",
            )
            subkernel_r = sp.diags(
                kernel_r[f, :], -np.arange(kn), shape=(sn, sn - kn + 1), format="dia",
            )
            if f == 0:
                out = (subkernel_l @ signal) @ subkernel_r
            else:
                out += (subkernel_l @ signal) @ subkernel_r
    else:
        km, kn = kernel.shape
        # Sanity checks
        if sp.issparse(kernel):
            raise ValueError("cannot handle kernel in sparse format")
        if not sp.issparse(signal):
            raise ValueError("cannot handle signal in dense format")
        # Check of kernel is constant (uniform)
        constant_kernel = np.nan
        if np.allclose(kernel, np.tile(kernel[0, 0], kernel.shape), rtol=1e-08):
            constant_kernel = kernel[0, 0]
        out = sp.csc_matrix((sm - km + 1, sn - kn + 1), dtype=np.float64)
        # Simplified convolution for the special case where kernel is constant:
        if np.isfinite(constant_kernel):
            l_subkernel_sp = sp.diags(
                constant_kernel * np.ones(km),
                np.arange(km),
                shape=(sm - km + 1, sm),
                format="dia",
            )
            r_subkernel_sp = sp.diags(
                np.ones(kn), -np.arange(kn), shape=(sn, sn - kn + 1), format="dia",
            )
            out = (l_subkernel_sp @ signal) @ r_subkernel_sp
        # convolution code for general case
        # 1. 2D kernel composed of 1D filters, each col being a 1D filter.
        # 2. input remains unchanged, and each 1D kernel is unrolled into
        # a sparse toeplitz matrix.
        # 3. Each 1D conv is computed via a sparse matrix x vector product.
        # It is fast because the product is delegated to numpy.
        else:
            # In case the kernel is rectangle, it is faster to scan
            # the largest dimension first
            if kn < km:
                for kj in range(kn):
                    subkernel_sp = sp.diags(
                        kernel[:, kj], np.arange(km), shape=(sm - km + 1, sm), format="csr",
                    )
                    out += subkernel_sp.dot(signal[:, kj : sn - kn + 1 + kj])
            else:
                for ki in range(km):
                    subkernel_sp = sp.diags(
                        np.array(kernel[ki, :]).flatten(), np.arange(kn), shape=(sn - kn + 1, sn), format="csr",
                    )
                    out += signal[ki : sm - km + 1 + ki, :].dot(subkernel_sp.T)
    # Set very low pixels to 0
    out.data[np.abs(out.data) < threshold] = 0
    out.eliminate_zeros()
    # Resize matrix to original dimensions
    out = preproc.zero_pad_sparse(
        out, margin_h=(kn - 1) // 2, margin_v=(km - 1) // 2, fmt="csr"
    )
    return out
def _xcorr2_dense(signal, kernel, threshold=1e-6):
"""Cross correlate a dense 2D signal with a dense 2D kernel.
Parameters
----------
signal: numpy.array
A 2-dimensional numpy array Ms x Ns acting as the detrended Hi-C map.
kernel: array_like
A 2-dimensional numpy array Mk x Nk acting as the pattern template.
threshold : float
Convolution score below which pixels will be set back to zero to save
on time and memory.
Returns
-------
out: numpy.array
Convolution product of signal by kernel.
"""
sm, sn = signal.shape
if type(kernel) is tuple:
kernel_l, kernel_r = kernel
km = kernel_l.shape[0]
kn = kernel_r.shape[1]
if kernel_l.shape[1] != kernel_r.shape[0]:
raise ValueError("Kernel factorisation is invalid")
n_factors = kernel_l.shape[1]
for f in range(n_factors):
subkernel_l = sp.diags(
kernel_l[:, f], np.arange(km), shape=(sm - km + 1, sm), format="dia",
)
subkernel_r = sp.diags(
kernel_r[f, :], -np.arange(kn), shape=(sn, sn - kn + 1), format="dia",
)
if f == 0:
out = (subkernel_l @ signal) @ subkernel_r
else:
out += (subkernel_l @ signal) @ subkernel_r
else:
km, kn = kernel.shape
# Kernel (half) height and width
constant_kernel = np.nan
if np.allclose(kernel, np.tile(kernel[0, 0], kernel.shape), rtol=1e-08):
constant_kernel = kernel[0, 0]
out_wo_margin = np.zeros([sm - km + 1, sn - kn + 1])
# Simplified convolution for the special case where kernel is constant:
if np.isfinite(constant_kernel):
l_subkernel_sp = sp.diags(
constant_kernel * np.ones(km),
np.arange(km),
shape=(sm - km + 1, sm),
format="dia",
)
r_subkernel_sp = sp.diags(
np.ones(kn), -np.arange(kn), shape=(sn, sn - kn + 1), format="dia",
)
out_wo_margin = (l_subkernel_sp @ signal) @ r_subkernel_sp
# convolution code for general case
# 1. 2D kernel composed of 1D filters, each col being a 1D filter.
# 2. input remains unchanged, and each 1D kernel is unrolled into
# a sparse toeplitz matrix.
# 3. Each 1D conv is computed via a sparse matrix x vector product.
# It is fast because the product is delegated to numpy.
else:
for kj in range(kn):
# Make a toeplitz matrix from the kj'th column of the kernel
subkernel_sp = sp.diags(
kernel[:, kj], np.arange(km), shape=(sm - km + 1, sm), format="csr",
)
# Note: We compute the dot product of the subkernel with the signal
# shifted by kj.
out_wo_margin += subkernel_sp @ signal[:, kj : sn - kn + 1 + kj]
kh = (km - 1) // 2
kw = (kn - 1) // 2
# Add margins of zeros where kernel overlaps edges
out = np.zeros([sm, sn])
out[kh:-kh, kw:-kw] = out_wo_margin
# Set very low pixels to 0
out[np.abs(out) < threshold] = 0
return out
def normxcorr2(
    signal,
    kernel,
    max_dist=None,
    sym_upper=False,
    full=False,
    missing_mask=None,
    missing_tol=0.75,
    tsvd=None,
    pval=False,
):
    """
    Computes the normalized cross-correlation of a dense or sparse signal and a
    dense kernel. The resulting matrix contains Pearson correlation
    coefficients.

    Parameters
    ----------
    signal : scipy.sparse.csr_matrix or numpy.array
        The input processed Hi-C matrix.
    kernel : numpy.array
        The pattern kernel to use for convolution.
    max_dist : int
        Maximum scan distance, in number of bins from the diagonal. If None,
        the whole matrix is convoluted. Otherwise, pixels further than this
        distance from the diagonal are set to 0 and ignored for performance.
        Only useful for intrachromosomal matrices.
    sym_upper : False
        Whether the matrix is symmetric and upper triangle. True for
        intrachromosomal matrices.
    full : bool
        If True, convolutions will be made in 'full' mode; the matrix is first
        padded with margins to allow scanning to the edges, and missing bins
        are also masked to exclude them when computing correlation scores.
        Computationally intensive
    missing_mask : scipy.sparse.csr_matrix of bool or None
        Mask matrix denoting missing bins, where missing is denoted as True and
        valid as False. Can be None to ignore missing bin information. Only
        taken into account when full=True.
    missing_tol : float
        Proportion of missing values allowed in windows to keep the correlation
        coefficients.
    tsvd : float or None
        If a float between 0 and 1 is given, the input kernel is factorised
        using truncated SVD, keeping enough singular vectors to retain this
        proportion of information. Factorisation speeds up convolution at
        the cost of a loss of information. Disabled by default (None).
    pval : bool
        Whether to return a matrix of p-values.

    Returns
    -------
    scipy.sparse.csr_matrix or numpy.array
        The sparse matrix of correlation coefficients. Same type as the input
        signal.
    scipy.sparse.csr_matrix or numpy.array or None
        A map of Benjamini-Hochberg corrected p-values (q-values). Same type as
        the input signal. If pval=False, this will be None.
    """
    # Input validation: the missing mask, when given, must be a sparse bool
    # matrix with the same shape as the signal.
    if missing_mask is not None:
        if not sp.issparse(missing_mask):
            raise ValueError("Missing mask must be a sparse matrix.")
        if not signal.shape == missing_mask.shape:
            raise ValueError("Signal and missing mask do not have the same shape")
        if missing_mask.dtype != bool:
            raise ValueError(
                f"Missing mask dtype is {missing_mask.dtype}. Should be bool."
            )
    if min(kernel.shape) >= max(signal.shape):
        raise ValueError("cannot have kernel bigger than signal")
    preproc.check_missing_mask(signal, missing_mask)
    if sp.issparse(kernel):
        raise ValueError("cannot handle kernel in sparse format")
    # A flat kernel has zero variance: correlation would be undefined
    if not (kernel.std() > 0):
        raise ValueError("Cannot have flat kernel.")
    # Dispatch on the signal's storage type
    if sp.issparse(signal):
        corr, pvals = _normxcorr2_sparse(
            signal,
            kernel,
            max_dist=max_dist,
            sym_upper=sym_upper,
            full=full,
            missing_mask=missing_mask,
            missing_tol=missing_tol,
            tsvd=tsvd,
            pval=pval,
        )
    else:
        corr, pvals = _normxcorr2_dense(
            signal,
            kernel,
            max_dist=max_dist,
            sym_upper=sym_upper,
            full=full,
            missing_tol=missing_tol,
            missing_mask=missing_mask,
            tsvd=tsvd,
            pval=pval,
        )
    return corr, pvals
def _normxcorr2_sparse(
    signal,
    kernel,
    max_dist=None,
    sym_upper=False,
    full=False,
    missing_mask=None,
    missing_tol=0.75,
    tsvd=None,
    pval=False,
):
    """Computes the normalized cross-correlation of a sparse signal and a
    dense kernel. The resulting sparse matrix contains Pearson correlation
    coefficients.

    Parameters
    ----------
    signal : scipy.sparse.csr_matrix
        The input processed Hi-C matrix.
    kernel : numpy.array
        The pattern kernel to use for convolution. Can be a factorised kernel
        stored in a tuple.
    max_dist : int
        Maximum scan distance, in number of bins from the diagonal. If None,
        the whole matrix is convoluted. Otherwise, pixels further than this
        distance from the diagonal are set to 0 and ignored for performance.
        Only useful for intrachromosomal matrices.
    sym_upper : bool
        Whether the matrix is symmetric and upper triangle. True for
        intrachromosomal matrices.
    missing_mask : scipy.sparse.csr_matrix of bools
        Matrix defining which pixels are missing (True) or not (False).
    missing_tol : float
        Proportion of missing values allowed in windows to keep the
        correlation coefficients.
    full : bool
        Whether to run in 'full' mode, which means enclosing the signal in an
        exterior frame and computing the correlation up to the edges.
    tsvd : float or None
        If a float between 0 and 1 is given, the input kernel is factorised
        using truncated SVD, keeping enough singular vectors to retain this
        proportion of information. Factorisation speeds up convolution at
        the cost of a loss of information. Disabled by default (None).
    pval : bool
        Whether to return a matrix of p-values.

    Returns
    -------
    out : scipy.sparse.csr_matrix
        The sparse matrix of correlation coefficients.
    pvals : scipy.sparse.csr_matrix or None
        The matrix of p-values if pval is True, None otherwise.
    """
    mk, nk = kernel.shape
    ms, ns = signal.shape
    # Constant (all-ones) kernel used to compute windowed means/sums.
    kernel1 = np.ones(kernel.shape)
    kernel_size = mk * nk
    # In full mode, we compute the convolution with all pixels in the input
    # signal. We need to add a margin around the input to allow the kernel to
    # be centered on the edges.
    if full:
        # Create a vertical margin and use it to pad the signal
        tmp = sp.csr_matrix((mk - 1, ns))
        framed_sig = sp.vstack([tmp, signal, tmp], format=signal.format)
        # Same for the horizontal margin
        tmp = sp.csr_matrix((ms + 2 * (mk - 1), nk - 1))
        framed_sig = sp.hstack([tmp, framed_sig, tmp], format=signal.format)
        # If a missing mask was specified, use it
        if missing_mask is not None:
            # Add margins around missing mask.
            framed_missing_mask = preproc.frame_missing_mask(
                missing_mask, kernel.shape, sym_upper=sym_upper, max_dist=max_dist,
            )
        else:
            framed_missing_mask = None
    else:
        framed_sig = signal.copy()
        if missing_mask is None:
            framed_missing_mask = None
        else:
            framed_missing_mask = missing_mask.copy()
    # Simple case: no missing values to correct for.
    if framed_missing_mask is None:
        kernel_mean = float(kernel.mean())
        kernel_std = float(kernel.std())
        # out contains framed_sig windowed mean
        out = xcorr2(framed_sig, kernel1 / kernel_size)
        # denom is the windowed standard deviation of the signal, scaled by
        # the kernel's standard deviation (Pearson denominator).
        denom = xcorr2(framed_sig.power(2), kernel1 / kernel_size) - out.power(2)
        denom = denom.sqrt() * kernel_std
        # Robustify: numerical-zeros are turned into real-zeros, then only
        # nonzero entries are inverted so flat windows yield a coefficient of
        # 0 instead of inf/nan.
        zero_denom = np.abs(denom.data) < 1e-10
        denom.data[zero_denom] = 0.0
        denom.data[~zero_denom] = 1.0 / denom.data[~zero_denom]
        # store numerator directly in 'out' to avoid multiple replication of
        # data
        out = xcorr2(framed_sig, kernel / kernel_size, tsvd=tsvd) - out * kernel_mean
        # Multiply by inverse since sparse division is not allowed
        out = out.multiply(denom)
    else:
        # Safety check to make sure mask matches signal
        preproc.check_missing_mask(framed_sig, framed_missing_mask)
        kernel_sum = np.sum(kernel)
        kernel_mean = kernel_sum / kernel_size
        kernel2_sum = np.sum(kernel ** 2)
        kernel2_mean = kernel2_sum / kernel_size
        # Compute convolution of uniform kernel with missing mask to get number
        # of missing pixels in each window. Will be plugged into Pearson at
        # the end.
        ker1_coo = xcorr2(framed_missing_mask, kernel1).tocoo()
        # From now on, ker1_coo.data contains the number of 'present' samples
        # for windows that contained at least one missing pixel.
        ker1_coo.data = kernel_size - ker1_coo.data
        # Compute kernel means corrected for the number of missing elements
        # (wm = "with missing") at each affected window position.
        kernel_mean_wm = (
            kernel_sum
            - xcorr2(framed_missing_mask, kernel, tsvd=tsvd)[
                ker1_coo.row, ker1_coo.col
            ].A1
        ) / ker1_coo.data
        kernel2_mean_wm = (
            kernel2_sum
            - xcorr2(framed_missing_mask, kernel ** 2, tsvd=tsvd)[
                ker1_coo.row, ker1_coo.col
            ].A1
        ) / ker1_coo.data
        # store signal mean directly in 'out' to avoid multiple replication of
        # data; rescale windows with missing pixels by the true sample count.
        out = xcorr2(framed_sig, kernel1 / kernel_size)
        out[ker1_coo.row, ker1_coo.col] = (
            out[ker1_coo.row, ker1_coo.col].A1 * kernel_size / ker1_coo.data
        )
        denom = xcorr2(framed_sig.power(2), kernel1 / kernel_size)
        denom[ker1_coo.row, ker1_coo.col] = (
            denom[ker1_coo.row, ker1_coo.col].A1 * kernel_size / ker1_coo.data
        )
        # Pearson denominator: signal variance times kernel variance, with
        # the kernel variance swapped for its missing-corrected value at
        # affected positions.
        denom = (denom - out.power(2)) * (kernel2_mean - kernel_mean ** 2)
        denom[ker1_coo.row, ker1_coo.col] = (
            denom[ker1_coo.row, ker1_coo.col].A1
            / (kernel2_mean - kernel_mean ** 2)
            * (kernel2_mean_wm - kernel_mean_wm ** 2)
        )
        denom = denom.sqrt()
        # ensure that enough data points are inside of window
        denom[
            ker1_coo.row[ker1_coo.data < int((1 - missing_tol) * kernel_size)],
            ker1_coo.col[ker1_coo.data < int((1 - missing_tol) * kernel_size)],
        ] = 0.0
        # store numerator directly in 'out' to avoid multiple copies of data
        out *= kernel_mean
        out[ker1_coo.row, ker1_coo.col] = (
            out[ker1_coo.row, ker1_coo.col].A1
            * kernel_mean_wm
            * ker1_coo.data
            / (kernel_mean * kernel_size)
        )
        out = xcorr2(framed_sig, kernel / kernel_size, tsvd=tsvd) - out
        out[ker1_coo.row, ker1_coo.col] = (
            out[ker1_coo.row, ker1_coo.col].A1 * kernel_size / ker1_coo.data
        )
        # Remember where very low values are located in denom to avoid nans
        denom_0 = abs(denom.data) < 1e-10
        # take inverse, because 2 sparse mats can't be divided
        denom.data[~denom_0] = 1 / denom.data[~denom_0]
        denom.data[denom_0] = 0.0
        out = out.multiply(denom)
    # if (max_dist is not None) and sym_upper:
    #    # Trim diagonals further than max_scan_distance
    #    out = preproc.diag_trim(out.tocsr(), max_dist)
    if sym_upper:
        out = sp.triu(out)
    out = out.tocoo()
    out.data[~np.isfinite(out.data)] = 0.0
    # Remove near zero coeffs to spare memory (note: the epsilon must match
    # the numerical-zero threshold used on denom above).
    out.data[np.abs(out.data) < 1e-10] = 0.0
    # Prevent rounding errors to generate values above 1 or below -1
    out.data[out.data < -1] = -1.0
    out.data[out.data > 1] = 1.0
    out.eliminate_zeros()
    if pval:
        pvals = out.copy()
        if full and framed_missing_mask is not None:
            try:
                # Get number of values for each coeff
                n_obs = ker1_coo.tocsr()[pvals.row, pvals.col].A1
                # Replace implicit n_obs by total kernel size
                n_obs[n_obs == 0] = kernel_size
                pvals.data = cus.corr_to_pval(out.data, n_obs)
            # No nonzero coeff in the matrix, skip calculation of p-values
            except AttributeError:
                pass
        else:
            pvals.data = cus.corr_to_pval(out.data, kernel_size)
        pvals = pvals.tocsr()
        if full:
            pvals = pvals[mk - 1 : -mk + 1, nk - 1 : -nk + 1]
    else:
        pvals = None
    out = out.tocsr()
    if full:
        # Strip the margin frame added at the top so the output matches the
        # input signal's shape.
        out = out[mk - 1 : -mk + 1, nk - 1 : -nk + 1]
    return out, pvals
def _normxcorr2_dense(
    signal,
    kernel,
    max_dist=None,
    sym_upper=False,
    full=None,
    missing_mask=None,
    tsvd=None,
    missing_tol=0.75,
    pval=False,
):
    """Computes the normalized cross-correlation of a dense or sparse signal
    and a dense kernel. The resulting matrix contains Pearson correlation
    coefficients.

    Parameters
    ----------
    signal : numpy.array
        The input processed Hi-C matrix.
    kernel : numpy.array or tuple of numpy.arrays
        The pattern kernel to use for convolution. Can be a factorised kernel
        stored in a tuple.
    max_dist : int
        Maximum scan distance, in number of bins from the diagonal. If None,
        the whole matrix is convoluted. Otherwise, pixels further than this
        distance from the diagonal are set to 0 and ignored for performance.
        Only useful for intrachromosomal matrices.
    sym_upper : bool
        Whether the matrix is symmetric and upper triangle. True for
        intrachromosomal matrices.
    full : bool
        Whether to run in 'full' mode, which means enclosing the signal in an
        exterior frame and computing the correlation up to the edges.
    missing_mask : numpy.array of bools
        Matrix defining which pixels are missing (True) or not (False).
    missing_tol : float
        Proportion of missing values allowed in windows to keep the
        correlation coefficients.
    tsvd : float or None
        If a float between 0 and 1 is given, the input kernel is factorised
        using truncated SVD, keeping enough singular vectors to retain this
        proportion of information. Factorisation speeds up convolution at
        the cost of a loss of information. Disabled by default (None).
    pval : bool
        Whether to return a matrix of p-values.

    Returns
    -------
    out : numpy.array
        The dense matrix of correlation coefficients.
    pvals : numpy.array or None
        The matrix of p-values if pval is True, None otherwise.
    """
    mk, nk = kernel.shape
    ms, ns = signal.shape
    # Convert numpy matrices to array to avoid operator overloading
    if isinstance(signal, np.matrix):
        signal = np.array(signal)
    if isinstance(kernel, np.matrix):
        kernel = np.array(kernel)
    kernel_size = mk * nk
    # Constant (all-ones) kernel used to compute windowed means/sums.
    kernel1 = np.ones(kernel.shape)
    if full:
        # Pad the signal with a zero margin so the kernel can be centered on
        # edge pixels.
        framed_sig = np.zeros([ms + 2 * (mk - 1), ns + 2 * (nk - 1)])
        framed_sig[mk - 1 : -mk + 1, nk - 1 : -nk + 1] = signal
        if missing_mask is not None:
            # Add margins around missing mask.
            framed_missing_mask = preproc.frame_missing_mask(
                missing_mask, kernel.shape, sym_upper=sym_upper, max_dist=max_dist,
            )
    else:
        framed_sig = signal
        if missing_mask is not None:
            framed_missing_mask = missing_mask.copy()
    # Pearson correlation
    kernel_mean = float(kernel.mean())
    kernel_std = float(kernel.std())
    framed_sig_mean = xcorr2(framed_sig, kernel1 / kernel_size)
    if missing_mask is None:
        out = xcorr2(framed_sig, kernel / kernel_size) - framed_sig_mean * kernel_mean
        denom = kernel_std * np.sqrt(
            xcorr2(framed_sig ** 2, kernel1 / kernel_size) - framed_sig_mean ** 2
        )
        # Flat windows (zero std) get a coefficient of 0 instead of nan/inf.
        denom_0 = abs(denom) < 1e-10
        out[~denom_0] /= denom[~denom_0]
        out[denom_0] = 0.0
    else:
        # Per-window number of valid (non-missing) pixels; overwrites the
        # scalar kernel_size with a dense array.
        kernel_size = mk * nk - xcorr2(framed_missing_mask, kernel1).toarray()
        # Kernel mean corrected by the missing pixels of each window.
        kernel_mean = (
            np.sum(kernel) - xcorr2(framed_missing_mask, kernel).toarray()
        ) / kernel_size
        # store signal mean in 'out' to avoid multiple replication of data
        out = xcorr2(framed_sig, kernel1) / kernel_size
        denom = xcorr2(framed_sig ** 2, kernel1) / kernel_size
        # Use multiplicative coeff to correct denom by number of missing values
        denom = np.sqrt(
            (denom - out ** 2)
            * (
                (
                    np.sum(kernel ** 2)
                    - xcorr2(framed_missing_mask, kernel ** 2).toarray()
                )
                / kernel_size
                - kernel_mean ** 2
            )
        )
        # ensure that enough data points are inside of window
        denom[kernel_size < int((1 - missing_tol) * mk * nk)] = 0.0
        # store numerator directly in 'out' to avoid multiple copies of data
        out = xcorr2(framed_sig, kernel / kernel_size) - out * kernel_mean
        # Remember where very low values are located in denom to avoid nans
        denom_0 = abs(denom) < 1e-10
        out[~denom_0] /= denom[~denom_0]
        out[denom_0] = 0.0
    if sym_upper:
        out = np.triu(out)
    out[~np.isfinite(out)] = 0.0
    # Zero out near-zero coefficients (same epsilon as the denom threshold).
    out[np.abs(out) < 1e-10] = 0.0
    # Prevent rounding errors to generate values above 1 or below -1
    out[out < -1] = -1.0
    out[out > 1] = 1.0
    if pval:
        if full and isinstance(kernel_size, np.ndarray):
            # One sample count per coefficient; flatten to match out.flatten()
            n_obs = kernel_size.flatten()
        else:
            # kernel_size is a scalar when no missing mask was given.
            n_obs = kernel_size
        pvals = cus.corr_to_pval(out.flatten(), n_obs).reshape(out.shape)
        if full:
            pvals = pvals[mk - 1 : -mk + 1, nk - 1 : -nk + 1]
    else:
        pvals = None
    if full:
        # Strip the margin frame so the output matches the input shape.
        out = out[mk - 1 : -mk + 1, nk - 1 : -nk + 1]
    return out, pvals