#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Pattern exploration and detection
Explore and detect patterns (loops, borders, centromeres, etc.) in Hi-C contact
maps with pattern matching.
Usage:
chromosight detect [--kernel-config=FILE] [--pattern=loops]
[--pearson=auto] [--win-size=auto] [--iterations=auto]
[--win-fmt={json,npy}] [--norm={auto,raw,force}]
[--subsample=no] [--inter] [--tsvd] [--smooth-trend]
[--n-mads=5] [--min-dist=0] [--max-dist=auto]
[--no-plotting] [--min-separation=auto] [--dump=DIR]
[--threads=1] [--perc-zero=auto]
[--perc-undetected=auto] <contact_map> <prefix>
chromosight generate-config [--preset loops] [--click contact_map]
[--norm={auto,raw,norm}] [--win-size=auto] [--n-mads=5]
[--chroms=CHROMS] [--inter] [--threads=1] <prefix>
chromosight quantify [--inter] [--pattern=loops] [--subsample=no]
[--win-fmt=json] [--kernel-config=FILE] [--norm={auto,raw,norm}]
[--threads=1] [--n-mads=5] [--win-size=auto]
[--perc-undetected=auto] [--perc-zero=auto]
[--no-plotting] [--tsvd] <bed2d> <contact_map> <prefix>
chromosight list-kernels [--long] [--mat] [--name=kernel_name]
chromosight test
detect:
performs pattern detection on a Hi-C contact map via template matching
generate-config:
Generate pre-filled config files to use for detect and quantify.
A config consists of a JSON file describing parameters for the
analysis and path pointing to kernel matrices files. Those matrices
files are tsv files with numeric values as kernel to use for
convolution.
quantify:
Given a list of pairs of positions and a contact map, computes the
correlation coefficients between those positions and the kernel of the
selected pattern.
list-kernels:
Prints information about available kernels.
test:
Download example data and run loop detection on it.
Arguments for detect:
contact_map The Hi-C contact map to detect patterns on, in
cool format.
prefix Common path prefix used to generate output files.
Extensions will be added for each file.
Arguments for quantify:
bed2d Tab-separated text files with columns chrom1, start1
end1, chrom2, start2, end2. Each line correspond to
a pair of positions (i.e. a position in the matrix).
contact_map Path to the contact map, in cool format.
prefix Common path prefix used to generate output files.
Extensions will be added for each file.
Arguments for generate-config:
prefix Path prefix for config files. If prefix is a/b,
files a/b.json and a/b.1.txt will be generated.
If a given pattern has N kernel matrices, N txt
files are created they will be named a/b.[1-N].txt.
-e, --preset=loops Generate a preset config for the given pattern.
Preset configs available are "loops" and
"borders". [default: loops]
-c, --click=contact_map Show input contact map and uses double clicks from
user to build the kernel. Warning: memory-heavy,
reserve for small genomes or subsetted matrices.
-C, --chroms=CHROMS Comma-separated list of chromosome names. When used
with --click, this will show each chromosome's
one-by-one sequentially instead of the whole genome.
This is useful to reduce memory usage.
Arguments for list-kernels:
--name=kernel_name Only show information related to a particular
kernel.[default: all]
--long Show default parameters in addition to kernel names.
--mat Prints an ascii representation of the kernel matrix.
Basic options:
-h, --help Display this help message.
--version Display the program's current version.
--verbose Displays the logo.
-n, --norm={auto,raw,force} Normalization / balancing behaviour. auto: weights
present in the cool file are used. raw: raw contact
values are used. force: recompute weights and
overwrite existing values. raw[default: auto]
-I, --inter Enable to consider interchromosomal contacts.
Warning: Experimental feature with high memory
consumption, only use with small matrices.
-m, --min-dist=auto Minimum distance from the diagonal (in base pairs).
at which detection should operate. [default: auto]
-M, --max-dist=auto Maximum distance from the diagonal (in base pairs)
for detection. [default: auto]
-P, --pattern=loops Which pattern to detect. This will use preset
configurations for the given pattern. Possible
values are: loops, loops_small, borders, hairpins and
centromeres. [default: loops]
-p, --pearson=auto Pearson correlation threshold when detecting patterns
in the contact map. Lower values leads to potentially
more detections, but more false positives. [default: auto]
-s, --subsample=INT If greater than 1, subsample INT contacts from the
matrix. If between 0 and 1, subsample a proportion of
contacts instead. Useful when comparing matrices with
different coverages. [default: no]
-t, --threads=1 Number of CPUs to use in parallel. [default: 1]
-u, --perc-undetected=auto Maximum percentage of non-detectable pixels (nan) in
windows allowed to report patterns. [default: auto]
-z, --perc-zero=auto Maximum percentage of empty (0) pixels in windows
allowed to report patterns. [default: auto]
Advanced options:
-d, --dump=DIR Directory where to save matrix dumps during
processing and detection. Each dump is saved as
a compressed npz of a sparse matrix and can be
loaded using scipy.sparse.load_npz.
-i, --iterations=auto How many iterations to perform after the first
template-based pass. [default: 1]
-k, --kernel-config=FILE Optionally give a path to a custom JSON kernel
config path. Use this to override pattern if
you do not want to use one of the preset
patterns.
--no-plotting Disable generation of pileup plots.
-N, --n-mads=5 Maximum number of median absolute deviations below
the median of the bin sums distribution allowed to
consider detectable bins. [default: 5]
-S, --min-separation=auto Minimum distance required between patterns, in
basepairs. If two patterns are closer than this
distance in both axes, the one with the lowest
score is discarded. [default: auto]
-T, --smooth-trend Use isotonic regression when detrending to reduce
noise at long ranges. Do not enable this for circular
genomes.
-V, --tsvd Enable kernel factorisation via truncated svd.
Accelerates detection, at the cost of slight
inaccuracies. Singular matrices are truncated to
retain 99.9% of the information in the kernel.
-w, --win-fmt={json,npy} File format used to store individual windows
around each pattern. Window order matches
patterns inside the associated text file.
Possible formats are json and npy. [default: json]
-W, --win-size=auto Window size (width), in pixels, to use for the
kernel when computing correlations. The pattern
kernel will be resized to match this size. Linear
linear interpolation is used to fill between pixels.
If not specified, the default kernel size will
be used instead. [default: auto]
"""
import sys
import os
import io
import itertools as it
from contextlib import contextmanager
import json
import tempfile
import pathlib
import numpy as np
import pandas as pd
import docopt
import multiprocessing as mp
from chromosight.version import __version__
from chromosight.utils.contacts_map import HicGenome
import chromosight.utils.io as cio
import chromosight.utils.detection as cid
from chromosight.utils.plotting import (
pileup_plot,
click_finder,
print_ascii_mat,
)
from chromosight.utils.preprocessing import resize_kernel
from chromosight.utils.stats import fdr_correction
import chromosight.kernels as ck
import scipy.ndimage as ndi
import matplotlib.pyplot as plt
LOGO = np.loadtxt(pathlib.Path(__file__).parents[0] / "logo.txt")
URL_EXAMPLE_DATASET = (
"https://raw.githubusercontent.com/koszullab/"
"chromosight/master/data_test/example.cool"
)
TEST_LOG = f"""Fetching test dataset at {URL_EXAMPLE_DATASET}...
Running detection on test dataset...
pearson set to 0.3 based on config file.
max_dist set to 2000000 based on config file.
min_dist set to 20000 based on config file.
min_separation set to 5000 based on config file.
max_perc_undetected set to 50.0 based on config file.
max_perc_zero set to 10.0 based on config file.
Matrix already balanced, reusing weights
Preprocessing sub-matrices...
Detecting patterns...
89 patterns detected
Saving patterns in chromosight_test.tsv
Saving patterns in chromosight_test.json
"""
def _override_kernel_config(param_name, param_value, param_type, config):
"""
Helper function to determine if a config file value should be overriden
by the user.
"""
if param_value == "auto":
try:
sys.stderr.write(
f"{param_name} set to {config[param_name]} based on config file.\n"
)
except KeyError:
raise KeyError(
f"{param_name} is not defined in the config. Please add it to "
f"the JSON config file, or provide it as a command line option."
)
else:
try:
config[param_name] = param_type(param_value)
except ValueError:
raise ValueError(
f'Error: {param_name} must be a {param_type} or "auto"'
)
return config
def _quantify_sub_mat(data):
"""
This function is dispatched by multiprocessing to
run the quantification pipeline.
"""
sub = data[0][1]
config = data[1]
kernel = data[2]
positions = data[3]
# If there is no pattern on this sub matrix,
# do not scan it.
if positions.shape[0]:
sub.contact_map.create_mat()
# Feed the submatrix to quantification pipeline
patterns, windows = cid.pattern_detector(
sub.contact_map,
config,
kernel,
coords=np.array(positions.loc[:, ["bin1", "bin2"]]),
full=True,
tsvd=config["tsvd"],
)
sub.contact_map.destroy_mat()
else:
patterns = windows = None
return {
"coords": patterns,
"windows": windows,
"chr1": sub.chr1,
"chr2": sub.chr2,
}
def _get_chrom_pos(positions, hic_genome, chr1, chr2):
"""
Helper function to filter input 2D genomic positions
and convert them from whole genome to submatrix coords.
"""
# Filter patterns falling onto this sub-matrix
sub_pat = positions.loc[
(positions.chrom1 == chr1) & (positions.chrom2 == chr2)
]
# Convert genomic coordinates to bins for horizontal and vertical axes
for ax in [1, 2]:
sub_pat_ax = sub_pat.loc[:, [f"chrom{ax}", f"pos{ax}"]].rename(
columns={f"chrom{ax}": "chrom", f"pos{ax}": "pos"}
)
sub_pat_bins = hic_genome.coords_to_bins(sub_pat_ax)
sub_pat[f"bin{ax}"] = sub_pat_bins
# Check for nan bins (coords that do not match any Hi-C fragments
fall_out = np.isnan(sub_pat["bin1"]) | np.isnan(sub_pat["bin2"])
if np.any(fall_out):
n_out = len(sub_pat_bins[fall_out])
sys.stderr.write(
f"\n{n_out} entr{'ies' if n_out > 1 else 'y'} outside "
"genomic coordinates of the Hi-C matrix will be ignored.\n"
)
sub_pat = sub_pat.loc[~fall_out, :]
sub_pat_idx = sub_pat.index.values
# Convert bins from whole genome matrix to sub matrix
sub_pat = hic_genome.get_sub_mat_pattern(chr1, chr2, sub_pat)
return sub_pat_idx, sub_pat
[docs]def cmd_quantify(args):
bed2d_path = args["<bed2d>"]
mat_path = args["<contact_map>"]
prefix = args["<prefix>"]
n_mads = float(args["--n-mads"])
pattern = args["--pattern"]
inter = args["--inter"]
kernel_config_path = args["--kernel-config"]
perc_zero = args["--perc-zero"]
perc_undetected = args["--perc-undetected"]
plotting_enabled = False if args["--no-plotting"] else True
threads = int(args["--threads"])
norm = args["--norm"]
tsvd = 0.999 if args["--tsvd"] else None
win_fmt = args["--win-fmt"]
if win_fmt not in ["npy", "json"]:
sys.stderr.write("Error: --win-fmt must be either json or npy.\n")
sys.exit(1)
win_size = args["--win-size"]
if win_size != "auto":
win_size = int(win_size)
subsample = args["--subsample"]
# If prefix involves a directory, crash if it does not exist
cio.check_prefix_dir(prefix)
# Load 6 cols from 2D BED file and infer header
bed2d = cio.load_bed2d(bed2d_path)
# Warn user if --inter is disabled but list contains inter patterns
if not inter and len(bed2d.start1[bed2d.chrom1 != bed2d.chrom2]) > 0:
sys.stderr.write(
"Warning: The bed2d file contains interchromosomal patterns. "
"These patterns will not be scanned unless --inter is used.\n"
)
if kernel_config_path is not None:
custom = True
# Loading input path as config
config_path = kernel_config_path
else:
custom = False
# Will use a preset config file matching pattern name
config_path = pattern
cfg = cio.load_kernel_config(config_path, custom)
# Subsample Hi-C contacts from the matrix, if requested
if subsample == "no":
subsample = None
# Instantiate and preprocess contact map
hic_genome = HicGenome(
mat_path, inter=inter, kernel_config=cfg, sample=subsample
)
# enforce max scanning distance to pattern at longest distance
furthest = np.max(bed2d.start2 - bed2d.start1)
max_diag = hic_genome.clr.shape[0] * hic_genome.clr.binsize
cfg["max_dist"] = min(furthest, max_diag)
cfg["min_dist"] = 0
cfg["tsvd"] = tsvd
cfg = _override_kernel_config("max_perc_zero", perc_zero, float, cfg)
cfg = _override_kernel_config(
"max_perc_undetected", perc_undetected, float, cfg
)
# Notify contact map instance of changes in scanning distance
hic_genome.kernel_config = cfg
# Normalize (balance) matrix using ICE
hic_genome.normalize(norm=norm, n_mads=n_mads, threads=threads)
# Initialize output structures
bed2d["score"] = np.nan
bed2d["pvalue"] = np.nan
positions = bed2d.copy()
# Only resize kernel matrix if explicitely requested
km, kn = cfg["kernels"][0].shape
n_kernels = len(cfg['kernels'])
if win_size != "auto":
if not win_size % 2:
raise ValueError("--win-size must be odd")
for i, k in enumerate(cfg["kernels"]):
cfg["kernels"][i] = resize_kernel(k, factor=win_size / km)
km = kn = win_size
# Update kernel config after resizing kernels
hic_genome.kernel_config = cfg
# Define how many diagonals should be used in intra-matrices
hic_genome.compute_max_dist()
# Split whole genome matrix into intra- and inter- sub matrices. Each sub
# matrix is processed on the fly (obs / exp, trimming diagonals > max dist)
hic_genome.make_sub_matrices()
windows = np.full((positions.shape[0], km, kn), np.nan)
# We will store a copy of coordinates for each kernel
bed2d_out = [bed2d.copy() for _ in range(n_kernels)]
windows_out = [windows.copy() for _ in range(n_kernels)]
# For each position, we use the center of the BED interval
positions["pos1"] = (positions.start1 + positions.end1) // 2
positions["pos2"] = (positions.start2 + positions.end2) // 2
# Use each kernel matrix available for the pattern
for kernel_id, kernel_matrix in enumerate(cfg["kernels"]):
cio.progress(kernel_id, len(cfg["kernels"]), f"Kernel: {kernel_id}\n")
n_sub_mats = hic_genome.sub_mats.shape[0]
# Retrieve input positions for each submatrix and convert
# coordinates from whole genome to submatrix.
sub_pos = [
_get_chrom_pos(positions, hic_genome, m[1].chr1, m[1].chr2)
for m in hic_genome.sub_mats.iterrows()
]
# Apply quantification procedure to all sub matrices in parallel
sub_mat_data = zip(
hic_genome.sub_mats.iterrows(),
[cfg for _ in range(n_sub_mats)],
[kernel_matrix for _ in range(n_sub_mats)],
[s[1] for s in sub_pos],
)
# Run quantification in parallel on different sub matrices,
# and show progress when gathering results
sub_mat_results = []
# Run in multiprocessing subprocesses
if threads > 1:
pool = mp.Pool(threads)
dispatcher = pool.imap(_quantify_sub_mat, sub_mat_data, 1)
else:
dispatcher = map(_quantify_sub_mat, sub_mat_data)
for s, result in enumerate(dispatcher):
cio.progress(s, n_sub_mats, f"{result['chr1']}-{result['chr2']}")
sub_mat_results.append(result)
for i, r in enumerate(sub_mat_results):
# If there were no patterns on that sub matrix, just skip it
if r['coords'] is None:
continue
sub_pat_idx = sub_pos[i][0]
# For each coordinate, keep the highest coefficient
# among all kernels.
try:
bed2d_out[kernel_id]['score'][sub_pat_idx] = r['coords'].score.values
bed2d_out[kernel_id]["pvalue"][sub_pat_idx] = r["coords"].pvalue.values
windows_out[kernel_id][sub_pat_idx, :, :] = r["windows"]
# Do nothing if no pattern was detected or matrix
# is smaller than the kernel (-> patterns is None)
except AttributeError:
pass
# Select the best score for each coordinate (among the different kernels)
bed2d = pd.concat(bed2d_out, axis=0).reset_index(drop=True)
windows = np.concatenate(windows_out, axis=0)
bed2d = (
bed2d
.sort_values('score', ascending=True)
.groupby(['chrom1', 'start1', 'chrom2', 'start2'], sort=False)
.tail(1)
)
windows = windows[bed2d.index, :, :]
bed2d = bed2d.reset_index(drop=True)
bed2d["bin1"] = hic_genome.coords_to_bins(
bed2d.loc[:, ["chrom1", "start1"]].rename(
columns={"chrom1": "chrom", "start1": "pos"}
)
)
bed2d["bin2"] = hic_genome.coords_to_bins(
bed2d.loc[:, ["chrom2", "start2"]].rename(
columns={"chrom2": "chrom", "start2": "pos"}
)
)
bed2d["qvalue"] = fdr_correction(bed2d["pvalue"])
bed2d = bed2d.loc[
:,
[
"chrom1",
"start1",
"end1",
"chrom2",
"start2",
"end2",
"bin1",
"bin2",
"score",
"pvalue",
"qvalue",
],
]
# Set p-values of invalid scores to nan
bed2d.loc[np.isnan(bed2d.score), "pvalue"] = np.nan
bed2d.loc[np.isnan(bed2d.score), "qvalue"] = np.nan
# Sort by whole genome coordinates to match input order
bed2d = (
bed2d
.sort_values(['bin1', 'bin2'], ascending=True)
.reset_index(drop=True)
)
cio.write_patterns(bed2d, prefix)
cio.save_windows(windows, prefix, fmt=win_fmt)
# Generate pileup visualisations if requested
if plotting_enabled:
# Compute and plot pileup
pileup_title = ("pileup_of_{n}_{pattern}").format(
pattern=cfg["name"], n=windows.shape[0]
)
windows_pileup = cid.pileup_patterns(windows)
# Symmetrize pileup for diagonal patterns
if not cfg["max_dist"]:
# Replace nan below diag by 0
windows_pileup = np.nan_to_num(windows_pileup)
# Add transpose
windows_pileup += np.transpose(windows_pileup) - np.diag(
np.diag(windows_pileup)
)
sys.stderr.write(f"Saving pileup plots in {prefix}.pdf\n")
pileup_plot(windows_pileup, prefix, name=pileup_title)
[docs]def cmd_generate_config(args):
# Parse command line args for generate_config
prefix = args["<prefix>"]
pattern = args["--preset"]
click_find = args["--click"]
n_mads = float(args["--n-mads"])
norm = args["--norm"]
win_size = args["--win-size"]
threads = int(args["--threads"])
inter = args["--inter"]
chroms = args["--chroms"]
cfg = cio.load_kernel_config(pattern, False)
# If prefix involves a directory, crash if it does not exist
cio.check_prefix_dir(prefix)
# If a specific window size if requested, resize all kernels
if win_size != "auto":
win_size = int(win_size)
if not win_size % 2:
raise ValueError("--win-size must be odd")
resize = lambda m: resize_kernel(m, factor=win_size / m.shape[0])
cfg["kernels"] = [resize(k) for k in cfg["kernels"]]
# Otherwise, just inherit window size from the kernel config
else:
win_size = cfg["kernels"][0].shape[0]
# If click mode is enabled, build a kernel from scratch using
# graphical display, otherwise, just inherit the pattern's kernel
if click_find:
hic_genome = HicGenome(click_find, inter=inter, kernel_config=cfg)
# Normalize (balance) the whole genome matrix
hic_genome.normalize(norm=norm, n_mads=n_mads, threads=threads)
# enforce full scanning distance in kernel config
hic_genome.max_dist = hic_genome.clr.shape[0] * hic_genome.clr.binsize
# Process each sub-matrix individually (detrend diag for intra)
hic_genome.make_sub_matrices()
# By default, the whole genome is showed at once (takes lots of RAM)
if chroms is None:
for sub in hic_genome.sub_mats.iterrows():
sub_mat = sub[1].contact_map
sub_mat.create_mat()
processed_mat = hic_genome.gather_sub_matrices().tocsr()
windows = click_finder(processed_mat, half_w=int((win_size - 1) / 2))
# If chromosomes were specified, their submatrices are shown one by one
# taking less memory (but more tedious for the user)
else:
chroms = chroms.split(',')
# Generate chromosome pairs to scan
if inter:
chroms = it.combinations_with_replacement(chroms, 2)
else:
chroms = [(ch, ch) for ch in chroms]
windows = []
for c1, c2 in chroms:
try:
sub_mat = hic_genome.sub_mats.query(
'(chr1 == @c1) & (chr2 == @c2)'
)['contact_map'].values[0]
# In case chromosomes have been entered in a different order
except IndexError:
c1, c2 = c2, c1
sub_mat = hic_genome.sub_mats.query(
'(chr1 == @c1) & (chr2 == @c2)'
)['contact_map'].values[0]
sub_mat.create_mat()
chrom_wins = click_finder(
sub_mat.matrix.tocsr(),
half_w=int((win_size - 1) / 2),
xlab=c2,
ylab=c1
)
windows.append(chrom_wins)
sub_mat.destroy_mat()
windows = np.concatenate(windows, axis=0)
# Pileup all recorded windows and convert to JSON serializable list
pileup = ndi.gaussian_filter(cid.pileup_patterns(windows), 1)
cfg["kernels"] = [pileup.tolist()]
# Show the newly generate kernel to the user, use zscore to highlight contrast
hm = plt.imshow(
np.log(pileup), vmax=np.percentile(pileup, 99), cmap="afmhot_r"
)
cbar = plt.colorbar(hm)
cbar.set_label("Log10 Hi-C contacts")
plt.title("Manually generated kernel")
plt.show()
# Write kernel matrices to files with input prefix and replace kernels
# by their path in config
for mat_id, mat in enumerate(cfg["kernels"]):
mat_path = f"{prefix}.{mat_id+1}.txt"
np.savetxt(mat_path, mat)
cfg["kernels"][mat_id] = mat_path
# Write config to JSON file using prefix
with open(f"{prefix}.json", "w") as config_handle:
json.dump(cfg, config_handle, indent=4)
def _detect_sub_mat(data):
sub = data[0][1]
config = data[1]
kernel = data[2]
dump = data[3]
sub.contact_map.create_mat()
chrom_patterns, chrom_windows = cid.pattern_detector(
sub.contact_map,
config,
kernel,
dump=dump,
full=True,
tsvd=config["tsvd"],
)
sub.contact_map.destroy_mat()
return {
"coords": chrom_patterns,
"windows": chrom_windows,
"chr1": sub.chr1,
"chr2": sub.chr2,
}
[docs]def cmd_detect(args):
# Parse command line arguments for detect
dump = args["--dump"]
norm = args["--norm"]
interchrom = args["--inter"]
iterations = args["--iterations"]
kernel_config_path = args["--kernel-config"]
mat_path = args["<contact_map>"]
max_dist = args["--max-dist"]
min_dist = args["--min-dist"]
min_separation = args["--min-separation"]
n_mads = float(args["--n-mads"])
prefix = args["<prefix>"]
pattern = args["--pattern"]
pearson = args["--pearson"]
perc_zero = args["--perc-zero"]
perc_undetected = args["--perc-undetected"]
subsample = args["--subsample"]
threads = int(args["--threads"])
tsvd = 0.999 if args["--tsvd"] else None
win_fmt = args["--win-fmt"]
win_size = args["--win-size"]
if subsample == "no":
subsample = None
plotting_enabled = False if args["--no-plotting"] else True
smooth_trend = args["--smooth-trend"]
if smooth_trend is None:
smooth_trend = False
# If prefix involves a directory, crash if it does not exist
cio.check_prefix_dir(prefix)
if win_fmt not in ["npy", "json"]:
sys.stderr.write("Error: --win-fmt must be either json or npy.\n")
sys.exit(1)
# Read a user-provided kernel config if custom is true
# Else, load a preset kernel config for input pattern
# Configs are JSON files containing all parameter associated with the pattern
# They are loaded into a dictionary in the form :
# {"max_iterations": 3, "kernels": [kernel1, kernel2, ...], ...}
# Where each kernel is a 2D numpy array representing the pattern
if kernel_config_path is not None:
custom = True
# Loading input path as config
config_path = kernel_config_path
else:
custom = False
# Will use a preset config file matching pattern name
config_path = pattern
### 0: LOAD INPUT
params = {
"max_iterations": (iterations, int),
"pearson": (pearson, float),
"max_dist": (max_dist, int),
"min_dist": (min_dist, int),
"min_separation": (min_separation, int),
"max_perc_undetected": (perc_undetected, float),
"max_perc_zero": (perc_zero, float),
}
cfg = cio.load_kernel_config(config_path, custom)
for param_name, (param_value, param_type) in params.items():
cfg = _override_kernel_config(param_name, param_value, param_type, cfg)
# Resize kernels if requested
if win_size != "auto":
win_size = int(win_size)
if not win_size % 2:
raise ValueError("--win-size must be odd")
resize = lambda m: resize_kernel(m, factor=win_size / m.shape[0])
cfg["kernels"] = [resize(k) for k in cfg["kernels"]]
if interchrom:
sys.stderr.write(
"WARNING: Detection on interchromosomal matrices is expensive in RAM\n"
)
hic_genome = HicGenome(
mat_path,
inter=interchrom,
kernel_config=cfg,
dump=dump,
smooth=smooth_trend,
sample=subsample,
)
### 1: Process input signal
hic_genome.kernel_config = cfg
# Normalize (balance) matrix using ICE
hic_genome.normalize(norm=norm, n_mads=n_mads, threads=threads)
# Define how many diagonals should be used in intra-matrices
hic_genome.compute_max_dist()
# Split whole genome matrix into intra- and inter- sub matrices. Each sub
# matrix is processed on the fly (obs / exp, trimming diagonals > max dist)
hic_genome.make_sub_matrices()
all_coords = []
all_windows = []
### 2: DETECTION ON EACH SUBMATRIX
n_sub_mats = hic_genome.sub_mats.shape[0]
# Loop over the different kernel matrices for input pattern
run_id = 0
# Use cfg to inform jobs whether they should run full convolution
cfg["tsvd"] = tsvd
total_runs = len(cfg["kernels"]) * cfg["max_iterations"]
sys.stderr.write("Detecting patterns...\n")
for kernel_id, kernel_matrix in enumerate(cfg["kernels"]):
# Adjust kernel iteratively
for i in range(cfg["max_iterations"]):
cio.progress(
run_id, total_runs, f"Kernel: {kernel_id}, Iteration: {i}\n"
)
# Apply detection procedure to all sub matrices in parallel
sub_mat_data = zip(
hic_genome.sub_mats.iterrows(),
[cfg for i in range(n_sub_mats)],
[kernel_matrix for i in range(n_sub_mats)],
[dump for i in range(n_sub_mats)],
)
# Run detection in parallel on different sub matrices, and show progress when
# gathering results
sub_mat_results = []
# Run in multiprocessing subprocesses
if threads > 1:
pool = mp.Pool(threads)
dispatcher = pool.imap(_detect_sub_mat, sub_mat_data, 1)
else:
dispatcher = map(_detect_sub_mat, sub_mat_data)
for s, result in enumerate(dispatcher):
cio.progress(s, n_sub_mats, f"{result['chr1']}-{result['chr2']}")
sub_mat_results.append(result)
# Convert coordinates from chromosome to whole genome bins
kernel_coords = [
hic_genome.get_full_mat_pattern(
d["chr1"], d["chr2"], d["coords"]
)
for d in sub_mat_results
if d["coords"] is not None
]
# Gather newly detected pattern coordinates
try:
# Extract surrounding windows for each sub_matrix
kernel_windows = np.concatenate(
[
w["windows"]
for w in sub_mat_results
if w["windows"] is not None
],
axis=0,
)
all_coords.append(
pd.concat(kernel_coords, axis=0).reset_index(drop=True)
)
# Add info about kernel and iteration which detected these patterns
all_coords[-1]["kernel_id"] = kernel_id
all_coords[-1]["iteration"] = i
all_windows.append(kernel_windows)
# If no pattern was found with this kernel
# skip directly to the next one, skipping iterations
except ValueError:
break
# Update kernel with patterns detected at current iteration
kernel_matrix = cid.pileup_patterns(kernel_windows)
run_id += 1
cio.progress(run_id, total_runs, f"Kernel: {kernel_id}, Iteration: {i}\n")
# If no pattern detected on any chromosome, with any kernel, exit gracefully
if len(all_coords) == 0:
sys.stderr.write("No pattern detected ! Exiting.\n")
sys.exit(0)
# Finish parallelized part
if threads > 1:
pool.close()
# Combine patterns of all kernel matrices into a single array
all_coords = pd.concat(all_coords, axis=0).reset_index(drop=True)
# Combine all windows from different kernels into a single pile of windows
all_windows = np.concatenate(all_windows, axis=0)
# Compute minimum separation in bins and make sure it has a reasonable value
separation_bins = int(cfg["min_separation"] // hic_genome.clr.binsize)
if separation_bins < 1:
separation_bins = 1
print(f"Minimum pattern separation is : {separation_bins}")
# Remove patterns with overlapping windows (smeared patterns)
distinct_patterns = cid.remove_neighbours(
all_coords, win_size=separation_bins
)
# Drop patterns that are too close to each other
all_coords = all_coords.loc[distinct_patterns, :]
all_windows = all_windows[distinct_patterns, :, :]
# Get from bins into basepair coordinates
coords_1 = hic_genome.bins_to_coords(all_coords.bin1).reset_index(
drop=True
)
coords_1.columns = [str(col) + "1" for col in coords_1.columns]
coords_2 = hic_genome.bins_to_coords(all_coords.bin2).reset_index(
drop=True
)
coords_2.columns = [str(col) + "2" for col in coords_2.columns]
all_coords = pd.concat(
[all_coords.reset_index(drop=True), coords_1, coords_2], axis=1
)
# Filter patterns closer than minimum distance from the diagonal if any
min_dist_drop_mask = (all_coords.chrom1 == all_coords.chrom2) & (
np.abs(all_coords.start2 - all_coords.start1) < cfg["min_dist"]
)
all_coords = all_coords.loc[~min_dist_drop_mask, :]
all_windows = all_windows[~min_dist_drop_mask, :, :]
del min_dist_drop_mask
# Remove patterns with nan p-values (no contact in window)
pval_mask = all_coords.pvalue.isnull()
all_coords = all_coords.loc[~pval_mask, :]
all_windows = all_windows[~pval_mask, :, :]
del pval_mask
# Correct p-values for multiple testing using FDR
all_coords["qvalue"] = fdr_correction(all_coords["pvalue"])
# Reorder columns
all_coords = all_coords.loc[
:,
[
"chrom1",
"start1",
"end1",
"chrom2",
"start2",
"end2",
"bin1",
"bin2",
"kernel_id",
"iteration",
"score",
"pvalue",
"qvalue",
],
]
### 3: WRITE OUTPUT
sys.stderr.write(f"{all_coords.shape[0]} patterns detected\n")
# Save patterns and their coordinates in a tsv file
sys.stderr.write(f"Saving patterns in {prefix}.tsv\n")
cio.write_patterns(all_coords, prefix)
# Save windows as an array in an npy file
sys.stderr.write(f"Saving patterns in {prefix}.{win_fmt}\n")
cio.save_windows(all_windows, prefix, fmt=win_fmt)
# Generate pileup visualisations if requested
if plotting_enabled:
# Compute and plot pileup
pileup_title = ("Pileup of {n} {pattern}").format(
pattern=cfg["name"], n=all_windows.shape[0]
)
windows_pileup = cid.pileup_patterns(all_windows)
# Symmetrize pileup for diagonal patterns
if not cfg["max_dist"]:
# Replace nan below diag by 0
windows_pileup = np.nan_to_num(windows_pileup)
# Add transpose
windows_pileup += np.transpose(windows_pileup) - np.diag(
np.diag(windows_pileup)
)
sys.stderr.write(f"Saving pileup plots in {prefix}.pdf\n")
pileup_plot(windows_pileup, prefix, name=pileup_title)
[docs]def cmd_list_kernels(args):
kernel_name = args["--name"]
# Load every avaiable kernel by default
if kernel_name == "all":
kernels = ck.kernel_names
# If a specific kernel was requested, only load this one
else:
kernels = [kernel_name]
# Check availability of each kernel and print its name
for k in kernels:
try:
kernel_infos = getattr(ck, k)
except AttributeError:
raise ValueError(f"Kernel {k} is not available")
print(k)
# Print default params if --long specified (key-value pairs in json)
if args["--long"]:
exclude_params = ["name", "resolution", "kernels"]
for param, value in kernel_infos.items():
if param not in exclude_params:
print(f" {param}: {value}")
if args["--mat"]:
mats = kernel_infos["kernels"]
for mat in mats:
print_ascii_mat(mat)
[docs]def cmd_test(args):
sys.stderr.write(f"Fetching test dataset at {URL_EXAMPLE_DATASET}...\n")
tmp_cool = tempfile.NamedTemporaryFile(delete=False)
cio.download_file(URL_EXAMPLE_DATASET, tmp_cool.name)
sys.stderr.write(f"Running detection on test dataset...\n")
args["<contact_map>"] = tmp_cool.name
args["<prefix>"] = "chromosight_test"
args["--no-plotting"] = True
cmd_detect(args)
os.unlink(tmp_cool.name)
[docs]@contextmanager
def capture_ouput(stderr_to=None):
"""Capture the stderr of the test run. """
try:
stderr = sys.stderr
sys.stderr = c2 = stderr_to or io.StringIO()
yield c2
finally:
sys.stderr = stderr
try:
c2.flush()
c2.seek(0)
except (ValueError, IOError):
pass
[docs]def logo_version(logo, ver):
small_logo = resize_kernel(logo, factor=0.33, quiet=True)
ascii_logo = print_ascii_mat(small_logo, colored=False, print_str=False)
return f"{ascii_logo} Chromosight version {ver}"
[docs]def main():
args = docopt.docopt(__doc__, version=logo_version(LOGO, __version__))
detect = args["detect"]
generate_config = args["generate-config"]
list_kernels = args["list-kernels"]
quantify = args["quantify"]
test = args["test"]
if test:
with capture_ouput() as stderr:
cmd_test(args)
obs_log = stderr.read()
sys.stderr.write(obs_log)
# remove progress bars and \r chars
obs_log_lines = {
u.strip("\x1b[K")
for u in set(obs_log.split("\n"))
if "\r" not in u
}
exp_log_lines = set(TEST_LOG.split("\n"))
if len(exp_log_lines ^ obs_log_lines):
sys.stderr.write(
"\nWarning, the test log differed from the "
"expected one. This means the program changed its output from"
"previous versions. You may ignore this if you are not a "
"developer.\n\n"
f"Here is the expected log:\n\n{TEST_LOG}\n"
)
elif detect:
cmd_detect(args)
elif generate_config:
cmd_generate_config(args)
elif list_kernels:
cmd_list_kernels(args)
elif quantify:
cmd_quantify(args)
return 0
if __name__ == "__main__":
main()