hat.syslog.server.encoder

Data structures encoder/decoder

 1"""Data structures encoder/decoder"""
 2
 3from hat.syslog.encoder import *  # NOQA
 4
 5from hat import json
 6
 7from hat.syslog.encoder import (msg_from_json,
 8                                msg_to_json)
 9from hat.syslog.server import common
10
11
def filter_to_json(filter: common.Filter) -> json.Data:
    """Convert filter to json data

    Every field reported by ``filter._asdict()`` is copied unchanged, except
    ``facility`` and ``severity``, which are replaced by the ``name`` of the
    value they hold (or stay ``None`` when they are not set).

    Args:
        filter: filter to convert

    Returns:
        dictionary with one key per filter field

    """
    # Test against ``None`` explicitly instead of relying on truthiness: a
    # member that evaluates as false (e.g. an int-based enum member with
    # value 0) must still be serialized by name, not silently dropped.
    return dict(
        filter._asdict(),
        facility=(filter.facility.name
                  if filter.facility is not None else None),
        severity=(filter.severity.name
                  if filter.severity is not None else None))
17
18
def filter_from_json(json_filter: json.Data) -> common.Filter:
    """Create filter from json data

    The ``facility`` and ``severity`` entries of `json_filter` hold member
    names; each non-empty name is looked up in ``common.Facility`` /
    ``common.Severity`` and an empty or ``None`` value becomes ``None``.
    All other entries are passed to ``common.Filter`` unchanged as keyword
    arguments.

    Args:
        json_filter: json data with filter fields as keys

    Returns:
        filter built from `json_filter`

    """
    facility_name = json_filter['facility']
    facility = common.Facility[facility_name] if facility_name else None

    severity_name = json_filter['severity']
    severity = common.Severity[severity_name] if severity_name else None

    # Converted values override the raw names copied from ``json_filter``.
    fields = dict(json_filter, facility=facility, severity=severity)
    return common.Filter(**fields)
27
28
def entry_to_json(entry: common.Entry) -> json.Data:
    """Convert entry to json data

    Every field reported by ``entry._asdict()`` is copied unchanged, except
    ``msg``, which is replaced by the result of ``msg_to_json(entry.msg)``.

    Args:
        entry: entry to convert

    Returns:
        dictionary with one key per entry field

    """
    fields = entry._asdict()
    json_msg = msg_to_json(entry.msg)
    return {**fields, 'msg': json_msg}
33
34
def entry_from_json(json_entry: json.Data) -> common.Entry:
    """Create entry from json data

    The ``msg`` entry of `json_entry` is converted with ``msg_from_json``;
    all other entries are passed to ``common.Entry`` unchanged as keyword
    arguments.

    Args:
        json_entry: json data with entry fields as keys

    Returns:
        entry built from `json_entry`

    """
    msg = msg_from_json(json_entry['msg'])
    # Converted message overrides the raw json value copied from
    # ``json_entry``.
    fields = dict(json_entry, msg=msg)
    return common.Entry(**fields)
def filter_to_json( filter: hat.syslog.server.common.Filter) -> Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]]:
def filter_to_json(filter: common.Filter) -> json.Data:
    """Convert filter to json data

    Every field reported by ``filter._asdict()`` is copied unchanged, except
    ``facility`` and ``severity``, which are replaced by the ``name`` of the
    value they hold (or stay ``None`` when they are not set).

    Args:
        filter: filter to convert

    Returns:
        dictionary with one key per filter field

    """
    # Test against ``None`` explicitly instead of relying on truthiness: a
    # member that evaluates as false (e.g. an int-based enum member with
    # value 0) must still be serialized by name, not silently dropped.
    return dict(
        filter._asdict(),
        facility=(filter.facility.name
                  if filter.facility is not None else None),
        severity=(filter.severity.name
                  if filter.severity is not None else None))

Convert filter to json data

def filter_from_json( json_filter: Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]]) -> hat.syslog.server.common.Filter:
def filter_from_json(json_filter: json.Data) -> common.Filter:
    """Create filter from json data

    The ``facility`` and ``severity`` entries of `json_filter` hold member
    names; each non-empty name is looked up in ``common.Facility`` /
    ``common.Severity`` and an empty or ``None`` value becomes ``None``.
    All other entries are passed to ``common.Filter`` unchanged as keyword
    arguments.

    Args:
        json_filter: json data with filter fields as keys

    Returns:
        filter built from `json_filter`

    """
    facility_name = json_filter['facility']
    facility = common.Facility[facility_name] if facility_name else None

    severity_name = json_filter['severity']
    severity = common.Severity[severity_name] if severity_name else None

    # Converted values override the raw names copied from ``json_filter``.
    fields = dict(json_filter, facility=facility, severity=severity)
    return common.Filter(**fields)

Create filter from json data

def entry_to_json( entry: hat.syslog.server.common.Entry) -> Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]]:
def entry_to_json(entry: common.Entry) -> json.Data:
    """Convert entry to json data

    Every field reported by ``entry._asdict()`` is copied unchanged, except
    ``msg``, which is replaced by the result of ``msg_to_json(entry.msg)``.

    Args:
        entry: entry to convert

    Returns:
        dictionary with one key per entry field

    """
    fields = entry._asdict()
    json_msg = msg_to_json(entry.msg)
    return {**fields, 'msg': json_msg}

Convert entry to json data

def entry_from_json( json_entry: Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]]) -> hat.syslog.server.common.Entry:
def entry_from_json(json_entry: json.Data) -> common.Entry:
    """Create entry from json data

    The ``msg`` entry of `json_entry` is converted with ``msg_from_json``;
    all other entries are passed to ``common.Entry`` unchanged as keyword
    arguments.

    Args:
        json_entry: json data with entry fields as keys

    Returns:
        entry built from `json_entry`

    """
    msg = msg_from_json(json_entry['msg'])
    # Converted message overrides the raw json value copied from
    # ``json_entry``.
    fields = dict(json_entry, msg=msg)
    return common.Entry(**fields)

Create entry from json data