hat.syslog.server.ui

Web server implementation

  1"""Web server implementation"""
  2
from pathlib import Path
import functools
import logging
import typing
import urllib.parse

from hat import aio
from hat import juggler

from hat.syslog.server import common
from hat.syslog.server import encoder
import hat.syslog.server.backend
 15
 16
mlog: logging.Logger = logging.getLogger(__name__)
"""Module logger"""

package_path: Path = Path(__file__).parent
"""Python package path"""

# Static UI assets served by the juggler server (see `create_web_server`)
ui_path: Path = package_path / 'ui'
"""UI directory path"""

# Hard cap applied by `_sanitize_filter` to every client-supplied filter
max_results_limit: int = 200
"""Max results limit"""

# Delay (seconds) passed to `juggler.listen` for batching local-data flushes
autoflush_delay: float = 0.2
"""Juggler autoflush delay"""
 31
 32
 33async def create_web_server(addr: str,
 34                            pem: typing.Optional[Path],
 35                            backend: hat.syslog.server.backend.Backend
 36                            ) -> 'WebServer':
 37    """Create web server"""
 38    addr = urllib.parse.urlparse(addr)
 39    host = addr.hostname
 40    port = addr.port
 41
 42    server = WebServer()
 43    server._backend = backend
 44    server._srv = await juggler.listen(host, port, server._on_connection,
 45                                       static_dir=ui_path,
 46                                       pem_file=pem,
 47                                       autoflush_delay=autoflush_delay)
 48    return server
 49
 50
 51class WebServer(aio.Resource):
 52
 53    @property
 54    def async_group(self) -> aio.Group:
 55        """Async group"""
 56        return self._srv.async_group
 57
 58    def _on_connection(self, conn):
 59        self.async_group.spawn(self._connection_loop, conn)
 60
 61    async def _connection_loop(self, conn):
 62        change_queue = aio.Queue()
 63        conn.async_group.spawn(_change_loop, self._backend, conn, change_queue)
 64
 65        try:
 66            with self._backend.register_change_cb(change_queue.put_nowait):
 67                with conn.register_change_cb(
 68                        functools.partial(change_queue.put_nowait, [])):
 69                    await conn.wait_closing()
 70
 71        finally:
 72            conn.close()
 73
 74
async def _change_loop(backend, conn, change_queue):
    """Serve one connection: keep its local data in sync with the backend.

    Repeatedly publishes ``{'filter', 'entries', 'first_id', 'last_id'}``
    to the connection, re-querying the backend whenever the client-side
    filter changes and prepending incrementally-filtered new entries
    otherwise. Runs until cancelled or the connection fails; always
    closes the connection on exit.
    """
    try:
        filter_json = _sanitize_filter(conn.remote_data)
        first_id = backend.first_id
        last_id = backend.last_id
        filter_changed = True
        new_entries_json = []

        while True:
            if filter_changed:
                # NOTE(review): `filter` shadows the builtin here
                filter = encoder.filter_from_json(filter_json)
                # Drop queued notifications - the fresh query below
                # supersedes anything received under the old filter.
                while not change_queue.empty():
                    change_queue.get_nowait()
                entries = await backend.query(filter)
                entries_json = [encoder.entry_to_json(entry)
                                for entry in entries]
            elif new_entries_json:
                # Prepend only entries newer than the newest one already
                # shown (entries_json[0] - presumably newest-first order
                # from the backend query; confirm against backend.query).
                previous_id = entries_json[0]['id'] if entries_json else 0
                entries_json = [*_filter_entries(filter_json, previous_id,
                                                 new_entries_json),
                                *entries_json]
                # Re-apply the result cap after prepending
                entries_json = entries_json[:filter.max_results]

            conn.set_local_data({'filter': filter_json,
                                 'entries': entries_json,
                                 'first_id': first_id,
                                 'last_id': last_id})

            # Blocks until the backend reports new entries or the client
            # changes its remote data (which enqueues an empty list).
            new_entries = await change_queue.get()
            new_entries_json = [encoder.entry_to_json(entry)
                                for entry in new_entries]

            first_id = backend.first_id
            last_id = backend.last_id
            new_filter_json = _sanitize_filter(conn.remote_data)
            filter_changed = new_filter_json != filter_json
            filter_json = new_filter_json

    finally:
        conn.close()
