hat.syslog.handler

Syslog handler

Implementation of logging.Handler for syslog logging protocol.

  1"""Syslog handler
  2
  3Implementation of `logging.Handler` for syslog logging protocol.
  4
  5"""
  6
  7import collections
  8import contextlib
  9import datetime
 10import logging
 11import os
 12import socket
 13import ssl
 14import sys
 15import threading
 16import time
 17import traceback
 18import typing
 19
 20from hat import json
 21
 22from hat.syslog import common
 23from hat.syslog import encoder
 24
 25
 26class SyslogHandler(logging.Handler):
 27    """Syslog handler
 28
 29    Args:
 30        host: remote host name
 31        port: remote TCP/UDP port
 32        comm_type: communication type
 33        queue_size: message queue size
 34        reconnect_delay: delay in seconds before retrying connection with
 35            remote syslog server
 36
 37    """
 38
 39    def __init__(self,
 40                 host: str,
 41                 port: int,
 42                 comm_type: common.CommType | str,
 43                 queue_size: int = 1024,
 44                 reconnect_delay: float = 5):
 45        super().__init__()
 46
 47        self.__state = _ThreadState(
 48            host=host,
 49            port=port,
 50            comm_type=(common.CommType[comm_type]
 51                       if not isinstance(comm_type, common.CommType)
 52                       else comm_type),
 53            queue=collections.deque(),
 54            queue_size=queue_size,
 55            reconnect_delay=reconnect_delay,
 56            cv=threading.Condition(),
 57            closed=threading.Event(),
 58            dropped=[0])
 59
 60        self.__thread = threading.Thread(
 61            target=_logging_handler_thread,
 62            args=(self.__state, ),
 63            daemon=True)
 64
 65        self.__thread.start()
 66
 67    def close(self):
 68        """"See `logging.Handler.close`"""
 69        state = self.__state
 70
 71        with state.cv:
 72            if state.closed.is_set():
 73                return
 74
 75            state.closed.set()
 76
 77            with contextlib.suppress(Exception):
 78                # workaround for errors/0001.txt
 79                state.cv.notify_all()
 80
 81    def emit(self, record):
 82        """"See `logging.Handler.emit`"""
 83        msg = _record_to_msg(record)
 84        state = self.__state
 85
 86        with state.cv:
 87            if state.closed.is_set():
 88                return
 89
 90            state.queue.append(msg)
 91            while len(state.queue) > state.queue_size:
 92                state.queue.popleft()
 93                state.dropped[0] += 1
 94
 95            with contextlib.suppress(Exception):
 96                # workaround for errors/0001.txt
 97                state.cv.notify_all()
 98
 99
# compatibility alias - the same class made available under the
# `SysLogHandler` spelling
SysLogHandler = SyslogHandler
102
103
class _ThreadState(typing.NamedTuple):
    """Handler thread state

    State shared between `SyslogHandler` and its sending thread. `queue` and
    `dropped` are accessed while holding `cv`.

    """
    host: str
    """Remote host name"""
    port: int
    """Remote TCP/UDP port"""
    comm_type: common.CommType
    """Communication type"""
    queue: collections.deque
    """Message queue"""
    queue_size: int
    """Message queue size"""
    reconnect_delay: float
    """Delay in seconds before retrying connection"""
    cv: threading.Condition
    """Condition variable used for signaling queue and closed changes"""
    closed: threading.Event
    """Closed flag"""
    dropped: list[int]
    """Dropped message counter (single-element list modified in place)"""
