hat.syslog.server.syslog
Syslog server implementation
"""Syslog server implementation"""

from pathlib import Path
import asyncio.sslproto
import contextlib
import logging
import ssl
import typing
import urllib.parse

from hat import aio

from hat.syslog.server import common
from hat.syslog.server import encoder


mlog: logging.Logger = logging.getLogger(__name__)
"""Module logger"""

# callback receiving a single decoded syslog message
MsgCb = aio.AsyncCallable[[common.Msg], None]

# any of the server implementations returned by `create_syslog_server`
SyslogServer = typing.Union['TcpSyslogServer', 'UdpSyslogServer']


async def create_syslog_server(addr: str,
                               msg_cb: MsgCb,
                               pem_path: Path | None
                               ) -> SyslogServer:
    """Create syslog server

    Args:
        addr: listening address formatted as URL whose scheme selects the
            transport (``tcp``, ``tls`` or ``udp``) and whose host and port
            are used as the local listening endpoint
        msg_cb: callback passed to the created server and called with each
            received message
        pem_path: certificate chain file loaded with
            `ssl.SSLContext.load_cert_chain`; used only by ``tls`` scheme

    Raises:
        ValueError: unsupported address scheme or ``tls`` scheme without
            `pem_path`

    """
    url = urllib.parse.urlparse(addr)

    if url.scheme == 'tls':
        # fail early with a clear message instead of the TypeError that
        # load_cert_chain raises when called with None
        if pem_path is None:
            raise ValueError('pem path required for tls address')

        ssl_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        ssl_ctx.load_cert_chain(pem_path)
        return await _create_tcp_syslog_server(url.hostname, url.port,
                                               msg_cb, ssl_ctx)

    if url.scheme == 'tcp':
        return await _create_tcp_syslog_server(url.hostname, url.port,
                                               msg_cb, None)

    if url.scheme == 'udp':
        return await _create_udp_syslog_server(url.hostname, url.port,
                                               msg_cb)

    raise ValueError('unsupported address')


async def _create_tcp_syslog_server(host, port, msg_cb, ssl_ctx):
    """Create TCP syslog server listening on `host`:`port`

    When `ssl_ctx` is not ``None``, it is passed to `asyncio.start_server`
    as its ``ssl`` argument.

    """
    server = TcpSyslogServer()
    server._msg_cb = msg_cb
    server._async_group = aio.Group()

    server._srv = await asyncio.start_server(server._on_client, host, port,
                                             ssl=ssl_ctx)
    # NOTE(review): presumably runs `_on_close` once the group is closed -
    # confirm against hat.aio.call_on_cancel
    server.async_group.spawn(aio.call_on_cancel, server._on_close)

    mlog.debug('listening for tcp syslog clients on %s:%s', host, port)
    return server


class TcpSyslogServer(aio.Resource):
    """TCP syslog server"""

    @property
    def async_group(self) -> aio.Group:
        """Async group"""
        return self._async_group

    async def _on_close(self):
        # closing of listening server is best effort
        with contextlib.suppress(Exception):
            self._srv.close()

        await self._srv.wait_closed()

    def _on_client(self, reader, writer):
        # each accepted connection is served by its own task; if the task
        # can not be spawned, connection is dropped
        try:
            self.async_group.spawn(self._client_loop, reader, writer)

        except Exception:
            writer.close()

    async def _client_loop(self, reader, writer):
        try:
            while True:
                # message framing: decimal length, single space, payload
                size = await reader.readuntil(b' ')
                buff = await reader.readexactly(int(size[:-1]))
                msg = encoder.msg_from_str(buff.decode())
                mlog.debug("received new syslog message")

                await aio.call(self._msg_cb, msg)

        except asyncio.IncompleteReadError:
            # stream ended before complete message was read
            pass

        except Exception as e:
            mlog.error('tcp client error: %s', e, exc_info=e)

        finally:
            with contextlib.suppress(Exception):
                writer.close()

            # BUGFIX
            if isinstance(writer.transport,
                          asyncio.sslproto._SSLProtocolTransport):
                # TODO for TLS connection Protocol.connection_lost is never
                #      called
                await aio.uncancellable(asyncio.sleep(0.001))

            else:
                await aio.uncancellable(writer.wait_closed())

            mlog.debug('tcp client connection closed')


async def _create_udp_syslog_server(host, port, msg_cb):
    """Create UDP syslog server bound to `host`:`port`"""
    server = UdpSyslogServer()
    server._msg_cb = msg_cb
    server._receive_queue = aio.Queue()
    server._async_group = aio.Group()

    class Protocol(asyncio.DatagramProtocol):

        def connection_lost(self, exc):
            server.close()

        def datagram_received(self, data, addr):
            # datagrams arriving after receive queue is closed are dropped
            if server._receive_queue.is_closed:
                return
            server._receive_queue.put_nowait(data)

    loop = asyncio.get_running_loop()
    server._transport, server._protocol = \
        await loop.create_datagram_endpoint(Protocol, (host, port), None)
    server.async_group.spawn(aio.call_on_cancel, server._on_close)
    server.async_group.spawn(server._receive_loop)

    mlog.debug('listening for udp syslog messages on %s:%s', host, port)
    return server


