Source code for dataclasses

#!/usr/bin/python

import logging
import tables as tb
import json
import wave
import pyvo as vo
import pandas as pd
import numpy as np
from IPython.display import display, Markdown
import matplotlib.pyplot as plt


class KM3Object:
    """Class to hold data object from ODC.

    Base class for the resource wrappers in this module. Subclasses
    override ``_set_meta`` and ``_read_data`` to interpret the metadata and
    the cached file for their specific data type.
    """

    def __init__(self):
        self.odcid = ""  # identifier in the KM3Store
        self.location = ""  # local storage
        self.meta = ""  # resource metadata, replaced by from_resource
        self.data = None  # payload, filled by _read_data

    @classmethod
    def from_resource(cls, resourceinfo, datalocation, loadoption=""):
        """Create class instance from resource info and cached data file.

        Args:
            resourceinfo: metadata from ODC; must provide the key "odcid".
            datalocation: path of the data file in the local cache.
            loadoption: forwarded unchanged to ``_read_data``.

        Returns:
            A new instance of ``cls`` with metadata and data loaded.

        Raises:
            KeyError: if ``resourceinfo`` has no "odcid" entry.
        """
        newone = cls()
        newone.meta = resourceinfo
        newone._set_meta()
        # Metadata is interpreted before reading, so _read_data can rely on
        # attributes that _set_meta derived from it.
        newone.location = datalocation
        newone._read_data(loadoption)
        newone.odcid = resourceinfo["odcid"]
        return newone

    def _set_meta(self):
        """Function template to set object attributes from resource metadata"""

    def _read_data(self, loadoption=""):
        """Reads data from file in cache.

        With ``loadoption == "text"`` the file content is stored as ``str``,
        with any other value as ``bytes``.
        """
        mode = "r" if loadoption == "text" else "rb"
        # Context manager closes the handle instead of leaving it to the GC.
        with open(self.location, mode) as infile:
            self.data = infile.read()

    def show_metadata(self):
        """Display all non-empty metadata entries except "contentInfo".

        Renders the table with ``print_nice`` (not defined in the visible
        part of this module - expected to be provided elsewhere) and shows
        it through IPython's ``display``/``Markdown``.
        """
        pairlist = [
            {"key": key, "value": self.meta[key]}
            for key in self.meta
            if key != "contentInfo" and self.meta[key]
        ]
        display(Markdown(print_nice(pairlist, {"Entry": "key", "Value": "value"})))
class Table(KM3Object):
    """Table of values from json-formatted data"""

    def __init__(self):
        KM3Object.__init__(self)

    def _set_meta(self):
        """Function template to set object attributes from resource metadata"""

    def _read_data(self, loadoption=""):
        """Reads the cached file as text and parses it with ``from_json``.

        ``loadoption`` is accepted for interface compatibility and ignored.
        """
        # Context manager closes the handle instead of leaving it to the GC.
        with open(self.location, "r") as infile:
            self.data = from_json(infile.read())

    def get_dataframe(self):
        """Returns table as pandas dataframe.

        The data is first flattened with ``_contract_dict`` (entries treated
        as lists of records, values converted to float). When that helper
        reports failure by returning None, the raw data is handed to
        pandas unchanged.
        """
        flattened = _contract_dict(self.data, inlist=True, tofloat=True)
        if flattened is not None:
            return pd.DataFrame(flattened)
        return pd.DataFrame(self.data)
