import ndex2
from obnb.data.base import BaseData
from obnb.graph import SparseGraph
from obnb.typing import Any, Dict, List, Mapping, Optional, Union
from obnb.util.download import download_unzip
from obnb.util.logger import display_pbar
class BaseNDExData(BaseData, SparseGraph):
    """Base sparse graph object with data retrieved from NDEx.

    NDEx: www.ndexbio.org

    Notes:
        To set up a new instance, specify the following class attribute

        - :attr:`cx_uuid`: UUID of the NDEx network to retrieve.

    """

    CONFIG_KEYS: List[str] = BaseData.CONFIG_KEYS + [
        "cx_uuid",
        "weighted",
        "directed",
        "largest_comp",
        "cx_kwargs",
    ]
    # UUID of the NDEx network; read by ``download`` and listed in CONFIG_KEYS.
    cx_uuid: Optional[str] = None
    # Legacy attribute name. Nothing in this class reads it; it is kept only
    # so that existing references to ``uuid`` keep resolving.
    uuid: Optional[str] = None

    def __init__(
        self,
        root: str,
        weighted: bool,
        directed: bool,
        largest_comp: bool = False,
        gene_id_converter: Optional[Union[Mapping[str, str], str]] = "HumanEntrez",
        cx_kwargs: Optional[Dict[str, Any]] = None,
        **kwargs,
    ):
        """Initialize the BaseNdexData object.

        Args:
            root (str): The root directory of the data.
            weighted (bool): Whether the network is weighted or not.
            directed (bool): Whether the network is directed or not.
            largest_comp (bool): If set to True, then only take the largest
                connected component of the graph.
            gene_id_converter: Gene id converter specification, forwarded
                unchanged to the parent initializer.
            cx_kwargs: Keyword arguments used for reading the cx file.
            **kwargs: Remaining keyword arguments, forwarded unchanged to the
                parent initializer.

        """
        # These are assigned before calling the parent initializer.
        # NOTE(review): presumably the parent initializer can trigger
        # ``process``, which reads both attributes -- confirm in BaseData.
        self.largest_comp = largest_comp
        self.cx_kwargs: Dict[str, Any] = cx_kwargs or {}
        super().__init__(
            root,
            weighted=weighted,
            directed=directed,
            gene_id_converter=gene_id_converter,
            **kwargs,
        )

    @property
    def raw_files(self) -> List[str]:
        """Names of the raw files; a single cx file."""
        return ["data.cx"]

    @property
    def processed_files(self) -> List[str]:
        """Names of the processed files; a single npz file."""
        return ["data.npz"]

    def download(self):
        """Download data from NDEX via ndex2 client.

        The content of the response is written as bytes to the first raw
        file path.

        Raises:
            ValueError: If :attr:`cx_uuid` has not been set.

        """
        if self.cx_uuid is None:
            # Fail early with an actionable message instead of sending a
            # request for a network id of ``None``.
            raise ValueError(
                f"{type(self).__name__} must define the class attribute "
                "'cx_uuid' to retrieve a network from NDEx.",
            )

        self.plogger.info(f"Retrieve NDEx network with uuid: {self.cx_uuid}")
        client = ndex2.client.Ndex2()
        client_resp = client.get_network_as_cx_stream(self.cx_uuid)
        with open(self.raw_file_path(0), "wb") as f:
            f.write(client_resp.content)

    def process(self):
        """Process data and save for later usage.

        Reads the raw cx file into a fresh ``SparseGraph`` (passing
        :attr:`cx_kwargs` to the reader), optionally reduces it to its
        largest connected subgraph, and saves it to the first processed
        file path.

        """
        self.plogger.info(f"Process raw file {self.raw_file_path(0)}")
        cx_graph = SparseGraph(
            weighted=self.weighted,
            directed=self.directed,
            logger=self.plogger,
        )
        cx_graph.read_cx_stream_file(
            self.raw_file_path(0),
            node_id_converter=self.get_gene_id_converter(),
            **self.cx_kwargs,
        )

        if self.largest_comp:
            cx_graph = cx_graph.largest_connected_subgraph()

        cx_graph.save_npz(self.processed_file_path(0), self.weighted)
        self.plogger.info(f"Saved processed file {self.processed_file_path(0)}")

    def load_processed_data(self, path: Optional[str] = None):
        """Load processed network.

        Args:
            path: File to load. Falls back to the first processed file path
                when not given (or empty).

        """
        path = path or self.processed_file_path(0)
        self.plogger.info(f"Load processed file {path}")
        self.read_npz(path)  # FIX: make sure old data purged
