hat.syslog.server.syslog
Syslog server implementation
1"""Syslog server implementation""" 2 3from pathlib import Path 4import asyncio.sslproto 5import contextlib 6import logging 7import ssl 8import typing 9import urllib.parse 10 11from hat import aio 12 13from hat.syslog.server import common 14from hat.syslog.server import encoder 15 16 17mlog: logging.Logger = logging.getLogger(__name__) 18"""Module logger""" 19 20MsgCb = aio.AsyncCallable[[common.Msg], None] 21 22SyslogServer = typing.Union['TcpSyslogServer', 'UdpSyslogServer'] 23 24 25async def create_syslog_server(addr: str, 26 msg_cb: MsgCb, 27 pem_path: Path | None 28 ) -> SyslogServer: 29 """Create syslog server""" 30 addr = urllib.parse.urlparse(addr) 31 32 if addr.scheme == 'tls': 33 ssl_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) 34 ssl_ctx.load_cert_chain(pem_path) 35 return await _create_tcp_syslog_server(addr.hostname, addr.port, 36 msg_cb, ssl_ctx) 37 38 if addr.scheme == 'tcp': 39 return await _create_tcp_syslog_server(addr.hostname, addr.port, 40 msg_cb, None) 41 42 if addr.scheme == 'udp': 43 return await _create_udp_syslog_server(addr.hostname, addr.port, 44 msg_cb) 45 46 raise ValueError('unsupported address') 47 48 49async def _create_tcp_syslog_server(host, port, msg_cb, ssl_ctx): 50 server = TcpSyslogServer() 51 server._msg_cb = msg_cb 52 server._async_group = aio.Group() 53 54 server._srv = await asyncio.start_server(server._on_client, host, port, 55 ssl=ssl_ctx) 56 server.async_group.spawn(aio.call_on_cancel, server._on_close) 57 58 mlog.debug('listening for tcp syslog clients on %s:%s', host, port) 59 return server 60 61 62class TcpSyslogServer(aio.Resource): 63 """TCP syslog server""" 64 65 @property 66 def async_group(self) -> aio.Group: 67 """Async group""" 68 return self._async_group 69 70 async def _on_close(self): 71 with contextlib.suppress(Exception): 72 self._srv.close() 73 74 await self._srv.wait_closed() 75 76 def _on_client(self, reader, writer): 77 try: 78 self.async_group.spawn(self._client_loop, reader, writer) 79 80 except Exception: 81 writer.close() 82 83 async def _client_loop(self, reader, writer): 84 try: 85 while True: 86 start = await reader.readexactly(1) 87 88 if start == b'<': 89 buff_rest = await reader.readuntil(b'\n') 90 91 buff = start + buff_rest[:-1] 92 93 else: 94 size_rest = await reader.readuntil(b' ') 95 size = int(start + size_rest[:-1]) 96 97 buff = await reader.readexactly(size) 98 99 msg = encoder.msg_from_str(buff.decode()) 100 mlog.debug("received new syslog message") 101 102 await aio.call(self._msg_cb, msg) 103 104 except asyncio.IncompleteReadError: 105 pass 106 107 except Exception as e: 108 mlog.error('tcp client error: %s', e, exc_info=e) 109 110 finally: 111 with contextlib.suppress(Exception): 112 writer.close() 113 114 # BUGFIX 115 if isinstance(writer.transport, 116 asyncio.sslproto._SSLProtocolTransport): 117 # TODO for TLS connection Protocol.connection_lost is never 118 # called 119 await aio.uncancellable(asyncio.sleep(0.001)) 120 121 else: 122 await aio.uncancellable(writer.wait_closed()) 123 124 mlog.debug('tcp client connection closed') 125 126 127async def _create_udp_syslog_server(host, port, msg_cb): 128 server = UdpSyslogServer() 129 server._msg_cb = msg_cb 130 server._receive_queue = aio.Queue() 131 server._async_group = aio.Group() 132 133 class Protocol(asyncio.DatagramProtocol): 134 135 def connection_lost(self, exc): 136 server.close() 137 138 def datagram_received(self, data, addr): 139 if server._receive_queue.is_closed: 140 return 141 server._receive_queue.put_nowait(data) 142 143 loop = asyncio.get_running_loop() 144 server._transport, server._protocol = \ 145 await loop.create_datagram_endpoint(Protocol, (host, port), None) 146 server.async_group.spawn(aio.call_on_cancel, server._on_close) 147 server.async_group.spawn(server._receive_loop) 148 149 mlog.debug('listening for udp syslog messages on %s:%s', host, port) 150 return server 151 152 153class UdpSyslogServer(aio.Resource): 154 """UDP syslog server""" 155 156 @property 157 def async_group(self) -> aio.Group: 158 """Async group""" 159 return self._async_group 160 161 def _on_close(self): 162 with contextlib.suppress(Exception): 163 self._transport.close() 164 165 async def _receive_loop(self): 166 try: 167 while True: 168 try: 169 msg_bytes = await self._receive_queue.get() 170 msg = encoder.msg_from_str(msg_bytes.decode()) 171 mlog.debug("received new syslog message") 172 173 await aio.call(self._msg_cb, msg) 174 175 except Exception as e: 176 mlog.error('udp client error: %s', e, exc_info=e) 177 178 except Exception as e: 179 mlog.error('receive loop error: %s', e, exc_info=e) 180 181 finally: 182 self.close() 183 self._receive_queue.close()
Module logger
MsgCb =
typing.Callable[[hat.syslog.common.Msg], None | collections.abc.Awaitable[None]]
SyslogServer =
typing.Union[ForwardRef('TcpSyslogServer'), ForwardRef('UdpSyslogServer')]
async def
create_syslog_server( addr: str, msg_cb: Callable[[hat.syslog.common.Msg], None | Awaitable[None]], pem_path: pathlib._local.Path | None) -> Union[TcpSyslogServer, UdpSyslogServer]:
26async def create_syslog_server(addr: str, 27 msg_cb: MsgCb, 28 pem_path: Path | None 29 ) -> SyslogServer: 30 """Create syslog server""" 31 addr = urllib.parse.urlparse(addr) 32 33 if addr.scheme == 'tls': 34 ssl_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) 35 ssl_ctx.load_cert_chain(pem_path) 36 return await _create_tcp_syslog_server(addr.hostname, addr.port, 37 msg_cb, ssl_ctx) 38 39 if addr.scheme == 'tcp': 40 return await _create_tcp_syslog_server(addr.hostname, addr.port, 41 msg_cb, None) 42 43 if addr.scheme == 'udp': 44 return await _create_udp_syslog_server(addr.hostname, addr.port, 45 msg_cb) 46 47 raise ValueError('unsupported address')
Create syslog server
class
TcpSyslogServer(hat.aio.group.Resource):
63class TcpSyslogServer(aio.Resource): 64 """TCP syslog server""" 65 66 @property 67 def async_group(self) -> aio.Group: 68 """Async group""" 69 return self._async_group 70 71 async def _on_close(self): 72 with contextlib.suppress(Exception): 73 self._srv.close() 74 75 await self._srv.wait_closed() 76 77 def _on_client(self, reader, writer): 78 try: 79 self.async_group.spawn(self._client_loop, reader, writer) 80 81 except Exception: 82 writer.close() 83 84 async def _client_loop(self, reader, writer): 85 try: 86 while True: 87 start = await reader.readexactly(1) 88 89 if start == b'<': 90 buff_rest = await reader.readuntil(b'\n') 91 92 buff = start + buff_rest[:-1] 93 94 else: 95 size_rest = await reader.readuntil(b' ') 96 size = int(start + size_rest[:-1]) 97 98 buff = await reader.readexactly(size) 99 100 msg = encoder.msg_from_str(buff.decode()) 101 mlog.debug("received new syslog message") 102 103 await aio.call(self._msg_cb, msg) 104 105 except asyncio.IncompleteReadError: 106 pass 107 108 except Exception as e: 109 mlog.error('tcp client error: %s', e, exc_info=e) 110 111 finally: 112 with contextlib.suppress(Exception): 113 writer.close() 114 115 # BUGFIX 116 if isinstance(writer.transport, 117 asyncio.sslproto._SSLProtocolTransport): 118 # TODO for TLS connection Protocol.connection_lost is never 119 # called 120 await aio.uncancellable(asyncio.sleep(0.001)) 121 122 else: 123 await aio.uncancellable(writer.wait_closed()) 124 125 mlog.debug('tcp client connection closed')
TCP syslog server
class
UdpSyslogServer(hat.aio.group.Resource):
154class UdpSyslogServer(aio.Resource): 155 """UDP syslog server""" 156 157 @property 158 def async_group(self) -> aio.Group: 159 """Async group""" 160 return self._async_group 161 162 def _on_close(self): 163 with contextlib.suppress(Exception): 164 self._transport.close() 165 166 async def _receive_loop(self): 167 try: 168 while True: 169 try: 170 msg_bytes = await self._receive_queue.get() 171 msg = encoder.msg_from_str(msg_bytes.decode()) 172 mlog.debug("received new syslog message") 173 174 await aio.call(self._msg_cb, msg) 175 176 except Exception as e: 177 mlog.error('udp client error: %s', e, exc_info=e) 178 179 except Exception as e: 180 mlog.error('receive loop error: %s', e, exc_info=e) 181 182 finally: 183 self.close() 184 self._receive_queue.close()
UDP syslog server