124
125
def _logging_handler_thread(state):
    """Sending thread entry point

    Until `state.closed` is set, repeatedly connects to the remote server
    and sends messages taken from `state.queue`. If connecting fails, the
    next attempt is made after `state.reconnect_delay` seconds. If sending
    fails, the socket is closed and a new connection is established.

    Args:
        state: shared handler state (`_ThreadState`)

    """
    if state.comm_type == common.CommType.TLS:
        # NOTE(review): server certificate and host name are not verified
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        ctx.check_hostname = False
        ctx.verify_mode = ssl.VerifyMode.CERT_NONE

    while not state.closed.is_set():
        try:
            if state.comm_type == common.CommType.UDP:
                s = socket.socket(type=socket.SOCK_DGRAM)
                s.connect((state.host, state.port))

            elif state.comm_type == common.CommType.TCP:
                s = socket.create_connection((state.host, state.port))
                s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)

            elif state.comm_type == common.CommType.TLS:
                s = ctx.wrap_socket(socket.create_connection(
                    (state.host, state.port)))
                s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)

            else:
                raise NotImplementedError()

        except Exception:
            # wait on the closed flag instead of sleeping, so that closing
            # the handler ends the delay immediately
            state.closed.wait(state.reconnect_delay)
            continue

        try:
            while True:
                with state.cv:
                    state.cv.wait_for(lambda: (state.closed.is_set() or
                                               len(state.queue) or
                                               state.dropped[0]))
                    if state.closed.is_set():
                        return

                    if state.dropped[0]:
                        # dropped messages are reported before the remaining
                        # queued messages are sent
                        msg = _create_dropped_msg(
                            state.dropped[0], '_logging_handler_thread', 0)
                        state.dropped[0] = 0

                    else:
                        msg = state.queue.popleft()

                # encoding and sending are done without holding the lock
                msg_bytes = encoder.msg_to_str(msg).encode()

                if state.comm_type == common.CommType.UDP:
                    # one message per datagram
                    s.send(msg_bytes)

                else:
                    # stream transports prefix each message with its length
                    # in bytes; `sendall` is required because `send` may
                    # transmit only part of the data, which would corrupt
                    # the framing of all following messages
                    s.sendall(f'{len(msg_bytes)} '.encode() + msg_bytes)

        except Exception:
            # best effort - errors can not be logged from within the logging
            # handler itself, so the connection is dropped and reestablished
            pass

        finally:
            with contextlib.suppress(Exception):
                s.close()
185
186
def _record_to_msg(record):
    """Convert `logging.LogRecord` to syslog message (`common.Msg`)

    Logger name, thread identifier, function name, line number and formatted
    exception information are JSON encoded into the message's `data` under
    the `hat@1` key.

    Args:
        record: log record

    Returns:
        syslog message

    """
    exc_info = ''
    with contextlib.suppress(Exception):
        # failure to format exception info must not prevent logging
        if record.exc_info:
            exc_info = ''.join(
                traceback.TracebackException(*record.exc_info).format())

    return common.Msg(
        facility=common.Facility.USER,
        severity=_levelno_to_severity(record.levelno),
        version=1,
        timestamp=record.created,
        hostname=socket.gethostname(),
        app_name=sys.argv[0],  # record.processName
        procid=str(record.process) if record.process else None,
        msgid=record.name[:32],
        data=json.encode({
            'hat@1': {
                'name': str(record.name),
                'thread': str(record.thread),
                'funcName': str(record.funcName),
                'lineno': str(record.lineno),
                'exc_info': exc_info}}),
        msg=record.getMessage())


def _levelno_to_severity(levelno):
    """Map logging level number to syslog severity

    Standard level numbers are looked up in `_logging_severity_dict`. Any
    other (custom) level number, which would otherwise cause `KeyError`, is
    mapped to the severity of the highest standard level not greater than
    it; numbers below `logging.INFO` are mapped to debug severity.

    """
    severity = _logging_severity_dict.get(levelno)
    if severity is not None:
        return severity

    if levelno >= logging.CRITICAL:
        return common.Severity.CRITICAL

    if levelno >= logging.ERROR:
        return common.Severity.ERROR

    if levelno >= logging.WARNING:
        return common.Severity.WARNING

    if levelno >= logging.INFO:
        return common.Severity.INFORMATIONAL

    return common.Severity.DEBUG
211
212
def _create_dropped_msg(dropped, func_name, lineno):
    """Create syslog message reporting the number of dropped log messages

    Args:
        dropped: number of dropped messages
        func_name: function name written to message data
        lineno: line number written to message data

    Returns:
        syslog message with error severity

    """
    now = datetime.datetime.now(datetime.timezone.utc)
    source_name = __name__

    # same structure as the data created for regular log records
    details = {'name': source_name,
               'thread': str(threading.get_ident()),
               'funcName': str(func_name),
               'lineno': str(lineno),
               'exc_info': ''}

    return common.Msg(facility=common.Facility.USER,
                      severity=common.Severity.ERROR,
                      version=1,
                      timestamp=now.timestamp(),
                      hostname=socket.gethostname(),
                      app_name=sys.argv[0],
                      procid=str(os.getpid()),
                      msgid=source_name,
                      data=json.encode({'hat@1': details}),
                      msg=f'dropped {dropped} log messages')