class UdpSyslogServer(aio.Resource):
    """UDP syslog server"""

    @property
    def async_group(self) -> aio.Group:
        """Async group"""
        return self._async_group

    def _on_close(self):
        # closing of datagram transport is best effort
        with contextlib.suppress(Exception):
            self._transport.close()

    async def _receive_loop(self):
        try:
            while True:
                # failure while handling one datagram is logged and does not
                # stop processing of following datagrams
                try:
                    msg_bytes = await self._receive_queue.get()
                    msg = encoder.msg_from_str(msg_bytes.decode())
                    mlog.debug("received new syslog message")

                    await aio.call(self._msg_cb, msg)

                except Exception as e:
                    mlog.error('udp client error: %s', e, exc_info=e)

        except Exception as e:
            mlog.error('receive loop error: %s', e, exc_info=e)

        finally:
            self.close()
            self._receive_queue.close()
Module logger
MsgCb =
typing.Callable[[hat.syslog.common.Msg], None | collections.abc.Awaitable[None]]
SyslogServer =
typing.Union[ForwardRef('TcpSyslogServer'), ForwardRef('UdpSyslogServer')]
async def
create_syslog_server( addr: str, msg_cb: Callable[[hat.syslog.common.Msg], None | Awaitable[None]], pem_path: pathlib.Path | None) -> Union[TcpSyslogServer, UdpSyslogServer]:
async def create_syslog_server(addr: str,
                               msg_cb: MsgCb,
                               pem_path: Path | None
                               ) -> SyslogServer:
    """Create syslog server

    Args:
        addr: listening address formatted as URL whose scheme selects the
            transport (``tcp``, ``tls`` or ``udp``) and whose host and port
            are used as the local listening endpoint
        msg_cb: callback passed to the created server
        pem_path: certificate chain file loaded with
            `ssl.SSLContext.load_cert_chain`; used only by ``tls`` scheme

    Raises:
        ValueError: unsupported address scheme or ``tls`` scheme without
            `pem_path`

    """
    url = urllib.parse.urlparse(addr)

    if url.scheme == 'tls':
        # fail early with a clear message instead of the TypeError that
        # load_cert_chain raises when called with None
        if pem_path is None:
            raise ValueError('pem path required for tls address')

        ssl_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        ssl_ctx.load_cert_chain(pem_path)
        return await _create_tcp_syslog_server(url.hostname, url.port,
                                               msg_cb, ssl_ctx)

    if url.scheme == 'tcp':
        return await _create_tcp_syslog_server(url.hostname, url.port,
                                               msg_cb, None)

    if url.scheme == 'udp':
        return await _create_udp_syslog_server(url.hostname, url.port,
                                               msg_cb)

    raise ValueError('unsupported address')
Create syslog server
class
TcpSyslogServer(hat.aio.group.Resource):
class TcpSyslogServer(aio.Resource):
    """Syslog server receiving messages over TCP connections

    Every accepted connection is served by a dedicated task spawned in
    the server's async group.

    """

    @property
    def async_group(self) -> aio.Group:
        """Async group in which client tasks are spawned"""
        return self._async_group

    async def _on_close(self):
        srv = self._srv

        # errors raised while closing the listening server are ignored
        with contextlib.suppress(Exception):
            srv.close()

        await srv.wait_closed()

    def _on_client(self, reader, writer):
        # connection is dropped if its task can not be spawned
        try:
            self.async_group.spawn(self._client_loop, reader, writer)

        except Exception:
            writer.close()

    async def _client_loop(self, reader, writer):
        try:
            # end of stream in the middle of a message ends the loop quietly
            with contextlib.suppress(asyncio.IncompleteReadError):
                while True:
                    # frame layout: decimal length, single space, payload
                    header = await reader.readuntil(b' ')
                    length = int(header[:-1])
                    payload = await reader.readexactly(length)
                    text = payload.decode()
                    msg = encoder.msg_from_str(text)
                    mlog.debug("received new syslog message")

                    await aio.call(self._msg_cb, msg)

        except Exception as e:
            mlog.error('tcp client error: %s', e, exc_info=e)

        finally:
            with contextlib.suppress(Exception):
                writer.close()

            is_tls = isinstance(writer.transport,
                                asyncio.sslproto._SSLProtocolTransport)

            if not is_tls:
                await aio.uncancellable(writer.wait_closed())

            else:
                # BUGFIX
                # TODO for TLS connection Protocol.connection_lost is never
                #      called
                await aio.uncancellable(asyncio.sleep(0.001))

            mlog.debug('tcp client connection closed')
TCP syslog server
class
UdpSyslogServer(hat.aio.group.Resource):
class UdpSyslogServer(aio.Resource):
    """Syslog server receiving messages as UDP datagrams

    Received datagrams are taken from the receive queue, decoded and
    passed to the message callback one at a time.

    """

    @property
    def async_group(self) -> aio.Group:
        """Async group in which the receive loop runs"""
        return self._async_group

    def _on_close(self):
        transport = self._transport

        # errors raised while closing the transport are ignored
        with contextlib.suppress(Exception):
            transport.close()

    async def _receive_loop(self):
        try:
            while True:
                await self._process_next()

        except Exception as e:
            mlog.error('receive loop error: %s', e, exc_info=e)

        finally:
            self.close()
            self._receive_queue.close()

    async def _process_next(self):
        # handle single queued datagram; failure is logged so that the
        # receive loop continues with following datagrams
        try:
            data = await self._receive_queue.get()
            text = data.decode()
            msg = encoder.msg_from_str(text)
            mlog.debug("received new syslog message")

            await aio.call(self._msg_cb, msg)

        except Exception as e:
            mlog.error('udp client error: %s', e, exc_info=e)
UDP syslog server