hat.syslog.server.ui

Web server implementation

  1"""Web server implementation"""
  2
  3import asyncio
  4import contextlib
  5import importlib
  6import itertools
  7import logging
  8import urllib
  9
 10import aiohttp.web
 11
 12from hat import aio
 13from hat import json
 14from hat import juggler
 15
 16from hat.syslog.server import common
 17from hat.syslog.server import encoder
 18import hat.syslog.server.backend
 19
 20
# Logger named after this module; used by all functions below
mlog: logging.Logger = logging.getLogger(__name__)
"""Module logger"""

# Upper bound that `_sanitize_filter` applies to a requested `max_results`
max_results_limit: int = 200
"""Max results limit"""

# Passed to `juggler.listen` as `autoflush_delay`
# (presumably seconds - TODO confirm against juggler documentation)
autoflush_delay: float = 0.2
"""Juggler autoflush delay"""

# Filter assigned to each new connection until a 'filter' request replaces it
default_filter = common.Filter(max_results=max_results_limit)
"""Default filter"""
 32
 33
 34async def create_web_server(addr: str,
 35                            backend: hat.syslog.server.backend.Backend
 36                            ) -> 'WebServer':
 37    """Create web server"""
 38    srv = WebServer()
 39    srv._backend = backend
 40    srv._locks = {}
 41    srv._filters = {}
 42
 43    exit_stack = contextlib.ExitStack()
 44    try:
 45        ui_path = exit_stack.enter_context(
 46            importlib.resources.as_file(
 47                importlib.resources.files(__package__) / 'ui'))
 48
 49        additional_routes = [aiohttp.web.get('/backup', srv._backup_handler)]
 50
 51        url = urllib.parse.urlparse(addr)
 52        srv._srv = await juggler.listen(host=url.hostname,
 53                                        port=url.port,
 54                                        connection_cb=srv._on_connection,
 55                                        request_cb=srv._on_request,
 56                                        static_dir=ui_path,
 57                                        autoflush_delay=autoflush_delay,
 58                                        additional_routes=additional_routes)
 59
 60        try:
 61            srv.async_group.spawn(aio.call_on_cancel, exit_stack.close)
 62
 63        except BaseException:
 64            await aio.uncancellable(srv.async_close())
 65            raise
 66
 67    except BaseException:
 68        exit_stack.close()
 69        raise
 70
 71    mlog.debug("web server listening on %s", addr)
 72    return srv
 73
 74
