#!/usr/bin/python
import logging
import tables as tb
import json
import wave
import pyvo as vo
import pandas as pd
import numpy as np
from IPython.display import display, Markdown
import matplotlib.pyplot as plt
class KM3Object:
    """Class to hold data object from ODC.

    Subclasses customise how the cached file is loaded by overriding
    ``_read_data`` and may override ``_set_meta`` to derive attributes
    from the resource metadata.
    """

    def __init__(self):
        self.odcid = ""  # identifier in the KM3Store
        self.location = ""  # local storage
        self.meta = ""  # resource metadata, replaced by from_resource
        self.data = None  # payload, filled by _read_data

    @classmethod
    def from_resource(cls, resourceinfo, datalocation, loadoption=""):
        """Create class instance from resource info (metadata from ODC)
        and filepath to datafile in cache.

        Args:
            resourceinfo: metadata mapping; must contain the key "odcid".
            datalocation: path of the cached data file.
            loadoption: passed through unchanged to ``_read_data``.

        Returns:
            A new instance of ``cls`` with meta, location, data and odcid set.

        Raises:
            KeyError: if ``resourceinfo`` has no "odcid" entry.
        """
        newone = cls()
        newone.meta = resourceinfo
        # Metadata is processed before the file is read so that _read_data
        # can rely on whatever _set_meta prepared.
        newone._set_meta()
        newone.location = datalocation
        newone._read_data(loadoption)
        newone.odcid = resourceinfo["odcid"]
        return newone

    def _set_meta(self):
        """Hook called by ``from_resource`` after ``self.meta`` is assigned.

        The base implementation does nothing; it exists so that
        ``from_resource`` works for classes that do not define their own
        metadata handling.
        """

    def _read_data(self, loadoption=""):
        """Reads data from file in cache.

        With ``loadoption == "text"`` the file is read as text (str),
        otherwise as raw bytes.
        """
        mode = "r" if loadoption == "text" else "rb"
        # Context manager guarantees the handle is closed after reading.
        with open(self.location, mode) as infile:
            self.data = infile.read()
class Table(KM3Object):
    """Table of values from json-formatted data"""

    def __init__(self):
        KM3Object.__init__(self)

    def _read_data(self, loadoption=""):
        """Reads the cached file as text and parses it with ``from_json``.

        ``loadoption`` is accepted for interface compatibility and ignored.
        """
        # Context manager closes the handle; the original left it open.
        with open(self.location, "r") as infile:
            self.data = from_json(infile.read())

    def get_dataframe(self):
        """Returns table as pandas dataframe.

        The data is first flattened with ``_contract_dict`` (list entries,
        values converted to float). If that helper returns None, the raw
        parsed data is handed to pandas unchanged.
        """
        flattened = _contract_dict(self.data, inlist=True, tofloat=True)
        if flattened is not None:
            return pd.DataFrame(flattened)
        return pd.DataFrame(self.data)
class Function(KM3Object):
    """Class to generate numpy function from resource"""

    def __init__(self):
        KM3Object.__init__(self)
        self.functiontype = None  # get_function only handles "polynomial"
        self.paraminfo = {}  # per-parameter info, keyed "xvalue"/"returnvalue"

    def _read_data(self, loadoption=""):
        """Reads json file.

        The file is read as bytes and parsed with ``from_json``;
        ``loadoption`` is ignored.
        """
        # read() never returns None, so the parsed result is stored directly;
        # the context manager closes the handle.
        with open(self.location, "rb") as infile:
            self.data = from_json(infile.read())

    def show_paraminfo(self):
        """Displays info on parameter ranges as a Markdown table.

        One row is produced for each of "xvalue" and "returnvalue" that is
        present in ``self.paraminfo``; missing fields are shown empty.
        """
        fields = ("name", "description", "unit", "symbol", "range")
        inlist = []
        for entry in ("xvalue", "returnvalue"):
            if entry not in self.paraminfo:
                continue
            info = self.paraminfo[entry]
            row = {"param": entry}
            for par in fields:
                row[par] = info[par] if par in info else ""
            inlist.append(row)
        printinfo = {
            "Parameter": "param",
            "Name": "name",
            "Description": "description",
            "Unit": "unit",
            "Symbol": "symbol",
            "Range": "range",
        }
        outstring = print_nice(inlist, selection=printinfo)
        display(Markdown(outstring))

    def get_function(self):
        """Returns function as numpy object.

        Returns:
            ``np.polynomial.Polynomial(self.data)`` when ``functiontype`` is
            "polynomial" (presumably ``self.data`` holds the coefficients --
            confirm against the data files); otherwise None after logging a
            warning.
        """
        if self.functiontype == "polynomial":
            return np.polynomial.Polynomial(self.data)
        logging.warning("Did not find the function type %s.", self.functiontype)
        return None

    def evaluate(self, xvalue):
        """Returns evaluated function for a given input value.

        Returns None when no function object is available for the
        configured function type.
        """
        function = self.get_function()
        if function is None:
            return None
        return function(xvalue)
