Source code for trackc.tl._getRegionsCmat
import logging
from typing import List, Sequence, Union
import cooler
import numpy as np
import pandas as pd
[docs]class GenomeRegion:
region = None
chrom = None
start = None
end = None
fetch_start = None
fetch_end = None
length = None
isReverse = False
[docs] def __init__(self, region: str):
self.region = region
tmp = region.split(":")
self.chrom = tmp[0]
if len(tmp) == 2:
self.start = int(tmp[1].split("-")[0])
self.end = int(tmp[1].split("-")[1])
if self.start > self.end:
self.isReverse = True
self.fetch_start = self.end
self.fetch_end = self.start
else:
self.fetch_start = self.start
self.fetch_end = self.end
self.length = abs(self.start - self.end)
def fetchRegion(self):
fetch_r = self.chrom
if self.fetch_start != None:
fetch_r = fetch_r + ":" + str(self.fetch_start) + "-" + str(self.fetch_end)
return fetch_r
def GenomeRegion2df(self):
"""
region4coolFetch = self.chrom + ":" + str(self.start) + '-' + str(self.end)
if self.start == None:
region4coolFetch = self.chrom
else:
if self.start > self.end:
region4coolFetch = self.chrom + ":" + str(self.end) + '-' + str(self.start)
"""
region4coolFetch = self.fetchRegion()
df = pd.DataFrame(
{
"chrom": [self.chrom],
"start": [self.start],
"end": [self.end],
"isReverse": [self.isReverse],
"fetch_start": [self.fetch_start],
"fetch_end": [self.fetch_end],
"region4coolFetch": [region4coolFetch],
},
index=[self.region],
)
return df
[docs]class RegionsCmat:
[docs] def __init__(
self, cmat: np.array, row_regions: pd.DataFrame, col_regions: pd.DataFrame
):
self.cmat = cmat
self.row_regions = row_regions
self.col_regions = col_regions
# import extractContactRegions as subsetContactRegions
[docs]def extractContactRegions(
clr: Union[cooler.Cooler, str],
balance: bool = False,
# divisive_weights = None,
row_regions: Union[Sequence[str], str, None] = None,
col_regions: Union[Sequence[str], str, None] = None,
) -> RegionsCmat:
"""
Extract a set of regions matrix from the cool format Hi-C matrix.
The extracted matrix will splice intra and inter region interaction according to
the given order and direction of the regions.
Parameters
----------
clr: `cooler.Cooler` | `str`
cool format Hi-C matrix (https://github.com/open2c/cooler) or cool file path
`cooler.Cooler` e.g. GM12878=cooler.Cooler('./GM12878.chr18.mcool::/resolutions/50000')
`str` e.g. 'GM12878.chr18.mcool::/resolutions/50000' or 'GM12878.chr18.cool'
balance: `bool`
The ``'balance'`` parameters of ``coolMat.matrix(balance=False).fetch('chr6:119940450-123940450')``
row_regions: `str` | `str list` | None.
The subset matrix row genome regions.
e.g. ``"chr6:1000000-2000000"``, e.g. ``["chr6:1000000-2000000", "chr3:5000000-4000000", "chr5"]``
The start can be larger than the end (e.g. ``"chr6:2000000-1000000"``),
which means you want to get the reverse region contact matrix
col_regions: `str` | `str list` | None.
The subset matrix col genome regions, default is ``None``, which means the sample region as ``row_regions``
Returns:
--------
:class:`~trackc.tl.RegionsCmat`
row_regions and col_regions contact matrix object
Example
-------
>>> import trackc as tc
>>> import cooler
>>> mat1 = tc.tl.extractContactRegions(clr='GM12878.chr18.mcool::/resolutions/50000', row_regions="18:45000000-78077248")
>>> GM12878 = cooler.Cooler('./GM12878.chr18.mcool::/resolutions/50000')
>>> mat2 = tc.tl.extractContactRegions(clr=GM12878, row_regions=["18:61140000-63630000", "18:74030000-77560000"], col_regions="18:47340000-50370000")
>>> print(mat2.cmap.shape)
>>> print(mat2.row_regions)
>>> print(mat2.col_regions)
"""
# divisive_weights: bool, optional
# Force balancing weights to be interpreted as divisive (True) or
# multiplicative (False). Weights are always assumed to be
# multiplicative by default unless named KR, VC or SQRT_VC, in which
# case they are assumed to be divisive by default.
# -------
if isinstance(row_regions, list):
row_GenomeRegions = pd.concat(
[GenomeRegion(i).GenomeRegion2df() for i in row_regions]
)
else:
row_GenomeRegions = GenomeRegion(row_regions).GenomeRegion2df()
if col_regions == None:
col_GenomeRegions = row_GenomeRegions.copy()
else:
if isinstance(col_regions, list):
col_GenomeRegions = pd.concat(
[GenomeRegion(i).GenomeRegion2df() for i in col_regions]
)
else:
col_GenomeRegions = GenomeRegion(col_regions).GenomeRegion2df()
# ------
if isinstance(clr, str):
clr = cooler.Cooler(clr)
region_mat_dic = {}
for _, row_row in row_GenomeRegions.iterrows():
for _, col_row in col_GenomeRegions.iterrows():
row_col_region_cmat = clr.matrix(balance=balance).fetch(
row_row["region4coolFetch"], col_row["region4coolFetch"]
)
if row_row["isReverse"] == True:
row_col_region_cmat = np.flip(row_col_region_cmat, 0)
if col_row["isReverse"] == True:
row_col_region_cmat = np.flip(row_col_region_cmat, 1)
region_mat_dic[
"{0}__{1}".format(
row_row["region4coolFetch"], col_row["region4coolFetch"]
)
] = row_col_region_cmat
vstack_list = [None] * row_GenomeRegions.shape[0]
hstack_list = [None] * col_GenomeRegions.shape[0]
for i, row_region in enumerate(row_GenomeRegions["region4coolFetch"].to_list()):
for ii, col_region in enumerate(
col_GenomeRegions["region4coolFetch"].to_list()
):
hstack_list[ii] = region_mat_dic["{0}__{1}".format(row_region, col_region)]
vstack_list[i] = np.hstack(tuple(hstack_list))
cMat = np.vstack(tuple(vstack_list))
####
row_GenomeRegions["cbins"] = 0
col_GenomeRegions["cbins"] = 0
row_index = row_GenomeRegions.index.to_list()
col_index = col_GenomeRegions.index.to_list()
# print(row_index)
# print(col_index)
for i in row_index:
for ii in col_index:
cmat_shape = region_mat_dic[
"{0}__{1}".format(
row_GenomeRegions.loc[i, "region4coolFetch"],
col_GenomeRegions.loc[ii, "region4coolFetch"],
)
].shape
row_GenomeRegions.loc[i, "cbins"] = cmat_shape[0]
col_GenomeRegions.loc[ii, "cbins"] = cmat_shape[1]
# print(col_GenomeRegions)
row_GenomeRegions["chrom"] = row_GenomeRegions["chrom"].astype(str)
col_GenomeRegions["chrom"] = col_GenomeRegions["chrom"].astype(str)
# row_GenomeRegions['fetch_start'] = row_GenomeRegions['fetch_start'].astype(int)
# col_GenomeRegions['fetch_start'] = col_GenomeRegions['fetch_start'].astype(int)
r_l_regions_cMat = RegionsCmat(
cmat=cMat, row_regions=row_GenomeRegions, col_regions=col_GenomeRegions
)
return r_l_regions_cMat
# import extractCisRegion as subsetCisRegion
[docs]def extractCisContact(
clr: Union[cooler.Cooler, str],
region: str,
extend: int = 0,
balance: bool = False,
# divisive_weights = None,
) -> np.array:
"""
Extract cis contact matrix from the cool or mcool format Hi-C matrix.
Parameters
----------
clr: `cooler.Cooler` | `str`
cool format Hi-C matrix (https://github.com/open2c/cooler) or cool file path
`cooler.Cooler` e.g. GM12878=cooler.Cooler('./GM12878.chr18.mcool::/resolutions/50000')
`str` e.g. 'GM12878.chr18.mcool::/resolutions/50000' or 'GM12878.chr18.cool'
region: `str`
The subset matrix row genome regions
eg. ``"chr6:1000000-2000000"`` or ``chr6``
extend: `int`
contact map extend to start and end position
balance: `bool`
The ``'balance'`` parameters of ``coolMat.matrix(balance=False).fetch('chr6:119940450-123940450')``
Returns:
--------
contact matrix: np.array
matrix start with top-left
Example
-------
>>> import trackc as tc
>>> import cooler
>>> mat1 = tc.tl.extractCisContact(clr='GM12878.chr18.mcool::/resolutions/50000', region="18:45000000-78077248")
>>> GM12878 = cooler.Cooler('./GM12878.chr18.mcool::/resolutions/50000')
>>> mat2 = tc.tl.extractCisContact(clr=GM12878, region="18:45000000-78077248")
>>> print(mat2.shape)
"""
if isinstance(clr, str):
clr = cooler.Cooler(clr)
resolution = clr.binsize
genome_region = GenomeRegion(region)
if genome_region.chrom not in clr.chromsizes:
print(genome_region.chrom, " is not a chrom in the cool matrix")
if genome_region.chrom.startswith("chr"):
genome_region.chrom = genome_region.chrom.lstrip("chr")
else:
genome_region.chrom = "chr" + genome_region.chrom
print(f"{genome_region.chrom} instead")
maxChromL = clr.chromsizes[genome_region.chrom]
# ------
if genome_region.fetch_start != None:
genome_region.fetch_start = genome_region.fetch_start - extend * resolution
if genome_region.fetch_start < 0:
# genome_region.fetch_start = 0
logging.error("Error: Extend start is less than 0, set extend=0 is ok\n")
# sys.exit(1)
genome_region.fetch_end = genome_region.fetch_end + extend * resolution
if genome_region.fetch_end > maxChromL:
# genome_region.fetch_end = maxChromL
logging.error(
"Error: extend is larger than chrom length, set extend=0 is ok\n"
)
# sys.exit(1)
df = clr.matrix(balance=balance).fetch(genome_region.fetchRegion())
return df