hat.syslog.handler
Syslog hadler
Implementation of logging.Handler
for syslog logging protocol.
1"""Syslog hadler 2 3Implementation of `logging.Handler` for syslog logging protocol. 4 5""" 6 7import collections 8import contextlib 9import datetime 10import logging 11import os 12import socket 13import ssl 14import sys 15import threading 16import time 17import traceback 18import typing 19 20from hat import json 21from hat.syslog import common 22from hat.syslog import encoder 23 24 25class SysLogHandler(logging.Handler): 26 """Syslog handler 27 28 Args: 29 host: remote host name 30 port: remote TCP/UDP port 31 comm_type: communication type 32 queue_size: message queue size 33 reconnect_delay: delay in seconds before retrying connection with 34 remote syslog server 35 36 """ 37 38 def __init__(self, 39 host: str, 40 port: int, 41 comm_type: typing.Union[common.CommType, str], 42 queue_size: int = 1024, 43 reconnect_delay: float = 5): 44 super().__init__() 45 self.__state = _ThreadState( 46 host=host, 47 port=port, 48 comm_type=(common.CommType[comm_type] 49 if not isinstance(comm_type, common.CommType) 50 else comm_type), 51 queue=collections.deque(), 52 queue_size=queue_size, 53 reconnect_delay=reconnect_delay, 54 cv=threading.Condition(), 55 closed=threading.Event(), 56 dropped=[0]) 57 self.__thread = threading.Thread( 58 target=_logging_handler_thread, 59 args=(self.__state, ), 60 daemon=True) 61 self.__thread.start() 62 63 def close(self): 64 """"See `logging.Handler.close`""" 65 state = self.__state 66 with state.cv: 67 if state.closed.is_set(): 68 return 69 state.closed.set() 70 with contextlib.suppress(Exception): 71 # workaround for errors/0001.txt 72 state.cv.notify_all() 73 74 def emit(self, record): 75 """"See `logging.Handler.emit`""" 76 msg = _record_to_msg(record) 77 state = self.__state 78 with state.cv: 79 if state.closed.is_set(): 80 return 81 state.queue.append(msg) 82 while len(state.queue) > state.queue_size: 83 state.queue.popleft() 84 state.dropped[0] += 1 85 with contextlib.suppress(Exception): 86 # workaround for errors/0001.txt 87 state.cv.notify_all() 88 89 90class _ThreadState(typing.NamedTuple): 91 """Handler thread state""" 92 host: str 93 """Hostname""" 94 port: int 95 """TCP port""" 96 comm_type: typing.Union[common.CommType, str] 97 """Communication type""" 98 queue: collections.deque 99 """Message queue""" 100 queue_size: int 101 """Message queue size""" 102 reconnect_delay: float 103 """Reconnect delay""" 104 cv: threading.Condition 105 """Conditional variable""" 106 closed: threading.Event 107 """Closed flag""" 108 dropped: typing.List[int] 109 """Dropped message counter""" 110 111 112def _logging_handler_thread(state): 113 msg = None 114 while not state.closed.is_set(): 115 try: 116 if state.comm_type == common.CommType.UDP: 117 s = socket.socket(type=socket.SOCK_DGRAM) 118 s.connect((state.host, state.port)) 119 elif state.comm_type == common.CommType.TCP: 120 s = socket.create_connection((state.host, state.port)) 121 s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) 122 elif state.comm_type == common.CommType.SSL: 123 s = ssl.wrap_socket(socket.create_connection( 124 (state.host, state.port))) 125 s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) 126 else: 127 raise NotImplementedError() 128 except Exception: 129 time.sleep(state.reconnect_delay) 130 continue 131 try: 132 while True: 133 if not msg: 134 with state.cv: 135 state.cv.wait_for(lambda: (state.closed.is_set() or 136 len(state.queue) or 137 state.dropped[0])) 138 if state.closed.is_set(): 139 return 140 if state.dropped[0]: 141 msg = _create_dropped_msg( 142 state.dropped[0], '_logging_handler_thread', 0) 143 state.dropped[0] = 0 144 else: 145 msg = state.queue.popleft() 146 msg_bytes = encoder.msg_to_str(msg).encode() 147 s.send(f'{len(msg_bytes)} '.encode() + msg_bytes) 148 msg = None 149 except Exception: 150 pass 151 finally: 152 with contextlib.suppress(Exception): 153 s.close() 154 155 156def _record_to_msg(record): 157 exc_info = '' 158 with contextlib.suppress(Exception): 159 if record.exc_info: 160 exc_info = ''.join( 161 traceback.TracebackException(*record.exc_info).format()) 162 return common.Msg( 163 facility=common.Facility.USER, 164 severity={ 165 logging.NOTSET: common.Severity.INFORMATIONAL, 166 logging.DEBUG: common.Severity.DEBUG, 167 logging.INFO: common.Severity.INFORMATIONAL, 168 logging.WARNING: common.Severity.WARNING, 169 logging.ERROR: common.Severity.ERROR, 170 logging.CRITICAL: common.Severity.CRITICAL}[record.levelno], 171 version=1, 172 timestamp=record.created, 173 hostname=socket.gethostname(), 174 app_name=sys.argv[0], # record.processName 175 procid=str(record.process) if record.process else None, 176 msgid=record.name[:32], 177 data=json.encode({ 178 'hat@1': { 179 'name': str(record.name), 180 'thread': str(record.thread), 181 'funcName': str(record.funcName), 182 'lineno': str(record.lineno), 183 'exc_info': exc_info}}), 184 msg=record.getMessage()) 185 186 187def _create_dropped_msg(dropped, func_name, lineno): 188 return common.Msg( 189 facility=common.Facility.USER, 190 severity=common.Severity.ERROR, 191 version=1, 192 timestamp=datetime.datetime.now(datetime.timezone.utc).timestamp(), 193 hostname=socket.gethostname(), 194 app_name=sys.argv[0], # record.processName 195 procid=str(os.getpid()), 196 msgid=__name__, 197 data=json.encode({ 198 'hat@1': { 199 'name': __name__, 200 'thread': str(threading.get_ident()), 201 'funcName': str(func_name), 202 'lineno': str(lineno), 203 'exc_info': ''}}), 204 msg=f'dropped {dropped} log messages')
class
SysLogHandler(logging.Handler):
26class SysLogHandler(logging.Handler): 27 """Syslog handler 28 29 Args: 30 host: remote host name 31 port: remote TCP/UDP port 32 comm_type: communication type 33 queue_size: message queue size 34 reconnect_delay: delay in seconds before retrying connection with 35 remote syslog server 36 37 """ 38 39 def __init__(self, 40 host: str, 41 port: int, 42 comm_type: typing.Union[common.CommType, str], 43 queue_size: int = 1024, 44 reconnect_delay: float = 5): 45 super().__init__() 46 self.__state = _ThreadState( 47 host=host, 48 port=port, 49 comm_type=(common.CommType[comm_type] 50 if not isinstance(comm_type, common.CommType) 51 else comm_type), 52 queue=collections.deque(), 53 queue_size=queue_size, 54 reconnect_delay=reconnect_delay, 55 cv=threading.Condition(), 56 closed=threading.Event(), 57 dropped=[0]) 58 self.__thread = threading.Thread( 59 target=_logging_handler_thread, 60 args=(self.__state, ), 61 daemon=True) 62 self.__thread.start() 63 64 def close(self): 65 """"See `logging.Handler.close`""" 66 state = self.__state 67 with state.cv: 68 if state.closed.is_set(): 69 return 70 state.closed.set() 71 with contextlib.suppress(Exception): 72 # workaround for errors/0001.txt 73 state.cv.notify_all() 74 75 def emit(self, record): 76 """"See `logging.Handler.emit`""" 77 msg = _record_to_msg(record) 78 state = self.__state 79 with state.cv: 80 if state.closed.is_set(): 81 return 82 state.queue.append(msg) 83 while len(state.queue) > state.queue_size: 84 state.queue.popleft() 85 state.dropped[0] += 1 86 with contextlib.suppress(Exception): 87 # workaround for errors/0001.txt 88 state.cv.notify_all()
Syslog handler
Args
- host: remote host name
- port: remote TCP/UDP port
- comm_type: communication type
- queue_size: message queue size
- reconnect_delay: delay in seconds before retrying connection with remote syslog server
SysLogHandler( host: str, port: int, comm_type: Union[hat.syslog.common.CommType, str], queue_size: int = 1024, reconnect_delay: float = 5)
39 def __init__(self, 40 host: str, 41 port: int, 42 comm_type: typing.Union[common.CommType, str], 43 queue_size: int = 1024, 44 reconnect_delay: float = 5): 45 super().__init__() 46 self.__state = _ThreadState( 47 host=host, 48 port=port, 49 comm_type=(common.CommType[comm_type] 50 if not isinstance(comm_type, common.CommType) 51 else comm_type), 52 queue=collections.deque(), 53 queue_size=queue_size, 54 reconnect_delay=reconnect_delay, 55 cv=threading.Condition(), 56 closed=threading.Event(), 57 dropped=[0]) 58 self.__thread = threading.Thread( 59 target=_logging_handler_thread, 60 args=(self.__state, ), 61 daemon=True) 62 self.__thread.start()
Initializes the instance - basically setting the formatter to None and the filter list to empty.
def
close(self)
64 def close(self): 65 """"See `logging.Handler.close`""" 66 state = self.__state 67 with state.cv: 68 if state.closed.is_set(): 69 return 70 state.closed.set() 71 with contextlib.suppress(Exception): 72 # workaround for errors/0001.txt 73 state.cv.notify_all()
"See logging.Handler.close
def
emit(self, record)
75 def emit(self, record): 76 """"See `logging.Handler.emit`""" 77 msg = _record_to_msg(record) 78 state = self.__state 79 with state.cv: 80 if state.closed.is_set(): 81 return 82 state.queue.append(msg) 83 while len(state.queue) > state.queue_size: 84 state.queue.popleft() 85 state.dropped[0] += 1 86 with contextlib.suppress(Exception): 87 # workaround for errors/0001.txt 88 state.cv.notify_all()
"See logging.Handler.emit
Inherited Members
- logging.Handler
- get_name
- set_name
- name
- createLock
- acquire
- release
- setLevel
- format
- handle
- setFormatter
- flush
- handleError
- logging.Filterer
- addFilter
- removeFilter
- filter