115
116
def _sanitize_filter(filter_json):
    """Return a usable filter JSON, never exceeding ``max_results_limit``.

    A falsy input (e.g. ``None`` before the client sent any data) is
    replaced by a default filter; an oversized or unset ``max_results``
    is clamped. The input object is never mutated.
    """
    if not filter_json:
        default_filter = common.Filter(max_results=max_results_limit)
        filter_json = encoder.filter_to_json(default_filter)

    max_results = filter_json['max_results']
    if max_results is None or max_results > max_results_limit:
        filter_json = {**filter_json, 'max_results': max_results_limit}

    return filter_json
125
126
def _filter_entries(filter_json, previous_id, entries_json):
    """Yield entries matching *filter_json* with id greater than *previous_id*.

    Mirrors the backend query semantics on already-serialized entries:
    id/timestamp bounds and exact facility/severity matches on the entry,
    substring matches (via `_match_str_filter`) on the message fields.
    """
    # Message fields checked with substring semantics
    str_fields = ('hostname', 'app_name', 'procid', 'msgid', 'msg')

    for entry_json in entries_json:
        entry_id = entry_json['id']
        if entry_id <= previous_id:
            continue

        last_id = filter_json['last_id']
        if last_id is not None and entry_id > last_id:
            continue

        timestamp = entry_json['timestamp']
        timestamp_from = filter_json['entry_timestamp_from']
        if timestamp_from is not None and timestamp < timestamp_from:
            continue
        timestamp_to = filter_json['entry_timestamp_to']
        if timestamp_to is not None and timestamp > timestamp_to:
            continue

        msg_json = entry_json['msg']
        facility = filter_json['facility']
        if facility is not None and msg_json['facility'] != facility:
            continue
        severity = filter_json['severity']
        if severity is not None and msg_json['severity'] != severity:
            continue

        if any(not _match_str_filter(filter_json[field], msg_json[field])
               for field in str_fields):
            continue

        yield entry_json
162
163
164def _match_str_filter(filter, value):
165    return not filter or filter in value
mlog: logging.Logger = <Logger hat.syslog.server.ui (WARNING)>

Module logger

package_path: pathlib.Path = PosixPath('/home/runner/work/hat-syslog/hat-syslog/src_py/hat/syslog/server')

Python package path

ui_path: pathlib.Path = PosixPath('/home/runner/work/hat-syslog/hat-syslog/src_py/hat/syslog/server/ui')

UI directory path

max_results_limit: int = 200

Max results limit

autoflush_delay: float = 0.2

Juggler autoflush delay

async def create_web_server( addr: str, pem: Optional[pathlib.Path], backend: hat.syslog.server.backend.Backend) -> hat.syslog.server.ui.WebServer:
34async def create_web_server(addr: str,
35                            pem: typing.Optional[Path],
36                            backend: hat.syslog.server.backend.Backend
37                            ) -> 'WebServer':
38    """Create web server"""
39    addr = urllib.parse.urlparse(addr)
40    host = addr.hostname
41    port = addr.port
42
43    server = WebServer()
44    server._backend = backend
45    server._srv = await juggler.listen(host, port, server._on_connection,
46                                       static_dir=ui_path,
47                                       pem_file=pem,
48                                       autoflush_delay=autoflush_delay)
49    return server

Create web server

class WebServer(hat.aio.Resource):
52class WebServer(aio.Resource):
53
54    @property
55    def async_group(self) -> aio.Group:
56        """Async group"""
57        return self._srv.async_group
58
59    def _on_connection(self, conn):
60        self.async_group.spawn(self._connection_loop, conn)
61
62    async def _connection_loop(self, conn):
63        change_queue = aio.Queue()
64        conn.async_group.spawn(_change_loop, self._backend, conn, change_queue)
65
66        try:
67            with self._backend.register_change_cb(change_queue.put_nowait):
68                with conn.register_change_cb(
69                        functools.partial(change_queue.put_nowait, [])):
70                    await conn.wait_closing()
71
72        finally:
73            conn.close()

Resource with lifetime control based on Group.

WebServer()
async_group: hat.aio.Group

Async group

Inherited Members
hat.aio.Resource
is_open
is_closing
is_closed
wait_closing
wait_closed
close
async_close