hat.syslog.handler

Syslog hadler

Implementation of logging.Handler for syslog logging protocol.

  1"""Syslog hadler
  2
  3Implementation of `logging.Handler` for syslog logging protocol.
  4
  5"""
  6
  7import collections
  8import contextlib
  9import datetime
 10import logging
 11import os
 12import socket
 13import ssl
 14import sys
 15import threading
 16import time
 17import traceback
 18import typing
 19
 20from hat import json
 21from hat.syslog import common
 22from hat.syslog import encoder
 23
 24
 25class SysLogHandler(logging.Handler):
 26    """Syslog handler
 27
 28    Args:
 29        host: remote host name
 30        port: remote TCP/UDP port
 31        comm_type: communication type
 32        queue_size: message queue size
 33        reconnect_delay: delay in seconds before retrying connection with
 34            remote syslog server
 35
 36    """
 37
 38    def __init__(self,
 39                 host: str,
 40                 port: int,
 41                 comm_type: typing.Union[common.CommType, str],
 42                 queue_size: int = 1024,
 43                 reconnect_delay: float = 5):
 44        super().__init__()
 45        self.__state = _ThreadState(
 46            host=host,
 47            port=port,
 48            comm_type=(common.CommType[comm_type]
 49                       if not isinstance(comm_type, common.CommType)
 50                       else comm_type),
 51            queue=collections.deque(),
 52            queue_size=queue_size,
 53            reconnect_delay=reconnect_delay,
 54            cv=threading.Condition(),
 55            closed=threading.Event(),
 56            dropped=[0])
 57        self.__thread = threading.Thread(
 58            target=_logging_handler_thread,
 59            args=(self.__state, ),
 60            daemon=True)
 61        self.__thread.start()
 62
 63    def close(self):
 64        """"See `logging.Handler.close`"""
 65        state = self.__state
 66        with state.cv:
 67            if state.closed.is_set():
 68                return
 69            state.closed.set()
 70            with contextlib.suppress(Exception):
 71                # workaround for errors/0001.txt
 72                state.cv.notify_all()
 73
 74    def emit(self, record):
 75        """"See `logging.Handler.emit`"""
 76        msg = _record_to_msg(record)
 77        state = self.__state
 78        with state.cv:
 79            if state.closed.is_set():
 80                return
 81            state.queue.append(msg)
 82            while len(state.queue) > state.queue_size:
 83                state.queue.popleft()
 84                state.dropped[0] += 1
 85            with contextlib.suppress(Exception):
 86                # workaround for errors/0001.txt
 87                state.cv.notify_all()
 88
 89
 90class _ThreadState(typing.NamedTuple):
 91    """Handler thread state"""
 92    host: str
 93    """Hostname"""
 94    port: int
 95    """TCP port"""
 96    comm_type: typing.Union[common.CommType, str]
 97    """Communication type"""
 98    queue: collections.deque
 99    """Message queue"""
