hat.syslog.server.syslog

Syslog server implementation

  1"""Syslog server implementation"""
  2
  3from pathlib import Path
  4import asyncio.sslproto
  5import contextlib
  6import logging
  7import ssl
  8import typing
  9import urllib.parse
 10
 11from hat import aio
 12
 13from hat.syslog.server import common
 14from hat.syslog.server import encoder
 15
 16
 17mlog: logging.Logger = logging.getLogger(__name__)
 18"""Module logger"""
 19
 20MsgCb = aio.AsyncCallable[[common.Msg], None]
 21
 22SyslogServer = typing.Union['TcpSyslogServer', 'UdpSyslogServer']
 23
 24
 25async def create_syslog_server(addr: str,
 26                               msg_cb: MsgCb,
 27                               pem_path: Path | None
 28                               ) -> SyslogServer:
 29    """Create syslog server"""
 30    addr = urllib.parse.urlparse(addr)
 31
 32    if addr.scheme == 'tls':
 33        ssl_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
 34        ssl_ctx.load_cert_chain(pem_path)
 35        return await _create_tcp_syslog_server(addr.hostname, addr.port,
 36                                               msg_cb, ssl_ctx)
 37
 38    if addr.scheme == 'tcp':
 39        return await _create_tcp_syslog_server(addr.hostname, addr.port,
 40                                               msg_cb, None)
 41
 42    if addr.scheme == 'udp':
 43        return await _create_udp_syslog_server(addr.hostname, addr.port,
 44                                               msg_cb)
 45
 46    raise ValueError('unsupported address')
 47
 48
 49async def _create_tcp_syslog_server(host, port, msg_cb, ssl_ctx):
 50    server = TcpSyslogServer()
 51    server._msg_cb = msg_cb
 52    server._async_group = aio.Group()
 53
 54    server._srv = await asyncio.start_server(server._on_client, host, port,
 55                                             ssl=ssl_ctx)
 56    server.async_group.spawn(aio.call_on_cancel, server._on_close)
 57
 58    mlog.debug('listening for tcp syslog clients on %s:%s', host, port)
 59    return server
 60
 61
 62class TcpSyslogServer(aio.Resource):
 63    """TCP syslog server"""
 64
 65    @property
 66    def async_group(self) -> aio.Group:
 67        """Async group"""
 68        return self._async_group
 69
 70    async def _on_close(self):
 71        with contextlib.suppress(Exception):
 72            self._srv.close()
 73
 74        await self._srv.wait_closed()
 75
 76    def _on_client(self, reader, writer):
 77        try:
 78            self.async_group.spawn(self._client_loop, reader, writer)
 79
 80        except Exception:
 81            writer.close()
 82
 83    async def _client_loop(self, reader, writer):
 84        try:
 85            while True:
 86                start = await reader.readexactly(1)
 87
 88                if start == b'<':
 89                    buff_rest = await reader.readuntil(b'\n')
 90
 91                    buff = start + buff_rest[:-1]
 92
 93                else:
 94                    size_rest = await reader.readuntil(b' ')
 95                    size = int(start + size_rest[:-1])
 96
 97                    buff = await reader.readexactly(size)
 98
 99                msg = encoder.msg_from_str(buff.decode())
100                mlog.debug("received new syslog message")
101
102                await aio.call(self._msg_cb, msg)
103
104        except asyncio.IncompleteReadError:
105            pass
106
107        except Exception as e:
108            mlog.error('tcp client error: %s', e, exc_info=e)
109
110        finally:
111            with contextlib.suppress(Exception):
112                writer.close()
113
114            # BUGFIX
115            if isinstance(writer.transport,
116                          asyncio.sslproto._SSLProtocolTransport):
117                # TODO for TLS connection Protocol.connection_lost is never
118                #      called
119                await aio.uncancellable(asyncio.sleep(0.001))
120
121            else:
122                await aio.uncancellable(writer.wait_closed())
123
124            mlog.debug('tcp client connection closed')
125
126
127async def _create_udp_syslog_server(host, port, msg_cb):
128    server = UdpSyslogServer()
129    server._msg_cb = msg_cb
130    server._receive_queue = aio.Queue()
131    server._async_group = aio.Group()
132
133    class Protocol(asyncio.DatagramProtocol):
134
135        def connection_lost(self, exc):
136            server.close()
137
138        def datagram_received(self, data, addr):
139            if server._receive_queue.is_closed:
140                return
141            server._receive_queue.put_nowait(data)
142
143    loop = asyncio.get_running_loop()
144    server._transport, server._protocol = \
145        await loop.create_datagram_endpoint(Protocol, (host, port), None)
146    server.async_group.spawn(aio.call_on_cancel, server._on_close)
147    server.async_group.spawn(server._receive_loop)
148
149    mlog.debug('listening for udp syslog messages on %s:%s', host, port)
150    return server
151
152
153class UdpSyslogServer(aio.Resource):
154    """UDP syslog server"""
155
156    @property
157    def async_group(self) -> aio.Group:
158        """Async group"""
159        return self._async_group
160
161    def _on_close(self):
162        with contextlib.suppress(Exception):
163            self._transport.close()
164
165    async def _receive_loop(self):
166        try:
167            while True:
168                try:
169                    msg_bytes = await self._receive_queue.get()
170                    msg = encoder.msg_from_str(msg_bytes.decode())
171                    mlog.debug("received new syslog message")
172
173                    await aio.call(self._msg_cb, msg)
174
175                except Exception as e:
176                    mlog.error('udp client error: %s', e, exc_info=e)
177
178        except Exception as e:
179            mlog.error('receive loop error: %s', e, exc_info=e)
180
181        finally:
182            self.close()
183            self._receive_queue.close()
mlog: logging.Logger = <Logger hat.syslog.server.syslog (WARNING)>