class WebServer(aio.Resource):
    """Web server keeping each connection's state in sync with the backend

    Instances are created by `create_web_server`, which assigns the
    `_backend`, `_locks`, `_filters` and `_srv` attributes used by the
    methods of this class.

    """

    @property
    def async_group(self) -> aio.Group:
        """Async group"""
        return self._srv.async_group

    async def _on_connection(self, conn):
        """Serve a single connection until it fails or is cancelled

        Publishes the initial state (filter, entries, first and last id)
        and afterwards updates it each time the backend reports changed
        entries. On exit the connection is closed and its lock and filter
        are removed.

        """
        try:
            mlog.debug("new connection")

            # the same lock is taken by `_on_request`, so state updates of
            # one connection do not interleave
            self._locks[conn] = asyncio.Lock()
            self._filters[conn] = default_filter

            change_queue = aio.Queue()
            # the change callback is registered before the initial query;
            # entries reported in the meantime wait in the queue and are
            # checked against the published state by id in the loop below
            with self._backend.register_change_cb(change_queue.put_nowait):
                async with self._locks[conn]:
                    prev_filter = self._filters[conn]
                    prev_filter_json = encoder.filter_to_json(prev_filter)

                    entries = await self._backend.query(prev_filter)
                    entries_json = [encoder.entry_to_json(entry)
                                    for entry in entries]

                    conn.state.set([], {'filter': prev_filter_json,
                                        'entries': entries_json,
                                        'first_id': self._backend.first_id,
                                        'last_id': self._backend.last_id})

                while True:
                    entries = await change_queue.get()

                    async with self._locks[conn]:
                        # filter and entries are read again on every change
                        # because `_on_request` may have replaced them
                        prev_filter = self._filters[conn]
                        prev_filter_json = conn.state.get('filter')
                        prev_entries_json = conn.state.get('entries')

                        # id of the first entry in the current state; only
                        # entries with a greater id are added (assumes the
                        # state is ordered newest first - TODO confirm)
                        previous_id = (prev_entries_json[0]['id']
                                       if prev_entries_json else 0)
                        entries = (entry for entry in entries
                                   if entry.id > previous_id)
                        entries = _filter_entries(prev_filter, entries)
                        entries_json = [encoder.entry_to_json(entry)
                                        for entry in entries]

                        if entries_json:
                            # new entries go in front of the existing ones
                            # and the result is cut to `max_results` items
                            new_entries_json = itertools.chain(
                                entries_json, prev_entries_json)
                            new_entries_json = itertools.islice(
                                new_entries_json, prev_filter.max_results)
                            new_entries_json = list(new_entries_json)

                        else:
                            new_entries_json = prev_entries_json

                        # state is set even without new entries, refreshing
                        # `first_id` and `last_id`
                        conn.state.set([], {'filter': prev_filter_json,
                                            'entries': new_entries_json,
                                            'first_id': self._backend.first_id,
                                            'last_id': self._backend.last_id})

        except Exception as e:
            mlog.error("connection error: %s", e, exc_info=e)

        finally:
            mlog.debug("closing connection")
            conn.close()
            self._locks.pop(conn)
            self._filters.pop(conn)

    async def _on_request(self, conn, name, data):
        """Handle a connection request that sets a new filter

        `data` is decoded with `encoder.filter_from_json` and limited by
        `_sanitize_filter`. If the resulting filter differs from the
        connection's current one, the backend is queried and the
        connection's state is replaced.

        Raises:
            Exception: if `name` is not 'filter'

        """
        if name != 'filter':
            raise Exception('invalid request name')

        new_filter = encoder.filter_from_json(data)
        new_filter = _sanitize_filter(new_filter)

        async with self._locks[conn]:
            prev_filter = self._filters[conn]
            if new_filter == prev_filter:
                return

            mlog.debug('setting new filter: %s', new_filter)
            new_filter_json = encoder.filter_to_json(new_filter)

            entries = await self._backend.query(new_filter)
            entries_json = [encoder.entry_to_json(entry) for entry in entries]

            # filter is stored only after the query finished
            self._filters[conn] = new_filter
            conn.state.set([], {'filter': new_filter_json,
                                'entries': entries_json,
                                'first_id': self._backend.first_id,
                                'last_id': self._backend.last_id})

    async def _backup_handler(self, request):
        """Stream backend entries as one JSON document per line

        Entries are fetched from the backend in pages of at most 256 and
        each one is written as UTF-8 encoded JSON followed by a newline.
        The response content type is 'application/octet-stream'.

        """
        response = aiohttp.web.StreamResponse()
        response.content_type = 'application/octet-stream'
        await response.prepare(request)

        max_results = 256
        # first query is not bounded by `last_id`
        last_id = None

        while True:
            entries = await self._backend.query(
                common.Filter(max_results=max_results,
                              last_id=last_id))
            if not entries:
                break

            for entry in entries:
                entry_json = encoder.entry_to_json(entry)
                entry_str = json.encode(entry_json)
                entry_bytes = entry_str.encode('utf-8')

                await response.write(entry_bytes + b'\n')

                # the next page is bounded by the id just below the last
                # written entry (assumes each page is ordered by descending
                # id - TODO confirm against backend)
                last_id = entry.id - 1
                # let other tasks run between entries
                await asyncio.sleep(0)

        await response.write_eof()

        return response
196
197
def _sanitize_filter(f):
    """Return filter `f` with `max_results` kept within the allowed range

    A `max_results` that is missing (``None``), negative or greater than
    `max_results_limit` is replaced with `max_results_limit`; any other
    filter is returned unchanged.

    Negative values are rejected because `max_results` is later used as
    the stop argument of `itertools.islice`, which raises `ValueError` for
    negative numbers.

    """
    max_results = f.max_results
    if max_results is None or not 0 <= max_results <= max_results_limit:
        f = f._replace(max_results=max_results_limit)

    return f