class LookUpTable(KM3Object):
    """Table that can be queried for given x and y value."""

    def __init__(self):
        KM3Object.__init__(self)
        # Optional keys read by this class: "xaxis", "yaxis", "returnvalue"
        # and "normalization".
        self.paraminfo = {}

    def _read_data(self, loadoption=""):
        """Reads json file.

        The file is read as bytes and parsed with ``from_json``;
        ``loadoption`` is ignored.
        """
        # read() never returns None; the context manager closes the handle.
        with open(self.location, "rb") as infile:
            self.data = from_json(infile.read())

    def show_paraminfo(self):
        """Displays info on parameter ranges as a Markdown table.

        One row is produced for each of "xaxis", "yaxis" and "returnvalue"
        that is present in ``self.paraminfo``; missing fields are shown empty.
        """
        fields = ("name", "description", "unit", "symbol", "range")
        inlist = []
        for entry in ("xaxis", "yaxis", "returnvalue"):
            if entry not in self.paraminfo:
                continue
            info = self.paraminfo[entry]
            row = {"param": entry}
            for par in fields:
                row[par] = info[par] if par in info else ""
            inlist.append(row)
        printinfo = {
            "Parameter": "param",
            "Name": "name",
            "Description": "description",
            "Unit": "unit",
            "Symbol": "symbol",
            "Range": "range",
        }
        outstring = print_nice(inlist, selection=printinfo)
        display(Markdown(outstring))

    def _axis_range(self, axis):
        """Return the parsed "range" entry of ``axis``, or None if absent."""
        info = self.paraminfo.get(axis, {})
        if "range" not in info:
            return None
        raw = info["range"]
        # Ranges stored as text are parsed; already-parsed sequences are
        # used as they are.
        if isinstance(raw, (str, bytes)):
            return from_json(raw)
        return raw

    def _normalization(self):
        """Return the normalization factor from paraminfo (default 1)."""
        if "normalization" in self.paraminfo:
            return float(self.paraminfo["normalization"])
        return 1

    def check_inrange(self, xvalue, yvalue):
        """Check if query values (xvalue, yvalue) in range.

        An axis is only checked when its paraminfo entry carries a
        two-element "range"; axes without range information are accepted.

        Returns:
            False (after logging a warning) as soon as one value lies
            outside its range, True otherwise.
        """
        checks = (
            ("xaxis", xvalue, "x value %s out of range. Range is %s"),
            ("yaxis", yvalue, "y value %s out of range. Range is %s"),
        )
        for axis, value, message in checks:
            bounds = self._axis_range(axis)
            if bounds is None or len(bounds) != 2:
                continue
            if value < bounds[0] or value > bounds[1]:
                logging.warning(message, value, bounds)
                return False
        return True

    def get_dataframe(self):
        """Returns table as pandas dataframe, scaled by the normalization."""
        return pd.DataFrame(self.data) * self._normalization()

    def lookup(self, xvalue, yvalue):
        """Returns value for given x and y values from table.

        The x bin is the first key of ``self.data`` (in iteration order)
        whose float value is >= ``xvalue``; the y bin is chosen the same
        way among the keys of that x bin.

        Returns:
            The table entry multiplied by the normalization, or None when
            the values are out of range or no matching bins exist.
        """
        if not self.check_inrange(xvalue, yvalue):
            return None
        norm = self._normalization()
        xbin = next((key for key in self.data if float(key) >= xvalue), None)
        ybin = None
        if xbin is not None:
            # Search the y bins of the matched x bin only.
            ybin = next(
                (key for key in self.data[xbin] if float(key) >= yvalue), None
            )
        if xbin is not None and ybin is not None:
            return self.data[xbin][ybin] * norm
        logging.info(
            "Did not find correct bins for %s and %s. Got bins %s and %s.",
            xvalue,
            yvalue,
            xbin,
            ybin,
        )
        return None
