hat.syslog.handler
Syslog handler
Implementation of logging.Handler
for syslog logging protocol.
"""Syslog handler

Implementation of `logging.Handler` for syslog logging protocol.

"""

import collections
import contextlib
import datetime
import logging
import os
import socket
import ssl
import sys
import threading
import time
import traceback
import typing

from hat import json

from hat.syslog import common
from hat.syslog import encoder


class SyslogHandler(logging.Handler):
    """Syslog handler

    Log records are converted to syslog messages and appended to a bounded
    queue. A daemon thread, started on construction, connects to the remote
    server and sends queued messages. When the queue overflows, the oldest
    messages are discarded and a single "dropped N log messages" message is
    sent in their place.

    Args:
        host: remote host name
        port: remote TCP/UDP port
        comm_type: communication type (`common.CommType` member or its name)
        queue_size: message queue size
        reconnect_delay: delay in seconds before retrying connection with
            remote syslog server

    """

    def __init__(self,
                 host: str,
                 port: int,
                 comm_type: common.CommType | str,
                 queue_size: int = 1024,
                 reconnect_delay: float = 5):
        super().__init__()

        self.__state = _ThreadState(
            host=host,
            port=port,
            comm_type=(common.CommType[comm_type]
                       if not isinstance(comm_type, common.CommType)
                       else comm_type),
            queue=collections.deque(),
            queue_size=queue_size,
            reconnect_delay=reconnect_delay,
            cv=threading.Condition(),
            closed=threading.Event(),
            dropped=[0])

        self.__thread = threading.Thread(
            target=_logging_handler_thread,
            args=(self.__state, ),
            daemon=True)

        self.__thread.start()

    def close(self):
        """See `logging.Handler.close`"""
        state = self.__state

        with state.cv:
            if not state.closed.is_set():
                state.closed.set()

                with contextlib.suppress(Exception):
                    # workaround for errors/0001.txt
                    state.cv.notify_all()

        # base implementation unregisters the handler from the logging
        # module's internal handler registry
        super().close()

    def emit(self, record):
        """See `logging.Handler.emit`"""
        try:
            msg = _record_to_msg(record)

        except Exception:
            # same convention as standard library handlers - a record that
            # can not be converted must not break the application's log call
            self.handleError(record)
            return

        state = self.__state

        with state.cv:
            if state.closed.is_set():
                return

            state.queue.append(msg)

            # keep only the newest `queue_size` messages and count the rest
            while len(state.queue) > state.queue_size:
                state.queue.popleft()
                state.dropped[0] += 1

            with contextlib.suppress(Exception):
                # workaround for errors/0001.txt
                state.cv.notify_all()


# compatibility alias
SysLogHandler = SyslogHandler


class _ThreadState(typing.NamedTuple):
    """Handler thread state"""
    host: str
    """Hostname"""
    port: int
    """TCP port"""
    comm_type: common.CommType
    """Communication type"""
    queue: collections.deque
    """Message queue"""
    queue_size: int
    """Message queue size"""
    reconnect_delay: float
    """Reconnect delay"""
    cv: threading.Condition
    """Conditional variable"""
    closed: threading.Event
    """Closed flag"""
    dropped: list[int]
    """Dropped message counter"""


def _logging_handler_thread(state):
    """Handler thread body

    Repeatedly connects to the remote server and sends queued messages
    until `state.closed` is set. A failed connection attempt is retried
    after `state.reconnect_delay` seconds; a failed send closes the socket
    and triggers a new connection.

    """
    ctx = None
    if state.comm_type == common.CommType.TLS:
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        ctx.check_hostname = False
        ctx.verify_mode = ssl.VerifyMode.CERT_NONE

    while not state.closed.is_set():
        try:
            s = _connect(state, ctx)

        except Exception:
            time.sleep(state.reconnect_delay)
            continue

        try:
            while True:
                with state.cv:
                    state.cv.wait_for(lambda: (state.closed.is_set() or
                                               len(state.queue) or
                                               state.dropped[0]))
                    if state.closed.is_set():
                        return

                    if state.dropped[0]:
                        # report dropped messages before regular ones
                        msg = _create_dropped_msg(
                            state.dropped[0], '_logging_handler_thread', 0)
                        state.dropped[0] = 0

                    else:
                        msg = state.queue.popleft()

                msg_bytes = encoder.msg_to_str(msg).encode()

                if state.comm_type == common.CommType.UDP:
                    s.send(msg_bytes)

                else:
                    # stream transports prefix each message with its length;
                    # `send` may transmit only part of the buffer, which
                    # would corrupt this framing, so `sendall` is required
                    s.sendall(f'{len(msg_bytes)} '.encode() + msg_bytes)

        except Exception:
            # best effort - drop the connection and reconnect
            pass

        finally:
            with contextlib.suppress(Exception):
                s.close()


def _connect(state, ctx):
    """Create socket connected to the remote server

    Args:
        state: handler thread state
        ctx: SSL context used when `state.comm_type` is TLS

    Raises:
        NotImplementedError: unsupported communication type
        Exception: connection could not be established

    """
    if state.comm_type == common.CommType.UDP:
        s = socket.socket(type=socket.SOCK_DGRAM)

    elif state.comm_type in (common.CommType.TCP, common.CommType.TLS):
        s = socket.create_connection((state.host, state.port))

    else:
        raise NotImplementedError()

    try:
        if state.comm_type == common.CommType.UDP:
            s.connect((state.host, state.port))

        else:
            s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
            if state.comm_type == common.CommType.TLS:
                s = ctx.wrap_socket(s)

    except Exception:
        # do not leak the socket when setup fails
        with contextlib.suppress(Exception):
            s.close()
        raise

    return s


def _levelno_to_severity(levelno):
    """Map logging level number to syslog severity

    Standard levels use `_logging_severity_dict`; any other (custom) level
    is mapped to the severity of the closest standard level below it.

    """
    severity = _logging_severity_dict.get(levelno)
    if severity is not None:
        return severity

    if levelno >= logging.CRITICAL:
        return common.Severity.CRITICAL

    if levelno >= logging.ERROR:
        return common.Severity.ERROR

    if levelno >= logging.WARNING:
        return common.Severity.WARNING

    if levelno >= logging.INFO:
        return common.Severity.INFORMATIONAL

    return common.Severity.DEBUG


def _record_to_msg(record):
    """Convert `logging.LogRecord` to `common.Msg`

    Record name, thread, function name and line number (and optionally
    formatted exception info and `meta` attribute) are JSON encoded into
    message's structured data under `hat@1` key.

    """
    hat_data = {'name': str(record.name),
                'thread': str(record.thread),
                'funcName': str(record.funcName),
                'lineno': str(record.lineno)}

    with contextlib.suppress(Exception):
        if record.exc_info:
            hat_data['exc_info'] = ''.join(
                traceback.TracebackException(*record.exc_info).format())

    with contextlib.suppress(Exception):
        if hasattr(record, 'meta'):
            hat_data['meta'] = json.encode(record.meta)

    return common.Msg(
        facility=common.Facility.USER,
        severity=_levelno_to_severity(record.levelno),
        version=1,
        timestamp=record.created,
        hostname=socket.gethostname(),
        app_name=sys.argv[0],  # record.processName
        procid=str(record.process) if record.process else None,
        msgid=record.name[:32],
        data=json.encode({'hat@1': hat_data}),
        msg=record.getMessage())


def _create_dropped_msg(dropped, func_name, lineno):
    """Create message reporting number of dropped log messages

    Args:
        dropped: number of dropped messages
        func_name: function name stored in message's structured data
        lineno: line number stored in message's structured data

    """
    hat_data = {'name': __name__,
                'thread': str(threading.get_ident()),
                'funcName': str(func_name),
                'lineno': str(lineno)}

    return common.Msg(
        facility=common.Facility.USER,
        severity=common.Severity.ERROR,
        version=1,
        timestamp=datetime.datetime.now(datetime.timezone.utc).timestamp(),
        hostname=socket.gethostname(),
        app_name=sys.argv[0],  # record.processName
        procid=str(os.getpid()),
        msgid=__name__[:32],
        data=json.encode({'hat@1': hat_data}),
        msg=f'dropped {dropped} log messages')


_logging_severity_dict = {logging.NOTSET: common.Severity.INFORMATIONAL,
                          logging.DEBUG: common.Severity.DEBUG,
                          logging.INFO: common.Severity.INFORMATIONAL,
                          logging.WARNING: common.Severity.WARNING,
                          logging.ERROR: common.Severity.ERROR,
                          logging.CRITICAL: common.Severity.CRITICAL}
class
SyslogHandler(logging.Handler):
class SyslogHandler(logging.Handler):
    """Syslog handler

    Log records are converted to syslog messages and appended to a bounded
    queue which is consumed by a daemon thread started on construction.
    When the queue overflows, the oldest messages are discarded and counted
    as dropped.

    Args:
        host: remote host name
        port: remote TCP/UDP port
        comm_type: communication type (`common.CommType` member or its name)
        queue_size: message queue size
        reconnect_delay: delay in seconds before retrying connection with
            remote syslog server

    """

    def __init__(self,
                 host: str,
                 port: int,
                 comm_type: common.CommType | str,
                 queue_size: int = 1024,
                 reconnect_delay: float = 5):
        super().__init__()

        self.__state = _ThreadState(
            host=host,
            port=port,
            comm_type=(common.CommType[comm_type]
                       if not isinstance(comm_type, common.CommType)
                       else comm_type),
            queue=collections.deque(),
            queue_size=queue_size,
            reconnect_delay=reconnect_delay,
            cv=threading.Condition(),
            closed=threading.Event(),
            dropped=[0])

        self.__thread = threading.Thread(
            target=_logging_handler_thread,
            args=(self.__state, ),
            daemon=True)

        self.__thread.start()

    def close(self):
        """See `logging.Handler.close`"""
        state = self.__state

        with state.cv:
            if not state.closed.is_set():
                state.closed.set()

                with contextlib.suppress(Exception):
                    # workaround for errors/0001.txt
                    state.cv.notify_all()

        # base implementation unregisters the handler from the logging
        # module's internal handler registry
        super().close()

    def emit(self, record):
        """See `logging.Handler.emit`"""
        try:
            msg = _record_to_msg(record)

        except Exception:
            # same convention as standard library handlers - a record that
            # can not be converted must not break the application's log call
            self.handleError(record)
            return

        state = self.__state

        with state.cv:
            if state.closed.is_set():
                return

            state.queue.append(msg)

            # keep only the newest `queue_size` messages and count the rest
            while len(state.queue) > state.queue_size:
                state.queue.popleft()
                state.dropped[0] += 1

            with contextlib.suppress(Exception):
                # workaround for errors/0001.txt
                state.cv.notify_all()
Syslog handler
Arguments:
- host: remote host name
- port: remote TCP/UDP port
- comm_type: communication type
- queue_size: message queue size
- reconnect_delay: delay in seconds before retrying connection with remote syslog server
SyslogHandler( host: str, port: int, comm_type: hat.syslog.common.CommType | str, queue_size: int = 1024, reconnect_delay: float = 5)
def __init__(self,
             host: str,
             port: int,
             comm_type: common.CommType | str,
             queue_size: int = 1024,
             reconnect_delay: float = 5):
    """Create handler state and start the message sending thread

    Args:
        host: remote host name
        port: remote TCP/UDP port
        comm_type: communication type (`common.CommType` member or its name)
        queue_size: message queue size
        reconnect_delay: delay in seconds before retrying connection with
            remote syslog server

    """
    super().__init__()

    # communication type given by name is resolved to enumeration member
    if isinstance(comm_type, common.CommType):
        resolved_comm_type = comm_type
    else:
        resolved_comm_type = common.CommType[comm_type]

    self.__state = _ThreadState(host=host,
                                port=port,
                                comm_type=resolved_comm_type,
                                queue=collections.deque(),
                                queue_size=queue_size,
                                reconnect_delay=reconnect_delay,
                                cv=threading.Condition(),
                                closed=threading.Event(),
                                dropped=[0])

    # daemon thread - does not prevent interpreter exit
    self.__thread = threading.Thread(target=_logging_handler_thread,
                                     args=(self.__state, ),
                                     daemon=True)
    self.__thread.start()
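Initializes the instance: performs the base logging.Handler setup (formatter set to None, filter list empty), creates the shared message queue state and starts the daemon thread that sends queued messages to the remote syslog server.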
def
close(self):
def close(self):
    """See `logging.Handler.close`"""
    state = self.__state

    with state.cv:
        # repeated calls do not set the flag or notify again
        if not state.closed.is_set():
            state.closed.set()

            with contextlib.suppress(Exception):
                # workaround for errors/0001.txt
                state.cv.notify_all()

    # base implementation unregisters the handler from the logging
    # module's internal handler registry
    super().close()
See logging.Handler.close
def
emit(self, record):
def emit(self, record):
    """See `logging.Handler.emit`"""
    try:
        msg = _record_to_msg(record)

    except Exception:
        # same convention as standard library handlers - a record that
        # can not be converted must not break the application's log call
        self.handleError(record)
        return

    state = self.__state

    with state.cv:
        # records emitted after close are silently discarded
        if state.closed.is_set():
            return

        state.queue.append(msg)

        # keep only the newest `queue_size` messages and count the rest
        while len(state.queue) > state.queue_size:
            state.queue.popleft()
            state.dropped[0] += 1

        with contextlib.suppress(Exception):
            # workaround for errors/0001.txt
            state.cv.notify_all()
See logging.Handler.emit
SysLogHandler =
<class 'SyslogHandler'>