hat.syslog.server.syslog

Syslog server implementation

  1"""Syslog server implementation"""
  2
  3from pathlib import Path
  4import asyncio.sslproto
  5import contextlib
  6import logging
  7import ssl
  8import typing
  9import urllib.parse
 10
 11from hat import aio
 12
 13from hat.syslog.server import common
 14from hat.syslog.server import encoder
 15
 16
 17mlog: logging.Logger = logging.getLogger(__name__)
 18"""Module logger"""
 19
 20MsgCb = aio.AsyncCallable[[common.Msg], None]
 21
 22SyslogServer = typing.Union['TcpSyslogServer', 'UdpSyslogServer']
 23
 24
 25async def create_syslog_server(addr: str,
 26                               msg_cb: MsgCb,
 27                               pem_path: Path | None
 28                               ) -> SyslogServer:
 29    """Create syslog server"""
 30    addr = urllib.parse.urlparse(addr)
 31
 32    if addr.scheme == 'tls':
 33        ssl_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
 34        ssl_ctx.load_cert_chain(pem_path)
 35        return await _create_tcp_syslog_server(addr.hostname, addr.port,
 36                                               msg_cb, ssl_ctx)
 37
 38    if addr.scheme == 'tcp':
 39        return await _create_tcp_syslog_server(addr.hostname, addr.port,
 40                                               msg_cb, None)
 41
 42    if addr.scheme == 'udp':
 43        return await _create_udp_syslog_server(addr.hostname, addr.port,
 44                                               msg_cb)
 45
 46    raise ValueError('unsupported address')
 47
 48
 49async def _create_tcp_syslog_server(host, port, msg_cb, ssl_ctx):
 50    server = TcpSyslogServer()
 51    server._msg_cb = msg_cb
 52    server._async_group = aio.Group()
 53
 54    server._srv = await asyncio.start_server(server._on_client, host, port,
 55                                             ssl=ssl_ctx)
 56    server.async_group.spawn(aio.call_on_cancel, server._on_close)
 57
 58    mlog.debug('listening for tcp syslog clients on %s:%s', host, port)
 59    return server
 60
 61
 62class TcpSyslogServer(aio.Resource):
 63    """TCP syslog server"""
 64
 65    @property
 66    def async_group(self) -> aio.Group:
 67        """Async group"""
 68        return self._async_group
 69
 70    async def _on_close(self):
 71        with contextlib.suppress(Exception):
 72            self._srv.close()
 73
 74        await self._srv.wait_closed()
 75
 76    def _on_client(self, reader, writer):
 77        try:
 78            self.async_group.spawn(self._client_loop, reader, writer)
 79
 80        except Exception:
 81            writer.close()
 82
 83    async def _client_loop(self, reader, writer):
 84        try:
 85            while True:
 86                size = await reader.readuntil(b' ')
 87                buff = await reader.readexactly(int(size[:-1]))
 88                msg = encoder.msg_from_str(buff.decode())
 89                mlog.debug("received new syslog message")
 90
 91                await aio.call(self._msg_cb, msg)
 92
 93        except asyncio.IncompleteReadError:
 94            pass
 95
 96        except Exception as e:
 97            mlog.error('tcp client error: %s', e, exc_info=e)
 98
 99        finally:
100            with contextlib.suppress(Exception):
101                writer.close()
102
103            # BUGFIX
104            if isinstance(writer.transport,
105                          asyncio.sslproto._SSLProtocolTransport):
106                # TODO for TLS connection Protocol.connection_lost is never
107                #      called
108                await aio.uncancellable(asyncio.sleep(0.001))
109
110            else:
111                await aio.uncancellable(writer.wait_closed())
112
113            mlog.debug('tcp client connection closed')
114
115
async def _create_udp_syslog_server(host, port, msg_cb):
    """Create a UDP syslog server bound to `host`:`port`

    Received datagrams are put on the server's receive queue, from which
    the server's receive loop takes them.

    """
    udp_server = UdpSyslogServer()
    udp_server._msg_cb = msg_cb
    udp_server._receive_queue = aio.Queue()
    udp_server._async_group = aio.Group()

    class Protocol(asyncio.DatagramProtocol):

        def connection_lost(self, exc):
            # losing the endpoint closes the whole server
            udp_server.close()

        def datagram_received(self, data, addr):
            queue = udp_server._receive_queue
            if not queue.is_closed:
                queue.put_nowait(data)

    transport, protocol = \
        await asyncio.get_running_loop().create_datagram_endpoint(
            Protocol, local_addr=(host, port), remote_addr=None)
    udp_server._transport = transport
    udp_server._protocol = protocol

    # run the close handler once the server's async group is cancelled
    udp_server.async_group.spawn(aio.call_on_cancel, udp_server._on_close)
    udp_server.async_group.spawn(udp_server._receive_loop)

    mlog.debug('listening for udp syslog messages on %s:%s', host, port)
    return udp_server
140
141
class UdpSyslogServer(aio.Resource):
    """UDP syslog server

    Each datagram taken from the receive queue is decoded as a single
    syslog message and passed to the message callback.

    """

    @property
    def async_group(self) -> aio.Group:
        """Async group"""
        return self._async_group

    def _on_close(self):
        """Close the datagram transport, ignoring any error"""
        try:
            self._transport.close()

        except Exception:
            # closing is best effort
            pass

    async def _receive_loop(self):
        """Decode queued datagrams and forward them to the callback"""
        try:
            while True:
                try:
                    datagram = await self._receive_queue.get()
                    text = datagram.decode()
                    msg = encoder.msg_from_str(text)
                    mlog.debug("received new syslog message")

                    await aio.call(self._msg_cb, msg)

                except Exception as e:
                    # a failure while handling one datagram is logged and
                    # the loop continues
                    mlog.error('udp client error: %s', e, exc_info=e)

        except Exception as e:
            mlog.error('receive loop error: %s', e, exc_info=e)

        finally:
            self.close()
            self._receive_queue.close()