class Function(KM3Object):
    """Class to generate numpy function from resource"""

    def __init__(self):
        KM3Object.__init__(self)
        self.functiontype = None  # text following "function." in meta["ktype"]
        self.paraminfo = {}  # "parameters" section of meta["contentInfo"]

    def _read_data(self, loadoption=""):
        """Reads json file.

        ``loadoption`` is accepted for interface compatibility and ignored.
        """
        # Context manager closes the handle instead of leaving it to the GC.
        with open(self.location, "rb") as infile:
            self.data = from_json(infile.read())

    def _set_meta(self):
        """Setting parameter info and function type from metadata.

        ``paraminfo`` is filled only when the metadata has a "contentInfo"
        entry mentioning "parameters". ``functiontype`` becomes whatever
        follows "function." in ``meta["ktype"]``, or None when the ktype
        does not contain that marker.
        """
        if "contentInfo" in self.meta and "parameters" in self.meta["contentInfo"]:
            contentinfo = from_json(self.meta["contentInfo"])
            self.paraminfo = contentinfo["parameters"]
        ktype = self.meta["ktype"]
        # partition() reports a missing marker explicitly; the previous
        # find()+9 arithmetic silently sliced from index 8 in that case.
        _, marker, suffix = ktype.partition("function.")
        self.functiontype = suffix if marker else None

    def show_paraminfo(self):
        """Displays info on the "xvalue" and "returnvalue" parameters.

        Fields missing from the parameter info are shown as empty strings;
        parameters missing entirely are skipped.
        """
        fields = ("name", "description", "unit", "symbol", "range")
        inlist = []
        for entry in ("xvalue", "returnvalue"):
            if entry not in self.paraminfo:
                continue
            info = self.paraminfo[entry]
            row = {"param": entry}
            for par in fields:
                row[par] = info[par] if par in info else ""
            inlist.append(row)
        printinfo = {
            "Parameter": "param",
            "Name": "name",
            "Description": "description",
            "Unit": "unit",
            "Symbol": "symbol",
            "Range": "range",
        }
        outstring = print_nice(inlist, selection=printinfo)
        display(Markdown(outstring))

    def get_function(self):
        """Returns function as numpy object.

        Only the "polynomial" type is supported; the loaded data is passed
        to ``np.polynomial.Polynomial`` as its coefficients. Any other type
        logs a warning and yields None.
        """
        if self.functiontype == "polynomial":
            return np.polynomial.Polynomial(self.data)
        logging.warning("Did not find the function type %s.", self.functiontype)
        return None

    def evaluate(self, xvalue):
        """Returns evaluated function for a given input value.

        Returns None when ``get_function`` cannot build a function.
        """
        function = self.get_function()
        if function is not None:
            return function(xvalue)
        return None