100    queue_size: int
101    """Message queue size"""
102    reconnect_delay: float
103    """Reconnect delay"""
104    cv: threading.Condition
105    """Conditional variable"""
106    closed: threading.Event
107    """Closed flag"""
108    dropped: typing.List[int]
109    """Dropped message counter"""
110
111
112def _logging_handler_thread(state):
113    msg = None
114    while not state.closed.is_set():
115        try:
116            if state.comm_type == common.CommType.UDP:
117                s = socket.socket(type=socket.SOCK_DGRAM)
118                s.connect((state.host, state.port))
119            elif state.comm_type == common.CommType.TCP:
120                s = socket.create_connection((state.host, state.port))
121                s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
122            elif state.comm_type == common.CommType.SSL:
123                s = ssl.wrap_socket(socket.create_connection(
124                    (state.host, state.port)))
125                s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
126            else:
127                raise NotImplementedError()
128        except Exception:
129            time.sleep(state.reconnect_delay)
130            continue
131        try:
132            while True:
133                if not msg:
134                    with state.cv:
135                        state.cv.wait_for(lambda: (state.closed.is_set() or
136                                                   len(state.queue) or
137                                                   state.dropped[0]))
138                        if state.closed.is_set():
139                            return
140                        if state.dropped[0]:
141                            msg = _create_dropped_msg(
142                                state.dropped[0], '_logging_handler_thread', 0)
143                            state.dropped[0] = 0
144                        else:
145                            msg = state.queue.popleft()
146                msg_bytes = encoder.msg_to_str(msg).encode()
147                s.send(f'{len(msg_bytes)} '.encode() + msg_bytes)
148                msg = None
149        except Exception:
150            pass
151        finally:
152            with contextlib.suppress(Exception):
153                s.close()
154
155
156def _record_to_msg(record):
157    exc_info = ''
158    with contextlib.suppress(Exception):
159        if record.exc_info:
160            exc_info = ''.join(
161                traceback.TracebackException(*record.exc_info).format())
162    return common.Msg(
163        facility=common.Facility.USER,
164        severity={
165            logging.NOTSET: common.Severity.INFORMATIONAL,
166            logging.DEBUG: common.Severity.DEBUG,
167            logging.INFO: common.Severity.INFORMATIONAL,
168            logging.WARNING: common.Severity.WARNING,
169            logging.ERROR: common.Severity.ERROR,
170            logging.CRITICAL: common.Severity.CRITICAL}[record.levelno],
171        version=1,
172        timestamp=record.created,
173        hostname=socket.gethostname(),
174        app_name=sys.argv[0],  # record.processName
175        procid=str(record.process) if record.process else None,
176        msgid=record.name[:32],
177        data=json.encode({
178            'hat@1': {
179                'name': str(record.name),
180                'thread': str(record.thread),
181                'funcName': str(record.funcName),
182                'lineno': str(record.lineno),
183                'exc_info': exc_info}}),
184        msg=record.getMessage())
185
186
187def _create_dropped_msg(dropped, func_name, lineno):
188    return common.Msg(
189        facility=common.Facility.USER,
190        severity=common.Severity.ERROR,
191        version=1,
192        timestamp=datetime.datetime.now(datetime.timezone.utc).timestamp(),
193        hostname=socket.gethostname(),
194        app_name=sys.argv[0],  # record.processName
195        procid=str(os.getpid()),
196        msgid=__name__,
197        data=json.encode({
198            'hat@1': {
199                'name': __name__,
200                'thread': str(threading.get_ident()),
201                'funcName': str(func_name),
202                'lineno': str(lineno),
203                'exc_info': ''}}),
204        msg=f'dropped {dropped} log messages')
class SysLogHandler(logging.Handler):
26class SysLogHandler(logging.Handler):
27    """Syslog handler
28
29    Args:
30        host: remote host name
31        port: remote TCP/UDP port
32        comm_type: communication type
33        queue_size: message queue size
34        reconnect_delay: delay in seconds before retrying connection with
35            remote syslog server
36
37    """
38
39    def __init__(self,
40                 host: str,
41                 port: int,
42                 comm_type: typing.Union[common.CommType, str],
43                 queue_size: int = 1024,
44                 reconnect_delay: float = 5):
45        super().__init__()
46        self.__state = _ThreadState(
47            host=host,
48            port=port,
49            comm_type=(common.CommType[comm_type]
50                       if not isinstance(comm_type, common.CommType)
51                       else comm_type),
52            queue=collections.deque(),
53            queue_size=queue_size,
54            reconnect_delay=reconnect_delay,
55            cv=threading.Condition(),
56            closed=threading.Event(),
57            dropped=[0])
58        self.__thread = threading.Thread(
59            target=_logging_handler_thread,
60            args=(self.__state, ),
61            daemon=True)
62        self.__thread.start()
63
64    def close(self):
65        """"See `logging.Handler.close`"""
66        state = self.__state
67        with state.cv:
68            if state.closed.is_set():
69                return
70            state.closed.set()
71            with contextlib.suppress(Exception):
72                # workaround for errors/0001.txt
73                state.cv.notify_all()
74
75    def emit(self, record):
76        """"See `logging.Handler.emit`"""
77        msg = _record_to_msg(record)
78        state = self.__state
79        with state.cv:
80            if state.closed.is_set():
81                return
82            state.queue.append(msg)
83            while len(state.queue) > state.queue_size:
84                state.queue.popleft()
85                state.dropped[0] += 1
86            with contextlib.suppress(Exception):
87                # workaround for errors/0001.txt
88                state.cv.notify_all()

Syslog handler

Args
  • host: remote host name
  • port: remote TCP/UDP port
  • comm_type: communication type
  • queue_size: message queue size
  • reconnect_delay: delay in seconds before retrying connection with remote syslog server
SysLogHandler( host: str, port: int, comm_type: Union[hat.syslog.common.CommType, str], queue_size: int = 1024, reconnect_delay: float = 5)
39    def __init__(self,
40                 host: str,
41                 port: int,
42                 comm_type: typing.Union[common.CommType, str],
43                 queue_size: int = 1024,
44                 reconnect_delay: float = 5):
45        super().__init__()
46        self.__state = _ThreadState(
47            host=host,
48            port=port,
49            comm_type=(common.CommType[comm_type]
50                       if not isinstance(comm_type, common.CommType)
51                       else comm_type),
52            queue=collections.deque(),
53            queue_size=queue_size,
54            reconnect_delay=reconnect_delay,
55            cv=threading.Condition(),
56            closed=threading.Event(),
57            dropped=[0])
58        self.__thread = threading.Thread(
59            target=_logging_handler_thread,
60            args=(self.__state, ),
61            daemon=True)
62        self.__thread.start()

Initializes the instance - basically setting the formatter to None and the filter list to empty.

def close(self)
64    def close(self):
65        """"See `logging.Handler.close`"""
66        state = self.__state
67        with state.cv:
68            if state.closed.is_set():
69                return
70            state.closed.set()
71            with contextlib.suppress(Exception):
72                # workaround for errors/0001.txt
73                state.cv.notify_all()

"See logging.Handler.close

def emit(self, record)
75    def emit(self, record):
76        """"See `logging.Handler.emit`"""
77        msg = _record_to_msg(record)
78        state = self.__state
79        with state.cv:
80            if state.closed.is_set():
81                return
82            state.queue.append(msg)
83            while len(state.queue) > state.queue_size:
84                state.queue.popleft()
85                state.dropped[0] += 1
86            with contextlib.suppress(Exception):
87                # workaround for errors/0001.txt
88                state.cv.notify_all()

"See logging.Handler.emit

Inherited Members
logging.Handler
get_name
set_name
name
createLock
acquire
release
setLevel
format
handle
setFormatter
flush
handleError
logging.Filterer
addFilter
removeFilter
filter