hat.syslog.server.syslog

Syslog server implementation

 1"""Syslog server implementation"""
 2
 3from pathlib import Path
 4import asyncio.sslproto
 5import contextlib
 6import datetime
 7import functools
 8import logging
 9import ssl
10import typing
11import urllib.parse
12
13from hat import aio
14from hat.syslog.server import encoder
15import hat.syslog.server.backend
16
17
18mlog: logging.Logger = logging.getLogger(__name__)
19"""Module logger"""
20
21
22async def create_syslog_server(addr: str,
23                               pem: typing.Optional[Path],
24                               backend: hat.syslog.server.backend.Backend
25                               ) -> 'SysLogServer':
26    """Create syslog server"""
27    addr = urllib.parse.urlparse(addr)
28    if addr.scheme == 'ssl':
29        ssl_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
30        ssl_ctx.load_cert_chain(pem)
31    else:
32        ssl_ctx = None
33
34    async_group = aio.Group()
35    srv = await asyncio.start_server(
36        functools.partial(async_group.spawn, _client_loop, backend),
37        addr.hostname, addr.port, ssl=ssl_ctx)
38    async_group.spawn(aio.call_on_cancel, _asyncio_async_close, srv)
39
40    mlog.debug('listening for syslog clients on %s:%s',
41               addr.hostname, addr.port)
42
43    srv = SysLogServer()
44    srv._async_group = async_group
45    return srv
46
47
48class SysLogServer(aio.Resource):
49    """Syslog server
50
51    For creating new instance see :func:`create_syslog_server`.
52
53    """
54
55    @property
56    def async_group(self) -> aio.Group:
57        """Async group"""
58        return self._async_group
59
60
61async def _client_loop(backend, reader, writer):
62    try:
63        while True:
64            size = await reader.readuntil(b' ')
65            buff = await reader.readexactly(int(size[:-1]))
66            t = datetime.datetime.now(tz=datetime.timezone.utc).timestamp()
67            msg = encoder.msg_from_str(buff.decode())
68            mlog.debug("received new syslog message")
69            await backend.register(t, msg)
70    except asyncio.IncompleteReadError:
71        pass
72    except Exception as e:
73        mlog.warning('syslog client error: %s', e, exc_info=e)
74    finally:
75        # BUGFIX
76        if isinstance(writer.transport,
77                      asyncio.sslproto._SSLProtocolTransport):
78            # TODO for SSL connection Protocol.connection_lost is never called
79            writer.close()
80            await aio.uncancellable(asyncio.sleep(0.001))
81        else:
82            await aio.uncancellable(_asyncio_async_close(writer))
83        mlog.debug('syslog client connection closed')
84
85
86async def _asyncio_async_close(x):
87    with contextlib.suppress(Exception):
88        x.close()
89    await x.wait_closed()
mlog: logging.Logger = <Logger hat.syslog.server.syslog (WARNING)>

Module logger

async def create_syslog_server( addr: str, pem: Optional[pathlib.Path], backend: hat.syslog.server.backend.Backend) -> hat.syslog.server.syslog.SysLogServer:
23async def create_syslog_server(addr: str,
24                               pem: typing.Optional[Path],
25                               backend: hat.syslog.server.backend.Backend
26                               ) -> 'SysLogServer':
27    """Create syslog server"""
28    addr = urllib.parse.urlparse(addr)
29    if addr.scheme == 'ssl':
30        ssl_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
31        ssl_ctx.load_cert_chain(pem)
32    else:
33        ssl_ctx = None
34
35    async_group = aio.Group()
36    srv = await asyncio.start_server(
37        functools.partial(async_group.spawn, _client_loop, backend),
38        addr.hostname, addr.port, ssl=ssl_ctx)
39    async_group.spawn(aio.call_on_cancel, _asyncio_async_close, srv)
40
41    mlog.debug('listening for syslog clients on %s:%s',
42               addr.hostname, addr.port)
43
44    srv = SysLogServer()
45    srv._async_group = async_group
46    return srv

Create syslog server

class SysLogServer(hat.aio.Resource):
49class SysLogServer(aio.Resource):
50    """Syslog server
51
52    For creating new instance see :func:`create_syslog_server`.
53
54    """
55
56    @property
57    def async_group(self) -> aio.Group:
58        """Async group"""
59        return self._async_group

Syslog server

For creating new instance see create_syslog_server.

SysLogServer()
async_group: hat.aio.Group

Async group

Inherited Members
hat.aio.Resource
is_open
is_closing
is_closed
wait_closing
wait_closed
close
async_close