class LookUpTable(KM3Object):
    """Table that can be queried for given x and y value."""

    def __init__(self):
        KM3Object.__init__(self)
        self.paraminfo = {}  # "parameters" section of meta["contentInfo"]

    def _set_meta(self):
        """Setting parameter info (including normalization) from metadata"""
        if "contentInfo" in self.meta and "parameters" in self.meta["contentInfo"]:
            contentinfo = from_json(self.meta["contentInfo"])
            self.paraminfo = contentinfo["parameters"]

    def _read_data(self, loadoption=""):
        """Reads json file.

        ``loadoption`` is accepted for interface compatibility and ignored.
        """
        # Context manager closes the handle instead of leaving it to the GC.
        with open(self.location, "rb") as infile:
            self.data = from_json(infile.read())

    def _normalization(self):
        """Return the "normalization" parameter as float, or 1 if absent."""
        if "normalization" in self.paraminfo:
            return float(self.paraminfo["normalization"])
        return 1

    def show_paraminfo(self):
        """Displays info on the "xaxis", "yaxis" and "returnvalue" parameters.

        Fields missing from the parameter info are shown as empty strings;
        parameters missing entirely are skipped.
        """
        fields = ("name", "description", "unit", "symbol", "range")
        inlist = []
        for entry in ("xaxis", "yaxis", "returnvalue"):
            if entry not in self.paraminfo:
                continue
            info = self.paraminfo[entry]
            row = {"param": entry}
            for par in fields:
                row[par] = info[par] if par in info else ""
            inlist.append(row)
        printinfo = {
            "Parameter": "param",
            "Name": "name",
            "Description": "description",
            "Unit": "unit",
            "Symbol": "symbol",
            "Range": "range",
        }
        outstring = print_nice(inlist, selection=printinfo)
        display(Markdown(outstring))

    def check_inrange(self, xvalue, yvalue):
        """Check if query values (xvalue, yvalue) are in range.

        An axis is only checked when its parameter info carries a "range"
        that parses to exactly two bounds; an axis without such information
        (or without any parameter info) is treated as unrestricted.

        Returns:
            False (after logging a warning) as soon as one value lies
            outside its range, True otherwise.
        """
        for label, axis, value in (("x", "xaxis", xvalue), ("y", "yaxis", yvalue)):
            # Tolerate missing axis info instead of raising KeyError.
            axisinfo = self.paraminfo.get(axis, {})
            if "range" not in axisinfo:
                continue
            bounds = from_json(axisinfo["range"])
            if len(bounds) == 2 and (value < bounds[0] or value > bounds[1]):
                logging.warning(
                    "%s value %s out of range. Range is %s", label, value, bounds
                )
                return False
        return True

    def get_dataframe(self):
        """Returns table as pandas dataframe, scaled by the normalization."""
        return pd.DataFrame(self.data) * self._normalization()

    def lookup(self, xvalue, yvalue):
        """Returns value for given x and y values from table.

        Picks the first x key whose float value is >= ``xvalue`` and, within
        that row, the first y key whose float value is >= ``yvalue``; the
        stored entry is multiplied by the normalization.
        NOTE(review): this relies on the keys iterating in ascending order -
        confirm against the data files.

        Returns:
            The normalized table entry, or None when the query is out of
            range or no matching bin exists.
        """
        if not self.check_inrange(xvalue, yvalue):
            return None
        norm = self._normalization()

        # None sentinels (rather than False) keep falsy keys usable and
        # avoid depending on a loop variable leaking out of the x scan.
        xbin = next((xval for xval in self.data if float(xval) >= xvalue), None)
        ybin = None
        if xbin is not None:
            ybin = next(
                (yval for yval in self.data[xbin] if float(yval) >= yvalue), None
            )

        if xbin is not None and ybin is not None:
            return self.data[xbin][ybin] * norm
        logging.info(
            "Did not find correct bins for %s and %s. Got bins %s and %s.",
            xvalue,
            yvalue,
            xbin,
            ybin,
        )
        return None
