Source code for store

#!/usr/bin/python

import os
import logging
import json
import requests
from openkm3.dataclasses import *
from IPython.display import display, Markdown

# Module-level side effect: configure the root logger at INFO level on import.
logging.basicConfig(level=logging.INFO)


class KM3Store:
    """Interface to the KM3NeT Open Data Center with a local file cache.

    Attributes:
        localfolder: Directory holding the cached catalog ("catalog.js"),
            the download index ("index.js") and the downloaded data files.
        dataendpoint: Base URL that relative API addresses are appended to.
        catalog: Dictionary with the sections "collections", "streams" and
            "resources"; each maps an entry's "odcid" to a summary record.
        manager: Index of downloaded resources; maps an identifier to
            ``{"resource": <server record>, "local": <cached file path>}``.
    """

    def __init__(self, localfolder=".openkm3/"):
        """Create the cache folder if needed and load catalog and index.

        The catalog is read from "catalog.js" in ``localfolder`` when that
        file exists and parses as a JSON object; otherwise it is fetched
        from the server. The download index is read from "index.js" and
        falls back to an empty index when missing or unreadable.

        Args:
            localfolder: Directory used as local cache.
        """
        self.localfolder = localfolder
        self.dataendpoint = "http://opendata.km3net.de/"
        self.catalog = {}
        self.manager = {}

        os.makedirs(self.localfolder, exist_ok=True)

        cached_catalog = self._parse_cached_json("catalog.js")
        if cached_catalog is None:
            self._load_catalog()
            logging.info("Loaded catalog from data center.")
        else:
            self.catalog = cached_catalog
            logging.info("Loaded catalog from cache.")

        cached_index = self._parse_cached_json("index.js")
        self.manager = cached_index if cached_index is not None else {}

    def _local_path(self, filename):
        """Return the path of ``filename`` inside the cache folder.

        A path separator is inserted when ``localfolder`` does not already
        end with one, so a folder given without trailing slash still keeps
        its files inside the folder.
        """
        folder = self.localfolder
        if folder and not folder.endswith(("/", os.sep)):
            folder += os.sep
        return folder + filename

    def _parse_cached_json(self, filename):
        """Return the JSON object stored in a cache file, or None.

        None is returned when the file cannot be read, is not valid JSON,
        or does not contain a JSON object.
        """
        raw = self._load_localfile(filename)
        if raw is None:
            return None
        try:
            parsed = json.loads(raw)
        except ValueError:
            # JSONDecodeError and UnicodeDecodeError are both ValueErrors.
            logging.warning("Could not parse local file %s.", filename)
            return None
        return parsed if isinstance(parsed, dict) else None

    def _load_catalog(self):
        """Rebuild ``self.catalog`` from the API endpoints and cache it.

        The cache file is only written when at least one section contains
        entries, so a run without server access does not overwrite a
        previously cached catalog with an empty one.
        """
        self.catalog = {
            "collections": self._get_endpoint_info("data/collections/"),
            "streams": self._get_endpoint_info("data/streams/"),
            "resources": self._get_endpoint_info("data/resources/"),
        }
        if not any(self.catalog.values()):
            logging.warning("No catalog entries received, cache not updated.")
            return
        with open(self._local_path("catalog.js"), "w") as f:
            f.write(json.dumps(self.catalog))

    def update_catalog(self):
        """Update cached info on data center entries from server."""
        self._load_catalog()
        logging.info("Updated catalog!")

    def _get_endpoint_info(self, endpoint_url):
        """Collect the catalog summary of all objects of an API endpoint.

        Pages are followed through ``meta.next``: the part of the next
        address after its last "/" is appended to ``endpoint_url``.

        Args:
            endpoint_url: Endpoint address, relative to ``dataendpoint``.

        Returns:
            Dictionary mapping each object's "odcid" to a record with the
            keys "kid", "title", "description", "keywords" and, when the
            object has them, "relatedResources" and "parentResource".
            Empty when the first request fails.
        """
        fullinfo = {}
        page = self._request(endpoint_url)
        while page:
            for obj in page["objects"]:
                objinfo = {
                    entry: obj[entry]
                    for entry in ("kid", "title", "description", "keywords")
                }
                for optional in ("relatedResources", "parentResource"):
                    if optional in obj:
                        objinfo[optional] = obj[optional]
                # First occurrence of an odcid wins.
                fullinfo.setdefault(obj["odcid"], objinfo)
            next_addr = page["meta"]["next"]
            if not next_addr:
                break
            page = self._request(
                endpoint_url + next_addr[next_addr.rfind("/") + 1:]
            )
        return fullinfo

    def _get_catalogentry_by_kid(self, kid, ischild=False):
        """Find catalog entries by their KID.

        Args:
            kid: KID to look for.
            ischild: If True, ``kid`` is matched against the entries'
                "parentResource" and all matches are returned; otherwise
                the first entry whose "kid" equals ``kid`` is returned.

        Returns:
            Dictionary mapping odcid to catalog record; empty if nothing
            matched.
        """
        children = {}
        for section in ("collections", "streams", "resources"):
            for odcid, record in self.catalog.get(section, {}).items():
                if not ischild:
                    if record["kid"] == kid:
                        return {odcid: record}
                elif record.get("parentResource") == kid:
                    children.setdefault(odcid, record)
        if not children:
            logging.info("Could not find entry for KID %s", kid)
        return children

    def list(self, identifier=""):
        """Display info on the catalog or on a single catalog entry.

        Args:
            identifier: Empty to list all collections; the identifier of a
                collection to list its related resources; of a stream to
                list the entries having it as parent; of a resource to
                show the non-empty fields of its full server record.
        """
        collections = self.catalog.get("collections", {})
        streams = self.catalog.get("streams", {})
        resources = self.catalog.get("resources", {})

        if not identifier:
            overview = print_nice(self._flatten_catalogentry(collections))
            display(
                Markdown("## Collections from the Open Data Center\n" + overview)
            )
        elif identifier in collections:
            collection = collections[identifier]
            heading = "## " + collection["title"] + "\n"
            # "relatedResources" is only stored when the server sent it.
            kids = (
                from_json(collection["relatedResources"])
                if "relatedResources" in collection
                else []
            )
            members = {}
            for kid in kids:
                for odcid, record in self._get_catalogentry_by_kid(kid).items():
                    members.setdefault(odcid, record)
            display(
                Markdown(heading + print_nice(self._flatten_catalogentry(members)))
            )
        elif identifier in streams:
            stream = streams[identifier]
            heading = "## " + stream["title"] + "\n"
            members = self._get_catalogentry_by_kid(stream["kid"], ischild=True)
            display(
                Markdown(heading + print_nice(self._flatten_catalogentry(members)))
            )
        elif identifier in resources:
            fullrecord = self._request("data/resources/" + identifier + "/")
            if fullrecord is None:
                logging.warning("Could not load record for %s.", identifier)
                return
            pairlist = [
                {"key": key, "value": value}
                for key, value in fullrecord.items()
                if value
            ]
            display(
                Markdown(print_nice(pairlist, {"Entry": "key", "Value": "value"}))
            )
        else:
            logging.info("Could not find entry for identifier %s", identifier)

    def _build_url(self, address):
        """Turn an API address into a full URL.

        Addresses starting with "http://" or "https://" are used as they
        are; anything else is appended to ``dataendpoint`` after removing
        leading slashes.
        """
        if address.startswith(("http://", "https://")):
            return address
        return self.dataendpoint + address.lstrip("/")

    def _request(self, address, download=False):
        """Send a GET request to an address.

        Args:
            address: Full URL or address relative to ``dataendpoint``.
            download: If True, return the response object itself instead
                of its decoded JSON content.

        Returns:
            The response object if ``download`` is True; otherwise the
            decoded JSON of a response with status code 200. None if the
            request fails, the status differs from 200 or the content is
            not valid JSON.
        """
        url = self._build_url(address)
        try:
            req = requests.get(url)
        except requests.RequestException:
            logging.warning("Could not read from url %s", url)
            return None
        if download:
            return req
        if req.status_code == 200 and hasattr(req, "json"):
            try:
                return req.json()
            except ValueError:
                logging.warning("Problem to decode request %s", req.content)
        return None

    def _load_localfile(self, filename):
        """Read a file from the cache folder.

        The file is read as text first; if it cannot be decoded it is read
        again as bytes.

        Returns:
            File content as str or bytes, or None if it cannot be read.
        """
        path = self._local_path(filename)
        try:
            with open(path, "r") as f:
                return f.read()
        except UnicodeDecodeError:
            pass  # not a text file, retry in binary mode below
        except OSError:
            logging.debug("Could not read local file %s.", filename)
            return None
        try:
            with open(path, "rb") as f:
                return f.read()
        except OSError:
            logging.debug("Could not read local file %s.", filename)
            return None

    def _download(self, identifier, resinfo):
        """Download a resource's data into the cache and index it.

        The content behind ``resinfo["accessURL"]`` is written to a cache
        file named after ``resinfo["kid"]``; the entry is added to
        ``self.manager`` and the index file "index.js" is rewritten.

        Returns:
            Path of the cached file, or None if the download failed.
        """
        response = self._request(resinfo["accessURL"], download=True)
        if response is None:
            return None
        local_path = self._local_path(resinfo["kid"])
        with open(local_path, "wb") as f:
            f.write(response.content)
        self.manager[identifier] = {"resource": resinfo, "local": local_path}
        with open(self._local_path("index.js"), "w") as f:
            f.write(json.dumps(self.manager, cls=BytesEncoder))
        return local_path

    def get(self, identifier, loadoption=None, reloadit=False):
        """Get data from server or local cache by identifier.

        Args:
            identifier: Identifier of a resource in the catalog.
            loadoption: Passed on to the ``from_resource`` constructor of
                the data class selected by the resource's "ktype".
            reloadit: If True, download again even if the resource is
                already in the local index.

        Returns:
            The object built by the matching data class, or None if the
            data could not be obtained or the ktype is not loadable.
        """
        ktype, data_local, resinfo = None, None, None
        cached = self.manager.get(identifier)
        # An index entry whose file has vanished is treated as not cached.
        if cached is not None and not reloadit and os.path.exists(cached["local"]):
            logging.info("Loading data from cache.")
            resinfo = cached["resource"]
            ktype = resinfo["ktype"]
            data_local = cached["local"]
        elif identifier in self.catalog.get("resources", {}):
            resinfo = self._request("data/resources/" + identifier + "/")
            if resinfo is not None and resinfo.get("accessURL"):
                ktype = resinfo.get("ktype")
                data_local = self._download(identifier, resinfo)

        if not data_local or not ktype:
            logging.warning("Could not load data for identifier %s.", identifier)
            return None

        # The chain is kept explicit so that only the class belonging to
        # the matched ktype is looked up.
        kobject = None
        if ktype == "km3.data.d4.table.lookup":
            kobject = LookUpTable.from_resource(resinfo, data_local, loadoption)
        elif ktype == "km3.data.d4.function.polynomial":
            kobject = Function.from_resource(resinfo, data_local, loadoption)
        elif ktype == "km3.data.d4.vo.scs":
            kobject = SCSServiceVO.from_resource(resinfo, data_local, loadoption)
        elif ktype == "km3.data.d3.optic.events.measurement":
            kobject = HDFTable.from_resource(resinfo, data_local, loadoption)
        elif ktype == "km3.data.d3.acoustic.psd":
            kobject = Table.from_resource(resinfo, data_local, loadoption)
        elif ktype == "km3.data.d3.acoustic.wav":
            kobject = AudioWave.from_resource(resinfo, data_local, loadoption)
        elif ktype == "km3.data.d3.acoustic.mp3":
            logging.info(
                "For analysis in python, using the wave files is recommended (see available streams -> wave for your audio sample)."
            )
        elif ktype == "km3.data.d2.acoustic.raw":
            logging.info("Ktype %s not yet implemented", ktype)
        elif ktype == "km3.publication.paper":
            logging.info("Cannot load paper. Please visit %s .", resinfo["accessURL"])
        elif ktype == "km3.meta.collection":
            logging.info(
                "Currently, loading a full collection is not supported. Please load resources individually."
            )
        elif ktype == "km3.meta.stream":
            logging.info(
                "Currently, loading a full stream is not supported. Please load resources individually."
            )
        else:
            logging.info("Could not handle ktype %s.", ktype)

        if kobject is None:
            logging.info("No data loaded for %s.", identifier)
            return None
        logging.info("Loaded entry %s as %s.", identifier, type(kobject))
        return kobject

    def get_catalog(self):
        """Return the full catalog as dictionary."""
        return self.catalog

    @staticmethod
    def _flatten_catalogentry(listdict):
        """Format catalog entries for printing.

        Args:
            listdict: Dictionary mapping odcid to catalog record.

        Returns:
            List with one dictionary per entry holding the keys "odcid",
            "title" and "description".
        """
        return [
            {
                "odcid": odcid,
                "title": record["title"],
                "description": record["description"],
            }
            for odcid, record in listdict.items()
        ]