hat.syslog.handler
Syslog handler
Implementation of logging.Handler
for syslog logging protocol.
1"""Syslog handler 2 3Implementation of `logging.Handler` for syslog logging protocol. 4 5""" 6 7import collections 8import contextlib 9import datetime 10import logging 11import os 12import socket 13import ssl 14import sys 15import threading 16import time 17import traceback 18import typing 19 20from hat import json 21 22from hat.syslog import common 23from hat.syslog import encoder 24 25 26class SyslogHandler(logging.Handler): 27 """Syslog handler 28 29 Args: 30 host: remote host name 31 port: remote TCP/UDP port 32 comm_type: communication type 33 queue_size: message queue size 34 reconnect_delay: delay in seconds before retrying connection with 35 remote syslog server 36 37 """ 38 39 def __init__(self, 40 host: str, 41 port: int, 42 comm_type: common.CommType | str, 43 queue_size: int = 1024, 44 reconnect_delay: float = 5): 45 super().__init__() 46 47 self.__state = _ThreadState( 48 host=host, 49 port=port, 50 comm_type=(common.CommType[comm_type] 51 if not isinstance(comm_type, common.CommType) 52 else comm_type), 53 queue=collections.deque(), 54 queue_size=queue_size, 55 reconnect_delay=reconnect_delay, 56 cv=threading.Condition(), 57 closed=threading.Event(), 58 dropped=[0]) 59 60 self.__thread = threading.Thread( 61 target=_logging_handler_thread, 62 args=(self.__state, ), 63 daemon=True) 64 65 self.__thread.start() 66 67 def close(self): 68 """"See `logging.Handler.close`""" 69 state = self.__state 70 71 with state.cv: 72 if state.closed.is_set(): 73 return 74 75 state.closed.set() 76 77 with contextlib.suppress(Exception): 78 # workaround for errors/0001.txt 79 state.cv.notify_all() 80 81 def emit(self, record): 82 """"See `logging.Handler.emit`""" 83 msg = _record_to_msg(record) 84 state = self.__state 85 86 with state.cv: 87 if state.closed.is_set(): 88 return 89 90 state.queue.append(msg) 91 while len(state.queue) > state.queue_size: 92 state.queue.popleft() 93 state.dropped[0] += 1 94 95 with contextlib.suppress(Exception): 96 # workaround for errors/0001.txt 97 state.cv.notify_all() 98 99 100# compatibility alias 101SysLogHandler = SyslogHandler 102 103 104class _ThreadState(typing.NamedTuple): 105 """Handler thread state""" 106 host: str 107 """Hostname""" 108 port: int 109 """TCP port""" 110 comm_type: common.CommType | str 111 """Communication type""" 112 queue: collections.deque 113 """Message queue""" 114 queue_size: int 115 """Message queue size""" 116 reconnect_delay: float 117 """Reconnect delay""" 118 cv: threading.Condition 119 """Conditional variable""" 120 closed: threading.Event 121 """Closed flag""" 122 dropped: list[int] 123 """Dropped message counter""" 124 125 126def _logging_handler_thread(state): 127 if state.comm_type == common.CommType.TLS: 128 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) 129 ctx.check_hostname = False 130 ctx.verify_mode = ssl.VerifyMode.CERT_NONE 131 132 while not state.closed.is_set(): 133 try: 134 if state.comm_type == common.CommType.UDP: 135 s = socket.socket(type=socket.SOCK_DGRAM) 136 s.connect((state.host, state.port)) 137 138 elif state.comm_type == common.CommType.TCP: 139 s = socket.create_connection((state.host, state.port)) 140 s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) 141 142 elif state.comm_type == common.CommType.TLS: 143 s = ctx.wrap_socket(socket.create_connection( 144 (state.host, state.port))) 145 s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) 146 147 else: 148 raise NotImplementedError() 149 150 except Exception: 151 time.sleep(state.reconnect_delay) 152 continue 153 154 try: 155 while True: 156 with state.cv: 157 state.cv.wait_for(lambda: (state.closed.is_set() or 158 len(state.queue) or 159 state.dropped[0])) 160 if state.closed.is_set(): 161 return 162 163 if state.dropped[0]: 164 msg = _create_dropped_msg( 165 state.dropped[0], '_logging_handler_thread', 0) 166 state.dropped[0] = 0 167 168 else: 169 msg = state.queue.popleft() 170 171 msg_bytes = encoder.msg_to_str(msg).encode() 172 173 if state.comm_type == common.CommType.UDP: 174 s.send(msg_bytes) 175 176 else: 177 s.send(f'{len(msg_bytes)} '.encode() + msg_bytes) 178 179 except Exception: 180 pass 181 182 finally: 183 with contextlib.suppress(Exception): 184 s.close() 185 186 187def _record_to_msg(record): 188 exc_info = '' 189 with contextlib.suppress(Exception): 190 if record.exc_info: 191 exc_info = ''.join( 192 traceback.TracebackException(*record.exc_info).format()) 193 194 return common.Msg( 195 facility=common.Facility.USER, 196 severity=_logging_severity_dict[record.levelno], 197 version=1, 198 timestamp=record.created, 199 hostname=socket.gethostname(), 200 app_name=sys.argv[0], # record.processName 201 procid=str(record.process) if record.process else None, 202 msgid=record.name[:32], 203 data=json.encode({ 204 'hat@1': { 205 'name': str(record.name), 206 'thread': str(record.thread), 207 'funcName': str(record.funcName), 208 'lineno': str(record.lineno), 209 'exc_info': exc_info}}), 210 msg=record.getMessage()) 211 212 213def _create_dropped_msg(dropped, func_name, lineno): 214 return common.Msg( 215 facility=common.Facility.USER, 216 severity=common.Severity.ERROR, 217 version=1, 218 timestamp=datetime.datetime.now(datetime.timezone.utc).timestamp(), 219 hostname=socket.gethostname(), 220 app_name=sys.argv[0], # record.processName 221 procid=str(os.getpid()), 222 msgid=__name__, 223 data=json.encode({ 224 'hat@1': { 225 'name': __name__, 226 'thread': str(threading.get_ident()), 227 'funcName': str(func_name), 228 'lineno': str(lineno), 229 'exc_info': ''}}), 230 msg=f'dropped {dropped} log messages') 231 232 233_logging_severity_dict = {logging.NOTSET: common.Severity.INFORMATIONAL, 234 logging.DEBUG: common.Severity.DEBUG, 235 logging.INFO: common.Severity.INFORMATIONAL, 236 logging.WARNING: common.Severity.WARNING, 237 logging.ERROR: common.Severity.ERROR, 238 logging.CRITICAL: common.Severity.CRITICAL}
class
SyslogHandler(logging.Handler):
27class SyslogHandler(logging.Handler): 28 """Syslog handler 29 30 Args: 31 host: remote host name 32 port: remote TCP/UDP port 33 comm_type: communication type 34 queue_size: message queue size 35 reconnect_delay: delay in seconds before retrying connection with 36 remote syslog server 37 38 """ 39 40 def __init__(self, 41 host: str, 42 port: int, 43 comm_type: common.CommType | str, 44 queue_size: int = 1024, 45 reconnect_delay: float = 5): 46 super().__init__() 47 48 self.__state = _ThreadState( 49 host=host, 50 port=port, 51 comm_type=(common.CommType[comm_type] 52 if not isinstance(comm_type, common.CommType) 53 else comm_type), 54 queue=collections.deque(), 55 queue_size=queue_size, 56 reconnect_delay=reconnect_delay, 57 cv=threading.Condition(), 58 closed=threading.Event(), 59 dropped=[0]) 60 61 self.__thread = threading.Thread( 62 target=_logging_handler_thread, 63 args=(self.__state, ), 64 daemon=True) 65 66 self.__thread.start() 67 68 def close(self): 69 """"See `logging.Handler.close`""" 70 state = self.__state 71 72 with state.cv: 73 if state.closed.is_set(): 74 return 75 76 state.closed.set() 77 78 with contextlib.suppress(Exception): 79 # workaround for errors/0001.txt 80 state.cv.notify_all() 81 82 def emit(self, record): 83 """"See `logging.Handler.emit`""" 84 msg = _record_to_msg(record) 85 state = self.__state 86 87 with state.cv: 88 if state.closed.is_set(): 89 return 90 91 state.queue.append(msg) 92 while len(state.queue) > state.queue_size: 93 state.queue.popleft() 94 state.dropped[0] += 1 95 96 with contextlib.suppress(Exception): 97 # workaround for errors/0001.txt 98 state.cv.notify_all()
Syslog handler
Arguments:
- host: remote host name
- port: remote TCP/UDP port
- comm_type: communication type
- queue_size: message queue size
- reconnect_delay: delay in seconds before retrying connection with remote syslog server
SyslogHandler( host: str, port: int, comm_type: hat.syslog.common.CommType | str, queue_size: int = 1024, reconnect_delay: float = 5)
40 def __init__(self, 41 host: str, 42 port: int, 43 comm_type: common.CommType | str, 44 queue_size: int = 1024, 45 reconnect_delay: float = 5): 46 super().__init__() 47 48 self.__state = _ThreadState( 49 host=host, 50 port=port, 51 comm_type=(common.CommType[comm_type] 52 if not isinstance(comm_type, common.CommType) 53 else comm_type), 54 queue=collections.deque(), 55 queue_size=queue_size, 56 reconnect_delay=reconnect_delay, 57 cv=threading.Condition(), 58 closed=threading.Event(), 59 dropped=[0]) 60 61 self.__thread = threading.Thread( 62 target=_logging_handler_thread, 63 args=(self.__state, ), 64 daemon=True) 65 66 self.__thread.start()
Initializes the instance - basically setting the formatter to None and the filter list to empty.
def
close(self):
68 def close(self): 69 """"See `logging.Handler.close`""" 70 state = self.__state 71 72 with state.cv: 73 if state.closed.is_set(): 74 return 75 76 state.closed.set() 77 78 with contextlib.suppress(Exception): 79 # workaround for errors/0001.txt 80 state.cv.notify_all()
"See logging.Handler.close
def
emit(self, record):
82 def emit(self, record): 83 """"See `logging.Handler.emit`""" 84 msg = _record_to_msg(record) 85 state = self.__state 86 87 with state.cv: 88 if state.closed.is_set(): 89 return 90 91 state.queue.append(msg) 92 while len(state.queue) > state.queue_size: 93 state.queue.popleft() 94 state.dropped[0] += 1 95 96 with contextlib.suppress(Exception): 97 # workaround for errors/0001.txt 98 state.cv.notify_all()
"See logging.Handler.emit
SysLogHandler =
<class 'SyslogHandler'>