203
204
205def _filter_entries(f, entries):
206    for i in entries:
207        if f.last_id is not None and i.id > f.last_id:
208            continue
209
210        if (f.entry_timestamp_from is not None
211                and i.timestamp < f.entry_timestamp_from):
212            continue
213
214        if (f.entry_timestamp_to is not None
215                and i.timestamp > f.entry_timestamp_to):
216            continue
217
218        if f.facility is not None and i.msg.facility != f.facility:
219            continue
220
221        if f.severity is not None and i.msg.severity != f.severity:
222            continue
223
224        if not _match_str_filter(f.hostname, i.msg.hostname):
225            continue
226
227        if not _match_str_filter(f.app_name, i.msg.app_name):
228            continue
229
230        if not _match_str_filter(f.procid, i.msg.procid):
231            continue
232
233        if not _match_str_filter(f.msgid, i.msg.msgid):
234            continue
235
236        if not _match_str_filter(f.msg, i.msg.msg):
237            continue
238
239        yield i
240
241
242def _match_str_filter(f, value):
243    return not f or f in value
mlog: logging.Logger = <Logger hat.syslog.server.ui (WARNING)>

Module logger

max_results_limit: int = 200

Max results limit

autoflush_delay: float = 0.2

Juggler autoflush delay

default_filter = Filter(max_results=200, last_id=None, entry_timestamp_from=None, entry_timestamp_to=None, facility=None, severity=None, hostname=None, app_name=None, procid=None, msgid=None, msg=None)

Default filter

async def create_web_server( addr: str, backend: hat.syslog.server.backend.Backend) -> WebServer:
async def create_web_server(addr: str,
                            backend: hat.syslog.server.backend.Backend
                            ) -> 'WebServer':
    """Create web server

    Args:
        addr: listening address formatted as URL (only hostname and port
            are used)
        backend: backend queried for entries and observed for changes

    Returns:
        web server listening on `addr`

    The `ui` package resource is made available as a file system path for
    the lifetime of the server - its context is closed once the server's
    async group is cancelled or if server creation fails.

    """
    # importing a package does not guarantee that its submodules are loaded,
    # so `resources` and `parse` are imported explicitly instead of relying
    # on another module having imported them first
    import importlib.resources
    import urllib.parse

    srv = WebServer()
    srv._backend = backend
    srv._locks = {}
    srv._filters = {}

    exit_stack = contextlib.ExitStack()
    try:
        ui_path = exit_stack.enter_context(
            importlib.resources.as_file(
                importlib.resources.files(__package__) / 'ui'))

        additional_routes = [aiohttp.web.get('/backup', srv._backup_handler)]

        url = urllib.parse.urlparse(addr)
        srv._srv = await juggler.listen(host=url.hostname,
                                        port=url.port,
                                        connection_cb=srv._on_connection,
                                        request_cb=srv._on_request,
                                        static_dir=ui_path,
                                        autoflush_delay=autoflush_delay,
                                        additional_routes=additional_routes)

        try:
            # tie closing of the resource context to the server's lifetime
            srv.async_group.spawn(aio.call_on_cancel, exit_stack.close)

        except BaseException:
            # server is already listening - close it before propagating
            await aio.uncancellable(srv.async_close())
            raise

    except BaseException:
        # nothing else owns the exit stack at this point - release it here
        exit_stack.close()
        raise

    mlog.debug("web server listening on %s", addr)
    return srv

Create web server