class HDFTable(KM3Object):
    """Table stored in an HDF5 file, readable via PyTables or pandas."""

    def __init__(self):
        KM3Object.__init__(self)
        self.columns = None  # access info parsed from meta["accessInfo"]
        self.paraminfo = None  # {NAME attribute value: DESCRIPTION attribute value}
        self.provenance = None  # parsed "provenance" attribute, if present
        self.conditions = None  # {"instrument": ..., "selection": ...}
        self.header = None  # header node / frame, filled by _read_data

    def _set_meta(self):
        """Parse ``meta["accessInfo"]`` into ``self.columns``."""
        self.columns = from_json(self.meta["accessInfo"])

    def _read_data(self, loadoption=""):
        """Reading data as 'pandas' or 'table' (default).

        The file is opened with PyTables when the "table" option is chosen
        or when no parameter info has been extracted yet; in that pass the
        node attributes are also mined for parameter info, provenance and
        conditions. With ``loadoption == "pandas"`` data and header are
        then (re)read through ``pd.read_hdf``.

        Failures to read are logged as warnings rather than raised.
        """
        if not loadoption:
            loadoption = "table"

        if loadoption == "table" or self.paraminfo is None:
            node = None
            try:
                # The file handle stays open on purpose: the nodes stored
                # below are live views into it.
                f = tb.open_file(self.location)
                node = f.root[self.columns["tablename"]]
                self.data = node
                self.header = f.root[self.columns["header"]]
            except Exception:  # best effort: report and carry on
                logging.warning("Could not read file %s.", self.meta["odcid"])
            # Only touch attributes when the table node was really obtained;
            # otherwise self.data may be None and has no _v_attrs.
            if node is not None:
                self._read_attributes(node)

        if loadoption == "pandas":
            try:
                self.data = pd.read_hdf(self.location, self.columns["tablename"])
                self.header = pd.read_hdf(self.location, self.columns["header"])
            except Exception:  # best effort: report and carry on
                # Report the file; the former message used a loop variable
                # that is undefined whenever the table pass was skipped.
                logging.warning("Could not read table %s", self.location)

        if self.data is None:
            logging.warning("Cannot handle loadoption %s.", loadoption)

    def _read_attributes(self, node):
        """Fill paraminfo, provenance and conditions from the node attributes.

        Each of the three is only computed while it is still None.
        """
        attrs = node._v_attrs
        if self.paraminfo is None:
            self.paraminfo = self._collect_paraminfo(node)
        if self.provenance is None and "provenance" in attrs:
            self.provenance = from_json(attrs["provenance"].decode())
        if self.conditions is None:
            conditions = {}
            if "datataking" in attrs:
                conditions.setdefault(
                    "instrument", from_json(attrs["datataking"].decode())
                )
            if "deselection_criteria" in attrs:
                conditions.setdefault(
                    "selection", from_json(attrs["deselection_criteria"].decode())
                )
            self.conditions = conditions

    @staticmethod
    def _collect_paraminfo(node):
        """Pair up the NAME and DESCRIPTION attributes of ``node``.

        Attribute names containing "NAME" or "DESCRIPTION" are grouped by
        the two characters located three to one positions before that
        marker. Groups that have both markers contribute one
        ``{NAME value: DESCRIPTION value}`` entry to the result.
        """
        attrs = node._v_attrs
        grouped = {}
        for key in attrs._g_list_attr(node):
            for inf in ("NAME", "DESCRIPTION"):
                pos = key.find(inf)
                if pos > -1:
                    coln = key[pos - 3 : pos - 1]
                    grouped.setdefault(coln, {}).setdefault(inf, attrs[key])
        outparam = {}
        for info in grouped.values():
            if len(info) == 2:
                outparam.setdefault(info["NAME"], info["DESCRIPTION"])
        return outparam

    def get_provenance(self, displayinfo=True):
        """Returns provenance information stored in the file.

        ``displayinfo`` is accepted but currently unused.
        """
        return self.provenance

    def get_dataframe(self):
        """Returns the table as pandas DataFrame, loading it on first use."""
        if not isinstance(self.data, pd.DataFrame):
            self._read_data(loadoption="pandas")
        return self.data

    def get_paraminfo(self, colname=""):
        """Returns the parameter info extracted from the table attributes.

        ``colname`` is accepted but currently unused.
        """
        return self.paraminfo
class SCSServiceVO(KM3Object):
    """Access to a Virtual Observatory data set through the pyvo package.

    Hands out pyvo TAP and Simple Cone Search service objects built from
    the resource's access info. For further use of pyvo, see
    https://pyvo.readthedocs.io/en/latest/
    """

    def __init__(self):
        KM3Object.__init__(self)
        self.accessInfo = ""

    def _set_meta(self):
        """Parse ``meta["accessInfo"]`` into ``self.accessInfo`` if present."""
        if "accessInfo" not in self.meta:
            return
        self.accessInfo = from_json(self.meta["accessInfo"])

    def _read_data(self, loadoption=""):
        """Nothing to read: the data lives behind the remote services."""

    def get_tap(self):
        """Returns VO TAP service to use ADQL queries on data table"""
        endpoint = self.accessInfo["tap"]
        return vo.dal.TAPService(endpoint)

    def get_scs(self):
        """Returns Simple Cone Search Service"""
        endpoint = self.accessInfo["scs"]
        return vo.dal.SCSService(endpoint)

    def get_dataframe(self):
        """Returns the full data set as pandas DataFrame.

        Selects every row of the configured table through the TAP service
        and converts the result via an astropy table to pandas.
        """
        service = self.get_tap()
        query = "SELECT * FROM " + self.accessInfo["table"]
        result = service.search(query)
        return result.to_table().to_pandas()

    def show_paraminfo(self):
        """Prints the table name and the cone search service description."""
        service = self.get_scs()
        print("Table: " + self.accessInfo["table"])
        service.describe()