231
232
# syslog severity for each standard `logging` level number
_logging_severity_dict = {
    logging.NOTSET: common.Severity.INFORMATIONAL,
    logging.DEBUG: common.Severity.DEBUG,
    logging.INFO: common.Severity.INFORMATIONAL,
    logging.WARNING: common.Severity.WARNING,
    logging.ERROR: common.Severity.ERROR,
    logging.CRITICAL: common.Severity.CRITICAL,
}
class SyslogHandler(logging.Handler):
class SyslogHandler(logging.Handler):
    """Syslog handler

    Log records are converted to syslog messages and appended to an
    in-memory queue which is emptied by a daemon thread started by the
    constructor. When the queue grows beyond `queue_size`, oldest messages
    are discarded and counted.

    Args:
        host: remote host name
        port: remote TCP/UDP port
        comm_type: communication type (`common.CommType` member or its name)
        queue_size: message queue size
        reconnect_delay: delay in seconds before retrying connection with
            remote syslog server

    """

    def __init__(self,
                 host: str,
                 port: int,
                 comm_type: common.CommType | str,
                 queue_size: int = 1024,
                 reconnect_delay: float = 5):
        super().__init__()

        self.__state = _ThreadState(
            host=host,
            port=port,
            # member name is resolved to the corresponding enum member
            comm_type=(common.CommType[comm_type]
                       if not isinstance(comm_type, common.CommType)
                       else comm_type),
            queue=collections.deque(),
            queue_size=queue_size,
            reconnect_delay=reconnect_delay,
            cv=threading.Condition(),
            closed=threading.Event(),
            # single-element list - counter is modified in place
            dropped=[0])

        self.__thread = threading.Thread(
            target=_logging_handler_thread,
            args=(self.__state, ),
            daemon=True)

        self.__thread.start()

    def close(self):
        """See `logging.Handler.close`"""
        state = self.__state

        with state.cv:
            if not state.closed.is_set():
                state.closed.set()

                with contextlib.suppress(Exception):
                    # workaround for errors/0001.txt
                    state.cv.notify_all()

        # overridden `close` is expected to call base implementation, which
        # removes the handler from logging module's internal handler map
        super().close()

    def emit(self, record):
        """See `logging.Handler.emit`"""
        try:
            msg = _record_to_msg(record)

        except Exception:
            # record could not be converted (e.g. message formatting
            # failed) - report it the same way standard library handlers do
            # instead of raising into the code that issued the logging call
            self.handleError(record)
            return

        state = self.__state

        with state.cv:
            if state.closed.is_set():
                return

            state.queue.append(msg)

            # keep the queue bounded - discard oldest messages and count them
            while len(state.queue) > state.queue_size:
                state.queue.popleft()
                state.dropped[0] += 1

            with contextlib.suppress(Exception):
                # workaround for errors/0001.txt
                state.cv.notify_all()

Syslog handler

Arguments:
  • host: remote host name
  • port: remote TCP/UDP port
  • comm_type: communication type
  • queue_size: message queue size
  • reconnect_delay: delay in seconds before retrying connection with remote syslog server
SyslogHandler( host: str, port: int, comm_type: hat.syslog.common.CommType | str, queue_size: int = 1024, reconnect_delay: float = 5)
40    def __init__(self,
41                 host: str,
42                 port: int,
43                 comm_type: common.CommType | str,
44                 queue_size: int = 1024,
45                 reconnect_delay: float = 5):
46        super().__init__()
47
48        self.__state = _ThreadState(
49            host=host,
50            port=port,
51            comm_type=(common.CommType[comm_type]
52                       if not isinstance(comm_type, common.CommType)
53                       else comm_type),
54            queue=collections.deque(),
55            queue_size=queue_size,
56            reconnect_delay=reconnect_delay,
57            cv=threading.Condition(),
58            closed=threading.Event(),
59            dropped=[0])
60
61        self.__thread = threading.Thread(
62            target=_logging_handler_thread,
63            args=(self.__state, ),
64            daemon=True)
65
66        self.__thread.start()

Initializes the instance - basically setting the formatter to None and the filter list to empty.

def close(self):
68    def close(self):
69        """"See `logging.Handler.close`"""
70        state = self.__state
71
72        with state.cv:
73            if state.closed.is_set():
74                return
75
76            state.closed.set()
77
78            with contextlib.suppress(Exception):
79                # workaround for errors/0001.txt
80                state.cv.notify_all()

See logging.Handler.close

def emit(self, record):
82    def emit(self, record):
83        """"See `logging.Handler.emit`"""
84        msg = _record_to_msg(record)
85        state = self.__state
86
87        with state.cv:
88            if state.closed.is_set():
89                return
90
91            state.queue.append(msg)
92            while len(state.queue) > state.queue_size:
93                state.queue.popleft()
94                state.dropped[0] += 1
95
96            with contextlib.suppress(Exception):
97                # workaround for errors/0001.txt
98                state.cv.notify_all()

See logging.Handler.emit

Inherited Members
logging.Handler
level
formatter
get_name
set_name
name
createLock
acquire
release
setLevel
format
handle
setFormatter
flush
handleError
logging.Filterer
filters
addFilter
removeFilter
filter
SysLogHandler = <class 'SyslogHandler'>