class WebServer(hat.aio.group.Resource):
class WebServer(aio.Resource):
    """Web server keeping each connection's state in sync with the backend

    Instances are created by `create_web_server`, which assigns the
    `_backend`, `_locks`, `_filters` and `_srv` attributes used by the
    methods of this class.

    """

    @property
    def async_group(self) -> aio.Group:
        """Async group"""
        return self._srv.async_group

    async def _on_connection(self, conn):
        """Serve a single connection until it fails or is cancelled

        Publishes the initial state (filter, entries, first and last id)
        and afterwards updates it each time the backend reports changed
        entries. On exit the connection is closed and its lock and filter
        are removed.

        """
        try:
            mlog.debug("new connection")

            # the same lock is taken by `_on_request`, so state updates of
            # one connection do not interleave
            self._locks[conn] = asyncio.Lock()
            self._filters[conn] = default_filter

            change_queue = aio.Queue()
            # the change callback is registered before the initial query;
            # entries reported in the meantime wait in the queue and are
            # checked against the published state by id in the loop below
            with self._backend.register_change_cb(change_queue.put_nowait):
                async with self._locks[conn]:
                    prev_filter = self._filters[conn]
                    prev_filter_json = encoder.filter_to_json(prev_filter)

                    entries = await self._backend.query(prev_filter)
                    entries_json = [encoder.entry_to_json(entry)
                                    for entry in entries]

                    conn.state.set([], {'filter': prev_filter_json,
                                        'entries': entries_json,
                                        'first_id': self._backend.first_id,
                                        'last_id': self._backend.last_id})

                while True:
                    entries = await change_queue.get()

                    async with self._locks[conn]:
                        # filter and entries are read again on every change
                        # because `_on_request` may have replaced them
                        prev_filter = self._filters[conn]
                        prev_filter_json = conn.state.get('filter')
                        prev_entries_json = conn.state.get('entries')

                        # id of the first entry in the current state; only
                        # entries with a greater id are added (assumes the
                        # state is ordered newest first - TODO confirm)
                        previous_id = (prev_entries_json[0]['id']
                                       if prev_entries_json else 0)
                        entries = (entry for entry in entries
                                   if entry.id > previous_id)
                        entries = _filter_entries(prev_filter, entries)
                        entries_json = [encoder.entry_to_json(entry)
                                        for entry in entries]

                        if entries_json:
                            # new entries go in front of the existing ones
                            # and the result is cut to `max_results` items
                            new_entries_json = itertools.chain(
                                entries_json, prev_entries_json)
                            new_entries_json = itertools.islice(
                                new_entries_json, prev_filter.max_results)
                            new_entries_json = list(new_entries_json)

                        else:
                            new_entries_json = prev_entries_json

                        # state is set even without new entries, refreshing
                        # `first_id` and `last_id`
                        conn.state.set([], {'filter': prev_filter_json,
                                            'entries': new_entries_json,
                                            'first_id': self._backend.first_id,
                                            'last_id': self._backend.last_id})

        except Exception as e:
            mlog.error("connection error: %s", e, exc_info=e)

        finally:
            mlog.debug("closing connection")
            conn.close()
            self._locks.pop(conn)
            self._filters.pop(conn)

    async def _on_request(self, conn, name, data):
        """Handle a connection request that sets a new filter

        `data` is decoded with `encoder.filter_from_json` and limited by
        `_sanitize_filter`. If the resulting filter differs from the
        connection's current one, the backend is queried and the
        connection's state is replaced.

        Raises:
            Exception: if `name` is not 'filter'

        """
        if name != 'filter':
            raise Exception('invalid request name')

        new_filter = encoder.filter_from_json(data)
        new_filter = _sanitize_filter(new_filter)

        async with self._locks[conn]:
            prev_filter = self._filters[conn]
            if new_filter == prev_filter:
                return

            mlog.debug('setting new filter: %s', new_filter)
            new_filter_json = encoder.filter_to_json(new_filter)

            entries = await self._backend.query(new_filter)
            entries_json = [encoder.entry_to_json(entry) for entry in entries]

            # filter is stored only after the query finished
            self._filters[conn] = new_filter
            conn.state.set([], {'filter': new_filter_json,
                                'entries': entries_json,
                                'first_id': self._backend.first_id,
                                'last_id': self._backend.last_id})

    async def _backup_handler(self, request):
        """Stream backend entries as one JSON document per line

        Entries are fetched from the backend in pages of at most 256 and
        each one is written as UTF-8 encoded JSON followed by a newline.
        The response content type is 'application/octet-stream'.

        """
        response = aiohttp.web.StreamResponse()
        response.content_type = 'application/octet-stream'
        await response.prepare(request)

        max_results = 256
        # first query is not bounded by `last_id`
        last_id = None

        while True:
            entries = await self._backend.query(
                common.Filter(max_results=max_results,
                              last_id=last_id))
            if not entries:
                break

            for entry in entries:
                entry_json = encoder.entry_to_json(entry)
                entry_str = json.encode(entry_json)
                entry_bytes = entry_str.encode('utf-8')

                await response.write(entry_bytes + b'\n')

                # the next page is bounded by the id just below the last
                # written entry (assumes each page is ordered by descending
                # id - TODO confirm against backend)
                last_id = entry.id - 1
                # let other tasks run between entries
                await asyncio.sleep(0)

        await response.write_eof()

        return response

Resource with lifetime control based on Group.

async_group: hat.aio.group.Group
78    @property
79    def async_group(self) -> aio.Group:
80        """Async group"""
81        return self._srv.async_group

Async group