class AudioWave(KM3Object):
    """Class for handling wave audio files from KM3NeT hydrophones"""

    def __init__(self):
        KM3Object.__init__(self)

    def _set_meta(self):
        """Function template to set object attributes from resource metadata"""

    def _read_data(self, loadoption=""):
        """Opens the cached file with the ``wave`` module.

        ``self.data`` holds the open reader, so the file stays open for the
        lifetime of the object. ``loadoption`` is ignored.
        """
        self.data = wave.open(self.location)

    def show_paraminfo(self):
        """Displays the wave parameters reported by ``getparams()``."""
        parinfo = self.data.getparams()
        params = [
            {"par": par, "val": getattr(parinfo, par)} for par in parinfo._fields
        ]
        printinfo = {"Parameter": "par", "Value": "val"}
        display(Markdown(print_nice(params, selection=printinfo)))

    def get_dataframe(self, endframe=1000, startframe=0):
        """Returns wave frames as pandas DataFrame, between startframe and endframe.

        Both limits are clamped to the number of frames in the file. Each
        row holds the bytes of one frame interpreted as ``int8``.
        NOTE(review): ``int8`` is used regardless of the file's sample
        width - confirm this is intended for multi-byte samples.
        """
        nframes = self.data.getnframes()
        endframe = min(endframe, nframes)
        startframe = max(0, min(startframe, nframes))
        # Seek absolutely: skipping by reading is relative to the current
        # position, which made repeated calls return different windows.
        self.data.setpos(startframe)
        frames = [
            np.frombuffer(self.data.readframes(1), np.int8)
            for _ in range(startframe, endframe)
        ]
        return pd.DataFrame(frames)
class BytesEncoder(json.JSONEncoder):
    """Helper to handle hdf5 headers.

    JSON encoder that serializes byte strings as their decoded text.
    """

    def default(self, obj):
        """Decode bytes-like objects; defer everything else to the base class.

        ``isinstance`` is used so that subclasses of ``bytes`` (for example
        ``numpy.bytes_``) are handled as well, not only exact ``bytes``.

        Raises:
            TypeError: from the base class for unsupported types.
        """
        if isinstance(obj, (bytes, bytearray)):
            return obj.decode()
        return super().default(obj)
def from_json(instring):
    """Parse a JSON document given as text or bytes.

    Strict JSON is tried first. Only when that fails are single quotes
    replaced by double quotes and parsing retried, which accepts the
    single-quoted, JSON-like strings this function was written for without
    corrupting valid JSON that merely contains apostrophes.

    Args:
        instring: ``str`` or bytes-like JSON text; bytes are decoded first.

    Returns:
        The decoded Python object.

    Raises:
        json.JSONDecodeError: if the text is invalid in both forms.
    """
    if isinstance(instring, (bytes, bytearray)):
        instring = instring.decode()
    try:
        return json.loads(instring)
    except json.JSONDecodeError:
        return json.loads(instring.replace("'", '"'))
[docs]def _contract_dict(indict, inlist=False, tofloat=False): """Reformat dictionary with entries of similar keys to unnested dictionary""" commonkeys = [] for key in indict: if not commonkeys: if inlist: for element in indict[key]: for key in element: commonkeys.append(key) else: commonkeys = indict[key] else: if inlist: for element in indict[key]: for key in element: if key not in commonkeys: logging.warning( "Cannot compress dictionary. Found unexpected key %s", key, ) return None else: for key in indict[key]: if key not in commonkeys: logging.warning( "Cannot compress dictionary. Found unexpected key %s", key ) return None outdict = {} for key in commonkeys: outdict.setdefault(key, []) for entry in indict: if inlist: for dic in indict[entry]: for key in dic: if tofloat: outdict[key].append(float(dic[key])) else: outdict[key].append(dic[key]) else: for key in commonkeys: if tofloat: outdict[key].append(float(indict[entry][key])) else: outdict[key].append(indict[entry][key]) return outdict