class HDFTable(KM3Object):
    """Table stored in an HDF5 file, read through PyTables or pandas."""

    def __init__(self):
        KM3Object.__init__(self)
        # Mapping with the node names "tablename" and "header"; it is not
        # assigned anywhere in this class and must be set before reading.
        self.columns = None
        self.paraminfo = None  # {column name: description} from table attrs
        self.provenance = None  # parsed "provenance" table attribute
        self.conditions = None  # parsed "datataking"/"deselection_criteria"
        self.header = None  # header node or frame, filled by _read_data

    def _read_data(self, loadoption=""):
        """Reading data as 'pandas' or 'table' (default).

        The PyTables nodes are also opened when ``paraminfo`` has not been
        collected yet, because parameter info, provenance and conditions are
        taken from the attributes of the table node. Read failures are
        logged as warnings and not raised.
        """
        if not loadoption:
            loadoption = "table"
        if loadoption == "table" or self.paraminfo is None:
            self._read_table_nodes()
        self._read_attributes()
        if loadoption == "pandas":
            try:
                self.data = pd.read_hdf(self.location, self.columns["tablename"])
                self.header = pd.read_hdf(self.location, self.columns["header"])
            except Exception:
                # Best effort, as before; report the file that failed.
                logging.warning("Could not read table %s", self.location)
        if self.data is None:
            logging.warning("Cannot handle loadoption %s.", loadoption)

    def _resource_label(self):
        """Return the odcid from the metadata, or the file path as fallback."""
        try:
            return self.meta["odcid"]
        except (KeyError, IndexError, TypeError):
            return self.location

    def _read_table_nodes(self):
        """Open the file with PyTables and store the table and header nodes."""
        try:
            # The file handle is deliberately not closed here: self.data and
            # self.header are nodes of that open file.
            h5file = tb.open_file(self.location)
            self.data = h5file.root[self.columns["tablename"]]
            self.header = h5file.root[self.columns["header"]]
        except Exception:
            logging.warning("Could not read file %s.", self._resource_label())

    @staticmethod
    def _as_text(value):
        """Decode bytes-like attribute values; pass other values through."""
        return value.decode() if isinstance(value, bytes) else value

    def _column_descriptions(self, attrs):
        """Build {column name: description} from NAME/DESCRIPTION attributes."""
        per_column = {}
        for key in attrs._g_list_attr(self.data):
            for inf in ("NAME", "DESCRIPTION"):
                pos = key.find(inf)
                if pos > -1:
                    # The two characters before the separator in front of the
                    # marker identify the column (presumably its index --
                    # TODO confirm the attribute naming scheme).
                    coln = key[pos - 3 : pos - 1]
                    per_column.setdefault(coln, {}).setdefault(inf, attrs[key])
        outparam = {}
        for fields in per_column.values():
            # Only columns carrying both a name and a description are kept.
            if len(fields) == 2:
                outparam.setdefault(fields["NAME"], fields["DESCRIPTION"])
        return outparam

    def _read_attributes(self):
        """Collect paraminfo, provenance and conditions from the table node.

        Does nothing when ``self.data`` has no ``_v_attrs`` (for example
        when the table could not be read).
        """
        attrs = getattr(self.data, "_v_attrs", None)
        if attrs is None:
            return
        if self.paraminfo is None:
            self.paraminfo = self._column_descriptions(attrs)
        if self.provenance is None and "provenance" in attrs:
            self.provenance = from_json(self._as_text(attrs["provenance"]))
        conditions = {}
        if "datataking" in attrs:
            conditions.setdefault(
                "instrument", from_json(self._as_text(attrs["datataking"]))
            )
        if "deselection_criteria" in attrs:
            conditions.setdefault(
                "selection",
                from_json(self._as_text(attrs["deselection_criteria"])),
            )
        self.conditions = conditions

    def get_provenance(self, displayinfo = True):
        """Returns provenance information stored in the file.

        ``displayinfo`` is currently unused.
        """
        return self.provenance

    def get_dataframe(self):
        """Returns the table as pandas DataFrame, re-reading it if necessary."""
        if not isinstance(self.data, pd.DataFrame):
            self._read_data(loadoption = "pandas")
        return self.data

    def get_paraminfo(self, colname=""):
        """Returns information about parameters in table.

        ``colname`` is currently unused; the complete mapping is returned.
        """
        return self.paraminfo
class SCSServiceVO(KM3Object):
    """Access to a VO-published table through the pyvo package.

    Provides TAP and Simple Cone Search service objects built from the
    URLs in ``accessInfo``.
    For further use of pyvo, see https://pyvo.readthedocs.io/en/latest/"""

    def __init__(self):
        super().__init__()
        # Indexed with the keys "tap", "scs" and "table" by the methods below.
        self.accessInfo = ""

    def _read_data(self, loadoption=""):
        """Nothing is loaded from the cache for this class."""

    def get_tap(self):
        """Build the VO TAP service, usable for ADQL queries on the table."""
        endpoint = self.accessInfo["tap"]
        return vo.dal.TAPService(endpoint)

    def get_scs(self):
        """Build the Simple Cone Search service."""
        endpoint = self.accessInfo["scs"]
        return vo.dal.SCSService(endpoint)

    def get_dataframe(self):
        """Fetch the full table through TAP and return it as pandas DataFrame."""
        service = self.get_tap()
        query = "SELECT * FROM " + self.accessInfo["table"]
        result = service.search(query)
        return result.to_table().to_pandas()

    def show_paraminfo(self):
        """Print the table name and the cone search service description."""
        service = self.get_scs()
        print("Table: " + self.accessInfo["table"])
        service.describe()