class BaseURLSparseGraphData(BaseData, SparseGraph):
    """Base sparse graph object whose raw data is fetched from a URL.

    Notes:
        Subclasses configure the data source through class attributes

        - :attr:`url`: URL from which the data will be downloaded.
        - :attr:`download_zip_type`: type of the zip file downloaded, `zip`
          or `gzip` (default is `gzip`)

    """

    CONFIG_KEYS: List[str] = BaseData.CONFIG_KEYS + [
        "url",
        "download_zip_type",
        "weighted",
        "directed",
        "largest_comp",
    ]
    url: Optional[str] = None
    download_zip_type: str = "gzip"

    def __init__(
        self,
        root: str,
        weighted: bool,
        directed: bool,
        largest_comp: bool = False,
        gene_id_converter: Optional[Union[Mapping[str, str], str]] = "HumanEntrez",
        **kwargs,
    ):
        """Initialize the BaseURLSparseGraphData object.

        Args:
            root: The root directory of the data.
            weighted: Whether the network is weighted or not.
            directed: Whether the network is directed or not.
            largest_comp: If set to True, then only take the largest connected
                component of the graph.
            gene_id_converter: Gene id converter specification, handed to the
                parent initializer as is.
            **kwargs: Any further keyword arguments, handed to the parent
                initializer as is.

        """
        # Assigned ahead of the parent initializer.
        # NOTE(review): presumably the parent initializer may run ``process``,
        # which reads this flag -- confirm in BaseData.
        self.largest_comp = largest_comp
        super().__init__(
            root,
            weighted=weighted,
            directed=directed,
            gene_id_converter=gene_id_converter,
            **kwargs,
        )

    # TODO: add more flexibility to choice of raw_files (parse at init?)
    @property
    def raw_files(self) -> List[str]:
        """Raw file names; currently a single text file."""
        return ["data.txt"]

    # TODO: add more flexibility to choice of processed_files (parse at init?)
    @property
    def processed_files(self) -> List[str]:
        """Processed file names; currently a single npz file."""
        return ["data.npz"]

    def download(self):
        """Fetch the file at :attr:`url` into the raw directory.

        The download is delegated to ``download_unzip``, which is asked to
        rename the result to the first entry of :attr:`raw_files`.

        """
        # TODO: what if multiple files? e.g., split by tissues
        target_name = self.raw_files[0]
        download_unzip(
            self.url,
            self.raw_dir,
            zip_type=self.download_zip_type,
            rename=target_name,
            logger=self.plogger,
        )

    # TODO: add more flexibility to the types of raw network file to handle
    def process(self):
        """Turn the raw file into a processed npz network file.

        The raw file is read with the ``"edglst"`` reader. When a gene id
        converter is configured, the edges are re-added to a new graph under
        their converted node ids; edges with an endpoint that converts to
        ``None`` are left out. The result is optionally reduced to its
        largest connected subgraph before being saved.

        """

        def new_graph() -> SparseGraph:
            # Both graphs built here share the same construction options.
            return SparseGraph(
                weighted=self.weighted,
                directed=self.directed,
                logger=self.plogger,
            )

        source_graph = new_graph()
        source_graph.read(
            self.raw_file_path(0),
            reader="edglst",
            show_pbar=display_pbar(self.log_level),
        )

        if self.gene_id_converter is None:
            # No id conversion requested: continue with the graph as read.
            result_graph = source_graph
        else:
            # TODO: refactor id conversion as graph transform
            self.plogger.info("Start converting gene IDs.")
            id_map = self.get_gene_id_converter()
            id_map.query_bulk(source_graph.node_ids)

            result_graph = new_graph()
            for src, dst, weight in source_graph.edge_gen():
                # Look up both endpoints before deciding whether to skip.
                new_src = id_map[src]
                new_dst = id_map[dst]
                if new_src is None or new_dst is None:
                    continue
                result_graph.add_edge(new_src, new_dst, weight, reduction="max")

        if self.largest_comp:
            result_graph = result_graph.largest_connected_subgraph()

        out_path = self.processed_file_path(0)
        result_graph.save_npz(out_path, self.weighted)
        self.plogger.info(f"Saved processed file {out_path}")

    def load_processed_data(self, path: Optional[str] = None):
        """Read a processed network file into this object.

        Args:
            path: File to read. When omitted (or empty), the first processed
                file path is used.

        """
        # TODO: what if multiple files? e.g., split by tissues
        if not path:
            path = self.processed_file_path(0)
        self.plogger.info(f"Load processed file {path}")
        self.read_npz(path)  # FIX: make sure old data purged