hat.syslog.server.encoder
Data structures encoder/decoder
1"""Data structures encoder/decoder""" 2 3from hat.syslog.encoder import * # NOQA 4 5from hat import json 6 7from hat.syslog.encoder import (msg_from_json, 8 msg_to_json) 9from hat.syslog.server import common 10 11 12def filter_to_json(filter: common.Filter) -> json.Data: 13 """Convert filter to json data""" 14 return dict(filter._asdict(), 15 facility=filter.facility.name if filter.facility else None, 16 severity=filter.severity.name if filter.severity else None) 17 18 19def filter_from_json(json_filter: json.Data) -> common.Filter: 20 """Create filter from json data""" 21 return common.Filter(**dict( 22 json_filter, 23 facility=(common.Facility[json_filter['facility']] 24 if json_filter['facility'] else None), 25 severity=(common.Severity[json_filter['severity']] 26 if json_filter['severity'] else None))) 27 28 29def entry_to_json(entry: common.Entry) -> json.Data: 30 """Convert entry to json data""" 31 return dict(entry._asdict(), 32 msg=msg_to_json(entry.msg)) 33 34 35def entry_from_json(json_entry: json.Data) -> common.Entry: 36 """Create entry from json data""" 37 return common.Entry(**dict( 38 json_entry, 39 msg=msg_from_json(json_entry['msg'])))
def
filter_to_json( filter: hat.syslog.server.common.Filter) -> Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]]:
13def filter_to_json(filter: common.Filter) -> json.Data: 14 """Convert filter to json data""" 15 return dict(filter._asdict(), 16 facility=filter.facility.name if filter.facility else None, 17 severity=filter.severity.name if filter.severity else None)
Convert filter to json data
def
filter_from_json( json_filter: Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]]) -> hat.syslog.server.common.Filter:
20def filter_from_json(json_filter: json.Data) -> common.Filter: 21 """Create filter from json data""" 22 return common.Filter(**dict( 23 json_filter, 24 facility=(common.Facility[json_filter['facility']] 25 if json_filter['facility'] else None), 26 severity=(common.Severity[json_filter['severity']] 27 if json_filter['severity'] else None)))
Create filter from json data
def
entry_to_json( entry: hat.syslog.server.common.Entry) -> Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]]:
30def entry_to_json(entry: common.Entry) -> json.Data: 31 """Convert entry to json data""" 32 return dict(entry._asdict(), 33 msg=msg_to_json(entry.msg))
Convert entry to json data
def
entry_from_json( json_entry: Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]]) -> hat.syslog.server.common.Entry:
36def entry_from_json(json_entry: json.Data) -> common.Entry: 37 """Create entry from json data""" 38 return common.Entry(**dict( 39 json_entry, 40 msg=msg_from_json(json_entry['msg'])))
Create entry from json data