#!/usr/bin/python
import os
import logging
import json
import requests
from openkm3.dataclasses import *
from IPython.display import display, Markdown
logging.basicConfig(level=logging.INFO)
class KM3Store:
    """Class to interface to the KM3NeT Open Data Center.

    The store keeps two JSON files in ``localfolder``:

    * ``catalog.js`` -- a condensed copy of the server's collections,
      streams and resources, keyed by their ``odcid``.
    * ``index.js`` -- the "manager" index of resources that were already
      downloaded, mapping identifier -> resource record and local file path.
    """

    def __init__(self, localfolder=".openkm3/"):
        """Set up the local cache folder and load catalog and download index.

        The catalog is read from ``catalog.js`` in ``localfolder`` when that
        file exists and holds valid JSON; otherwise it is fetched from the
        server and written to the cache.

        Args:
            localfolder: Folder used for cached files. It is created if it
                does not exist. A trailing path separator is optional.
        """
        self.localfolder = localfolder
        self.dataendpoint = "http://opendata.km3net.de/"
        self.catalog = {}
        self.manager = {}

        os.makedirs(self.localfolder, exist_ok=True)

        cached = self._load_localfile("catalog.js")
        if cached is not None:
            try:
                self.catalog = json.loads(cached)
            except ValueError:
                # A corrupt cache file must not make the store unusable.
                logging.warning("Cached catalog is not valid JSON, reloading.")
                cached = None
        if cached is None:
            self._load_catalog()
            logging.info("Loaded catalog from data center.")
        else:
            logging.info("Loaded catalog from cache.")

        try:
            self.manager = json.loads(self._load_localfile("index.js"))
        except (TypeError, ValueError):
            # TypeError: no index file yet (None); ValueError: invalid JSON.
            self.manager = {}

    def _local_path(self, filename):
        """Return the location of ``filename`` inside the cache folder.

        A separator is inserted only when ``localfolder`` does not already
        end with one, so paths for the default folder stay unchanged.
        """
        folder = self.localfolder
        if folder and not folder.endswith(("/", os.sep)):
            folder += os.sep
        return folder + filename

    def _load_catalog(self):
        """Build the catalog from the API endpoints and write it to the cache."""
        self.catalog = {
            "collections": self._get_endpoint_info("data/collections/"),
            "streams": self._get_endpoint_info("data/streams/"),
            "resources": self._get_endpoint_info("data/resources/"),
        }
        with open(self._local_path("catalog.js"), "w") as f:
            f.write(json.dumps(self.catalog))

    def update_catalog(self):
        """Update cached info on data center entries from server."""
        self._load_catalog()
        logging.info("Updated catalog!")

    def _get_endpoint_info(self, endpoint_url):
        """Collect the catalog-relevant fields of all objects of an endpoint.

        Follows the ``meta.next`` links of the paginated response until no
        further page is announced or a request fails.

        Args:
            endpoint_url: Endpoint address, e.g. ``"data/streams/"``.

        Returns:
            dict mapping each object's ``odcid`` to a dict with ``kid``,
            ``title``, ``description``, ``keywords`` and, when present in the
            server record, ``relatedResources`` and ``parentResource``.
        """
        fullinfo = {}
        page = self._request(endpoint_url)
        while page:
            for obj in page["objects"]:
                objinfo = {
                    entry: obj[entry]
                    for entry in ("kid", "title", "description", "keywords")
                }
                for optional in ("relatedResources", "parentResource"):
                    if optional in obj:
                        objinfo[optional] = obj[optional]
                # First occurrence of an odcid wins.
                fullinfo.setdefault(obj["odcid"], objinfo)
            next_addr = page["meta"]["next"]
            if not next_addr:
                break
            # Only the part after the last "/" of the "next" address is
            # appended to the endpoint to address the following page.
            page = self._request(
                endpoint_url + next_addr[next_addr.rfind("/") + 1:]
            )
        return fullinfo

    def _get_catalogentry_by_kid(self, kid, ischild=False):
        """Find catalog entries by kid.

        Args:
            kid: The kid to look for.
            ischild: If False, return the first entry whose ``kid`` matches.
                If True, return all entries whose ``parentResource`` matches.

        Returns:
            dict mapping odcid -> catalog entry; empty if nothing was found.
        """
        children = {}
        for section in ("collections", "streams", "resources"):
            for odcid, entry in self.catalog[section].items():
                if not ischild:
                    if entry["kid"] == kid:
                        return {odcid: entry}
                elif "parentResource" in entry and entry["parentResource"] == kid:
                    children.setdefault(odcid, entry)
        if children:
            return children
        logging.info("Could not find entry for KID %s", kid)
        return {}

    def list(self, identifier=""):
        """Print info on directory or object.

        Args:
            identifier: odcid of a collection, stream or resource. When
                empty, an overview of all collections is displayed.
        """
        if not identifier:  # list collections overview
            printdict = self._flatten_catalogentry(self.catalog["collections"])
            display(
                Markdown(
                    "## Collections from the Open Data Center\n"
                    + print_nice(printdict)
                )
            )
        elif identifier in self.catalog["collections"]:  # all parts of collection
            collection = self.catalog["collections"][identifier]
            info = "## " + collection["title"] + "\n"
            resources = {}
            # "relatedResources" is only stored when the server record had it.
            related = (
                from_json(collection["relatedResources"])
                if "relatedResources" in collection
                else []
            )
            for kid in related:
                for key, entry in self._get_catalogentry_by_kid(kid).items():
                    resources.setdefault(key, entry)
            display(
                Markdown(info + print_nice(self._flatten_catalogentry(resources)))
            )
        elif identifier in self.catalog["streams"]:  # all resources of a stream
            stream = self.catalog["streams"][identifier]
            info = "## " + stream["title"] + "\n"
            resources = self._get_catalogentry_by_kid(stream["kid"], ischild=True)
            display(
                Markdown(info + print_nice(self._flatten_catalogentry(resources)))
            )
        elif identifier in self.catalog["resources"]:  # show a single resource
            fullrecord = self._request("data/resources/" + identifier + "/")
            if fullrecord is None:
                logging.warning(
                    "Could not load data for identifier %s.", identifier
                )
                return
            # Only non-empty fields of the record are shown.
            pairlist = [
                {"key": key, "value": fullrecord[key]}
                for key in fullrecord
                if fullrecord[key]
            ]
            display(
                Markdown(print_nice(pairlist, {"Entry": "key", "Value": "value"}))
            )
        else:
            logging.info("Could not find entry for identifier %s", identifier)

    def _build_url(self, address):
        """Turn an address into a full URL.

        A single leading "/" is dropped. Addresses starting with an http(s)
        scheme are used as they are, everything else is appended to
        ``self.dataendpoint``.
        """
        if address.startswith("/"):
            address = address[1:]
        if address.startswith(("http://", "https://")):
            return address
        return self.dataendpoint + address

    def _request(self, address, download=False):
        """Get response from URL as json object.

        Args:
            address: Full URL or address relative to ``self.dataendpoint``.
            download: If True, return the raw response object without
                checking its status or decoding it.

        Returns:
            The decoded JSON content, the raw response if ``download`` is
            True, or None if the request failed, did not return status 200,
            or could not be decoded.
        """
        url = self._build_url(address)
        try:
            req = requests.get(url)
        except requests.exceptions.RequestException:
            logging.warning("Could not read from url %s", url)
            return None
        if download:
            return req
        if req.status_code != 200:
            logging.warning(
                "Request to %s returned status code %s", url, req.status_code
            )
            return None
        try:
            return req.json()
        except ValueError:
            logging.warning("Problem to decode request %s", req.content)
            return None

    def _load_localfile(self, filename):
        """Load data from cache if already downloaded before.

        Returns:
            The file content as str, as bytes if it cannot be decoded as
            text, or None if the file cannot be read.
        """
        path = self._local_path(filename)
        try:
            with open(path, "r") as f:
                return f.read()
        except UnicodeDecodeError:
            pass  # not a text file, retry in binary mode below
        except OSError:
            logging.debug("Could not read local file %s.", filename)
            return None
        try:
            with open(path, "rb") as f:
                return f.read()
        except OSError:
            logging.debug("Could not read local file %s.", filename)
            return None

    def get(self, identifier, loadoption=None, reloadit=False):
        """Get data from server or local cache with identifier.

        Args:
            identifier: odcid of the resource to load.
            loadoption: Passed on to the data class; the available options
                depend on the datatype.
            reloadit: If True, download the data again even if the
                identifier is already listed in the local index.

        Returns:
            The object created by the data class matching the resource's
            ``ktype``, or None if the data could not be loaded or the ktype
            is not loadable.
        """
        ktype, data_local, resinfo = False, False, None
        if identifier in self.manager and not reloadit:
            logging.info("Loading data from cache.")
            resinfo = self.manager[identifier]["resource"]
            ktype = resinfo["ktype"]
            data_local = self.manager[identifier]["local"]
        elif identifier in self.catalog["resources"]:
            resinfo = self._request("data/resources/" + identifier + "/")
            if resinfo is not None:
                data_url = resinfo["accessURL"]
                ktype = resinfo["ktype"]
                if data_url:
                    response = self._request(data_url, download=True)
                    if response is None or not response.ok:
                        # Never cache the body of a failed download as data.
                        logging.warning(
                            "Could not download data from %s", data_url
                        )
                    else:
                        data_local = self._local_path(resinfo["kid"])
                        with open(data_local, "wb") as f:
                            f.write(response.content)
                        self.manager[identifier] = {
                            "resource": resinfo,
                            "local": data_local,
                        }
                        with open(self._local_path("index.js"), "w") as f:
                            f.write(json.dumps(self.manager, cls=BytesEncoder))

        if not data_local or not ktype:
            logging.warning("Could not load data for identifier %s.", identifier)
            return None

        kobject = None
        if ktype == "km3.data.d4.table.lookup":
            kobject = LookUpTable.from_resource(resinfo, data_local, loadoption)
        elif ktype == "km3.data.d4.function.polynomial":
            kobject = Function.from_resource(resinfo, data_local, loadoption)
        elif ktype == "km3.data.d4.vo.scs":
            kobject = SCSServiceVO.from_resource(resinfo, data_local, loadoption)
        elif ktype == "km3.data.d3.optic.events.measurement":
            kobject = HDFTable.from_resource(resinfo, data_local, loadoption)
        elif ktype == "km3.data.d3.acoustic.psd":
            kobject = Table.from_resource(resinfo, data_local, loadoption)
        elif ktype == "km3.data.d3.acoustic.wav":
            kobject = AudioWave.from_resource(resinfo, data_local, loadoption)
        elif ktype == "km3.data.d3.acoustic.mp3":
            logging.info(
                "For analysis in python, using the wave files is recommended (see available streams -> wave for your audio sample)."
            )
        elif ktype == "km3.data.d2.acoustic.raw":
            logging.info("Ktype %s not yet implemented", ktype)
        elif ktype == "km3.publication.paper":
            logging.info(
                "Cannot load paper. Please visit %s .", resinfo["accessURL"]
            )
        elif ktype == "km3.meta.collection":
            logging.info(
                "Currently, loading a full collection is not supported. Please load resources individually."
            )
        elif ktype == "km3.meta.stream":
            logging.info(
                "Currently, loading a full stream is not supported. Please load resources individually."
            )
        else:
            logging.info("Could not handle ktype %s.", ktype)

        if kobject is None:
            logging.info("No data loaded for %s.", identifier)
            return None
        logging.info("Loaded entry %s as %s.", identifier, type(kobject))
        return kobject

    def get_catalog(self):
        """Return the full catalog as dictionary."""
        return self.catalog

    @staticmethod
    def _flatten_catalogentry(listdict):
        """Helper function to format catalog entries for printing.

        Args:
            listdict: dict mapping odcid -> catalog entry.

        Returns:
            list of dicts with the keys ``odcid``, ``title`` and
            ``description``, one per entry.
        """
        return [
            {
                "odcid": odcid,
                "title": entry["title"],
                "description": entry["description"],
            }
            for odcid, entry in listdict.items()
        ]