mlog: logging.Logger = <Logger hat.syslog.server.syslog (WARNING)>

Module logger

MsgCb = typing.Callable[[hat.syslog.common.Msg], typing.Optional[typing.Awaitable[NoneType]]]
SyslogServer = typing.Union[ForwardRef('TcpSyslogServer'), ForwardRef('UdpSyslogServer')]
async def create_syslog_server( addr: str, msg_cb: Callable[[hat.syslog.common.Msg], Optional[Awaitable[NoneType]]], pem_path: pathlib.Path | None) -> Union[TcpSyslogServer, UdpSyslogServer]:
async def create_syslog_server(addr: str,
                               msg_cb: MsgCb,
                               pem_path: Path | None
                               ) -> SyslogServer:
    """Create syslog server

    `addr` is parsed as a URL whose scheme selects the transport:

    * ``tls`` - TCP server wrapped in TLS; the certificate chain is loaded
      from `pem_path`
    * ``tcp`` - plain TCP server
    * ``udp`` - UDP server

    The URL's hostname and port are used as the listening address and
    `msg_cb` is handed to the created server.

    Raises:
        ValueError: scheme is not ``tls``, ``tcp`` or ``udp``

    """
    url = urllib.parse.urlparse(addr)
    scheme = url.scheme

    if scheme not in ('tls', 'tcp', 'udp'):
        raise ValueError('unsupported address')

    if scheme == 'udp':
        return await _create_udp_syslog_server(url.hostname, url.port,
                                               msg_cb)

    # both stream based schemes share the TCP server; only ``tls`` gets an
    # SSL context
    ssl_ctx = None
    if scheme == 'tls':
        ssl_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        ssl_ctx.load_cert_chain(pem_path)

    return await _create_tcp_syslog_server(url.hostname, url.port,
                                           msg_cb, ssl_ctx)

Create syslog server

class TcpSyslogServer(hat.aio.group.Resource):
 63class TcpSyslogServer(aio.Resource):
 64    """TCP syslog server"""
 65
 66    @property
 67    def async_group(self) -> aio.Group:
 68        """Async group"""
 69        return self._async_group
 70
 71    async def _on_close(self):
 72        with contextlib.suppress(Exception):
 73            self._srv.close()
 74
 75        await self._srv.wait_closed()
 76
 77    def _on_client(self, reader, writer):
 78        try:
 79            self.async_group.spawn(self._client_loop, reader, writer)
 80
 81        except Exception:
 82            writer.close()
 83
 84    async def _client_loop(self, reader, writer):
 85        try:
 86            while True:
 87                size = await reader.readuntil(b' ')
 88                buff = await reader.readexactly(int(size[:-1]))
 89                msg = encoder.msg_from_str(buff.decode())
 90                mlog.debug("received new syslog message")
 91
 92                await aio.call(self._msg_cb, msg)
 93
 94        except asyncio.IncompleteReadError:
 95            pass
 96
 97        except Exception as e:
 98            mlog.error('tcp client error: %s', e, exc_info=e)
 99
100        finally:
101            with contextlib.suppress(Exception):
102                writer.close()
103
104            # BUGFIX
105            if isinstance(writer.transport,
106                          asyncio.sslproto._SSLProtocolTransport):
107                # TODO for TLS connection Protocol.connection_lost is never
108                #      called
109                await aio.uncancellable(asyncio.sleep(0.001))
110
111            else:
112                await aio.uncancellable(writer.wait_closed())
113
114            mlog.debug('tcp client connection closed')

TCP syslog server

async_group: hat.aio.group.Group
66    @property
67    def async_group(self) -> aio.Group:
68        """Async group"""
69        return self._async_group

Async group

Inherited Members
hat.aio.group.Resource
is_open
is_closing
is_closed
wait_closing
wait_closed
close
async_close
class UdpSyslogServer(hat.aio.group.Resource):
class UdpSyslogServer(aio.Resource):
    """UDP syslog server

    Each datagram taken from the receive queue is decoded as a single
    syslog message and passed to the message callback.

    """

    @property
    def async_group(self) -> aio.Group:
        """Async group"""
        return self._async_group

    def _on_close(self):
        """Close the datagram transport, ignoring any error"""
        try:
            self._transport.close()

        except Exception:
            # closing is best effort
            pass

    async def _receive_loop(self):
        """Decode queued datagrams and forward them to the callback"""
        try:
            while True:
                try:
                    datagram = await self._receive_queue.get()
                    text = datagram.decode()
                    msg = encoder.msg_from_str(text)
                    mlog.debug("received new syslog message")

                    await aio.call(self._msg_cb, msg)

                except Exception as e:
                    # a failure while handling one datagram is logged and
                    # the loop continues
                    mlog.error('udp client error: %s', e, exc_info=e)

        except Exception as e:
            mlog.error('receive loop error: %s', e, exc_info=e)

        finally:
            self.close()
            self._receive_queue.close()

UDP syslog server

async_group: hat.aio.group.Group
146    @property
147    def async_group(self) -> aio.Group:
148        """Async group"""
149        return self._async_group

Async group

Inherited Members
hat.aio.group.Resource
is_open
is_closing
is_closed
wait_closing
wait_closed
close
async_close