Module logger

MsgCb = typing.Callable[[hat.syslog.common.Msg], None | collections.abc.Awaitable[None]]
SyslogServer = typing.Union[ForwardRef('TcpSyslogServer'), ForwardRef('UdpSyslogServer')]
async def create_syslog_server( addr: str, msg_cb: Callable[[hat.syslog.common.Msg], None | Awaitable[None]], pem_path: pathlib._local.Path | None) -> Union[TcpSyslogServer, UdpSyslogServer]:
26async def create_syslog_server(addr: str,
27                               msg_cb: MsgCb,
28                               pem_path: Path | None
29                               ) -> SyslogServer:
30    """Create syslog server"""
31    addr = urllib.parse.urlparse(addr)
32
33    if addr.scheme == 'tls':
34        ssl_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
35        ssl_ctx.load_cert_chain(pem_path)
36        return await _create_tcp_syslog_server(addr.hostname, addr.port,
37                                               msg_cb, ssl_ctx)
38
39    if addr.scheme == 'tcp':
40        return await _create_tcp_syslog_server(addr.hostname, addr.port,
41                                               msg_cb, None)
42
43    if addr.scheme == 'udp':
44        return await _create_udp_syslog_server(addr.hostname, addr.port,
45                                               msg_cb)
46
47    raise ValueError('unsupported address')

Create syslog server

class TcpSyslogServer(hat.aio.group.Resource):
 63class TcpSyslogServer(aio.Resource):
 64    """TCP syslog server"""
 65
 66    @property
 67    def async_group(self) -> aio.Group:
 68        """Async group"""
 69        return self._async_group
 70
 71    async def _on_close(self):
 72        with contextlib.suppress(Exception):
 73            self._srv.close()
 74
 75        await self._srv.wait_closed()
 76
 77    def _on_client(self, reader, writer):
 78        try:
 79            self.async_group.spawn(self._client_loop, reader, writer)
 80
 81        except Exception:
 82            writer.close()
 83
 84    async def _client_loop(self, reader, writer):
 85        try:
 86            while True:
 87                start = await reader.readexactly(1)
 88
 89                if start == b'<':
 90                    buff_rest = await reader.readuntil(b'\n')
 91
 92                    buff = start + buff_rest[:-1]
 93
 94                else:
 95                    size_rest = await reader.readuntil(b' ')
 96                    size = int(start + size_rest[:-1])
 97
 98                    buff = await reader.readexactly(size)
 99
100                msg = encoder.msg_from_str(buff.decode())
101                mlog.debug("received new syslog message")
102
103                await aio.call(self._msg_cb, msg)
104
105        except asyncio.IncompleteReadError:
106            pass
107
108        except Exception as e:
109            mlog.error('tcp client error: %s', e, exc_info=e)
110
111        finally:
112            with contextlib.suppress(Exception):
113                writer.close()
114
115            # BUGFIX
116            if isinstance(writer.transport,
117                          asyncio.sslproto._SSLProtocolTransport):
118                # TODO for TLS connection Protocol.connection_lost is never
119                #      called
120                await aio.uncancellable(asyncio.sleep(0.001))
121
122            else:
123                await aio.uncancellable(writer.wait_closed())
124
125            mlog.debug('tcp client connection closed')

TCP syslog server

async_group: hat.aio.group.Group
66    @property
67    def async_group(self) -> aio.Group:
68        """Async group"""
69        return self._async_group

Async group

class UdpSyslogServer(hat.aio.group.Resource):
154class UdpSyslogServer(aio.Resource):
155    """UDP syslog server"""
156
157    @property
158    def async_group(self) -> aio.Group:
159        """Async group"""
160        return self._async_group
161
162    def _on_close(self):
163        with contextlib.suppress(Exception):
164            self._transport.close()
165
166    async def _receive_loop(self):
167        try:
168            while True:
169                try:
170                    msg_bytes = await self._receive_queue.get()
171                    msg = encoder.msg_from_str(msg_bytes.decode())
172                    mlog.debug("received new syslog message")
173
174                    await aio.call(self._msg_cb, msg)
175
176                except Exception as e:
177                    mlog.error('udp client error: %s', e, exc_info=e)
178
179        except Exception as e:
180            mlog.error('receive loop error: %s', e, exc_info=e)
181
182        finally:
183            self.close()
184            self._receive_queue.close()

UDP syslog server

async_group: hat.aio.group.Group
157    @property
158    def async_group(self) -> aio.Group:
159        """Async group"""
160        return self._async_group

Async group