class AudioWave(KM3Object):
    """Class for handling wave audio files from KM3NeT hydrophones"""

    def __init__(self):
        KM3Object.__init__(self)

    def _read_data(self, loadoption=""):
        """Opens the cached file with ``wave.open``.

        ``self.data`` holds the open reader rather than the samples, so the
        file stays open for later frame reads. ``loadoption`` is ignored.
        """
        self.data = wave.open(self.location)

    def show_paraminfo(self):
        """Displays the wave file parameters as a Markdown table."""
        parinfo = self.data.getparams()
        params = [
            {"par": name, "val": getattr(parinfo, name)}
            for name in parinfo._fields
        ]
        printinfo = {"Parameter": "par", "Value": "val"}
        display(Markdown(print_nice(params, selection=printinfo)))

    def get_dataframe(self, endframe=1000, startframe=0):
        """Returns wave frames as pandas DataFrame, between startframe and endframe.

        Both bounds are clamped to the number of frames in the file. Each
        row holds the raw bytes of one frame viewed as int8 values.
        NOTE(review): for sample widths above one byte a sample is spread
        over several columns -- confirm this is the intended layout.
        """
        total = self.data.getnframes()
        endframe = min(endframe, total)
        startframe = max(0, min(startframe, total))
        # Seek to an absolute position so that repeated calls return the
        # same frames regardless of earlier reads.
        self.data.setpos(startframe)
        frames = [
            np.frombuffer(self.data.readframes(1), np.int8)
            for _ in range(startframe, endframe)
        ]
        return pd.DataFrame(frames)
def print_nice(inlist, selection=None):
    """Function to display contents of list of dictionaries as table.

    Args:
        inlist: iterable of dictionaries, one per table row.
        selection: dictionary with pairs of (columnname, key). Defaults to
            the columns Identifier/Title/Description mapped to the keys
            odcid/title/description.

    Returns:
        The table as a Markdown string: header line, underline, one line
        per entry. An entry lacking a selected key gets an empty cell so
        the remaining values stay in their own columns.
    """
    if selection is None:
        selection = {
            "Identifier": "odcid",
            "Title": "title",
            "Description": "description",
        }
    header = "| " + "".join(column + " | " for column in selection)
    underline = "| " + "".join(len(column) * "-" + " | " for column in selection)
    lines = [header, underline]
    for entry in inlist:
        cells = (
            str(entry[key]) if key in entry else ""
            for key in selection.values()
        )
        lines.append("| " + "".join(cell + " | " for cell in cells))
    return "\n".join(lines) + "\n"
class BytesEncoder(json.JSONEncoder):
    """Helper to handle hdf5 headers: JSON encoder that accepts bytes."""

    def default(self, obj):
        """Decode bytes-like objects to str; defer everything else.

        Raises:
            TypeError: from the base encoder for unsupported objects.
        """
        # isinstance also covers bytes subclasses and bytearray.
        if isinstance(obj, (bytes, bytearray)):
            return obj.decode()
        return super().default(obj)
def from_json(instring):
    """Parse a JSON string, tolerating single-quoted pseudo-JSON.

    Args:
        instring: str or bytes-like object holding the document.

    Returns:
        The decoded Python object.

    Raises:
        json.JSONDecodeError: if the text cannot be parsed even after
            replacing single quotes by double quotes.
    """
    if isinstance(instring, (bytes, bytearray)):
        instring = instring.decode()
    try:
        # Valid JSON is parsed untouched, so apostrophes inside string
        # values survive.
        return json.loads(instring)
    except json.JSONDecodeError:
        # Fallback for single-quoted input.
        return json.loads(instring.replace("'", '"'))
[docs]def _contract_dict(indict, inlist=False, tofloat=False):
"""Reformat dictionary with entries of similar keys to unnested dictionary"""
commonkeys = []
for key in indict:
if not commonkeys:
if inlist:
for element in indict[key]:
for key in element:
commonkeys.append(key)
else:
commonkeys = indict[key]
else:
if inlist:
for element in indict[key]:
for key in element:
if key not in commonkeys:
logging.warning(
"Cannot compress dictionary. Found unexpected key %s",
key,
)
return None
else:
for key in indict[key]:
if key not in commonkeys:
logging.warning(
"Cannot compress dictionary. Found unexpected key %s", key
)
return None
outdict = {}
for key in commonkeys:
outdict.setdefault(key, [])
for entry in indict:
if inlist:
for dic in indict[entry]:
for key in dic:
if tofloat:
outdict[key].append(float(dic[key]))
else:
outdict[key].append(dic[key])
else:
for key in commonkeys:
if tofloat:
outdict[key].append(float(indict[entry][key]))
else:
outdict[key].append(indict[entry][key])
return outdict