hat.syslog.server.ui
Web server implementation
1"""Web server implementation""" 2 3import asyncio 4import contextlib 5import importlib 6import itertools 7import logging 8import urllib 9 10import aiohttp.web 11 12from hat import aio 13from hat import json 14from hat import juggler 15 16from hat.syslog.server import common 17from hat.syslog.server import encoder 18import hat.syslog.server.backend 19 20 21mlog: logging.Logger = logging.getLogger(__name__) 22"""Module logger""" 23 24max_results_limit: int = 200 25"""Max results limit""" 26 27autoflush_delay: float = 0.2 28"""Juggler autoflush delay""" 29 30default_filter = common.Filter(max_results=max_results_limit) 31"""Default filter""" 32 33 34async def create_web_server(addr: str, 35 backend: hat.syslog.server.backend.Backend 36 ) -> 'WebServer': 37 """Create web server""" 38 srv = WebServer() 39 srv._backend = backend 40 srv._locks = {} 41 srv._filters = {} 42 43 exit_stack = contextlib.ExitStack() 44 try: 45 ui_path = exit_stack.enter_context( 46 importlib.resources.as_file( 47 importlib.resources.files(__package__) / 'ui')) 48 49 additional_routes = [aiohttp.web.get('/backup', srv._backup_handler)] 50 51 url = urllib.parse.urlparse(addr) 52 srv._srv = await juggler.listen(host=url.hostname, 53 port=url.port, 54 connection_cb=srv._on_connection, 55 request_cb=srv._on_request, 56 static_dir=ui_path, 57 autoflush_delay=autoflush_delay, 58 additional_routes=additional_routes) 59 60 try: 61 srv.async_group.spawn(aio.call_on_cancel, exit_stack.close) 62 63 except BaseException: 64 await aio.uncancellable(srv.async_close()) 65 raise 66 67 except BaseException: 68 exit_stack.close() 69 raise 70 71 mlog.debug("web server listening on %s", addr) 72 return srv 73 74 75class WebServer(aio.Resource): 76 77 @property 78 def async_group(self) -> aio.Group: 79 """Async group""" 80 return self._srv.async_group 81 82 async def _on_connection(self, conn): 83 try: 84 mlog.debug("new connection") 85 86 self._locks[conn] = asyncio.Lock() 87 self._filters[conn] = default_filter 88 89 change_queue = aio.Queue() 90 with self._backend.register_change_cb(change_queue.put_nowait): 91 async with self._locks[conn]: 92 prev_filter = self._filters[conn] 93 prev_filter_json = encoder.filter_to_json(prev_filter) 94 95 entries = await self._backend.query(prev_filter) 96 entries_json = [encoder.entry_to_json(entry) 97 for entry in entries] 98 99 conn.state.set([], {'filter': prev_filter_json, 100 'entries': entries_json, 101 'first_id': self._backend.first_id, 102 'last_id': self._backend.last_id}) 103 104 while True: 105 entries = await change_queue.get() 106 107 async with self._locks[conn]: 108 prev_filter = self._filters[conn] 109 prev_filter_json = conn.state.get('filter') 110 prev_entries_json = conn.state.get('entries') 111 112 previous_id = (prev_entries_json[0]['id'] 113 if prev_entries_json else 0) 114 entries = (entry for entry in entries 115 if entry.id > previous_id) 116 entries = _filter_entries(prev_filter, entries) 117 entries_json = [encoder.entry_to_json(entry) 118 for entry in entries] 119 120 if entries_json: 121 new_entries_json = itertools.chain( 122 entries_json, prev_entries_json) 123 new_entries_json = itertools.islice( 124 new_entries_json, prev_filter.max_results) 125 new_entries_json = list(new_entries_json) 126 127 else: 128 new_entries_json = prev_entries_json 129 130 conn.state.set([], {'filter': prev_filter_json, 131 'entries': new_entries_json, 132 'first_id': self._backend.first_id, 133 'last_id': self._backend.last_id}) 134 135 except Exception as e: 136 mlog.error("connection error: %s", e, exc_info=e) 137 138 finally: 139 mlog.debug("closing connection") 140 conn.close() 141 self._locks.pop(conn) 142 self._filters.pop(conn) 143 144 async def _on_request(self, conn, name, data): 145 if name != 'filter': 146 raise Exception('invalid request name') 147 148 new_filter = encoder.filter_from_json(data) 149 new_filter = _sanitize_filter(new_filter) 150 151 async with self._locks[conn]: 152 prev_filter = self._filters[conn] 153 if new_filter == prev_filter: 154 return 155 156 mlog.debug('setting new filter: %s', new_filter) 157 new_filter_json = encoder.filter_to_json(new_filter) 158 159 entries = await self._backend.query(new_filter) 160 entries_json = [encoder.entry_to_json(entry) for entry in entries] 161 162 self._filters[conn] = new_filter 163 conn.state.set([], {'filter': new_filter_json, 164 'entries': entries_json, 165 'first_id': self._backend.first_id, 166 'last_id': self._backend.last_id}) 167 168 async def _backup_handler(self, request): 169 response = aiohttp.web.StreamResponse() 170 response.content_type = 'application/octet-stream' 171 await response.prepare(request) 172 173 max_results = 256 174 last_id = None 175 176 while True: 177 entries = await self._backend.query( 178 common.Filter(max_results=max_results, 179 last_id=last_id)) 180 if not entries: 181 break 182 183 for entry in entries: 184 entry_json = encoder.entry_to_json(entry) 185 entry_str = json.encode(entry_json) 186 entry_bytes = entry_str.encode('utf-8') 187 188 await response.write(entry_bytes + b'\n') 189 190 last_id = entry.id - 1 191 await asyncio.sleep(0) 192 193 await response.write_eof() 194 195 return response 196 197 198def _sanitize_filter(f): 199 if f.max_results is None or f.max_results > max_results_limit: 200 f = f._replace(max_results=max_results_limit) 201 202 return f 203 204 205def _filter_entries(f, entries): 206 for i in entries: 207 if f.last_id is not None and i.id > f.last_id: 208 continue 209 210 if (f.entry_timestamp_from is not None 211 and i.timestamp < f.entry_timestamp_from): 212 continue 213 214 if (f.entry_timestamp_to is not None 215 and i.timestamp > f.entry_timestamp_to): 216 continue 217 218 if f.facility is not None and i.msg.facility != f.facility: 219 continue 220 221 if f.severity is not None and i.msg.severity != f.severity: 222 continue 223 224 if not _match_str_filter(f.hostname, i.msg.hostname): 225 continue 226 227 if not _match_str_filter(f.app_name, i.msg.app_name): 228 continue 229 230 if not _match_str_filter(f.procid, i.msg.procid): 231 continue 232 233 if not _match_str_filter(f.msgid, i.msg.msgid): 234 continue 235 236 if not _match_str_filter(f.msg, i.msg.msg): 237 continue 238 239 yield i 240 241 242def _match_str_filter(f, value): 243 return not f or f in value
Module logger
max_results_limit: int =
200
Max results limit
autoflush_delay: float =
0.2
Juggler autoflush delay
default_filter =
Filter(max_results=200, last_id=None, entry_timestamp_from=None, entry_timestamp_to=None, facility=None, severity=None, hostname=None, app_name=None, procid=None, msgid=None, msg=None)
Default filter
35async def create_web_server(addr: str, 36 backend: hat.syslog.server.backend.Backend 37 ) -> 'WebServer': 38 """Create web server""" 39 srv = WebServer() 40 srv._backend = backend 41 srv._locks = {} 42 srv._filters = {} 43 44 exit_stack = contextlib.ExitStack() 45 try: 46 ui_path = exit_stack.enter_context( 47 importlib.resources.as_file( 48 importlib.resources.files(__package__) / 'ui')) 49 50 additional_routes = [aiohttp.web.get('/backup', srv._backup_handler)] 51 52 url = urllib.parse.urlparse(addr) 53 srv._srv = await juggler.listen(host=url.hostname, 54 port=url.port, 55 connection_cb=srv._on_connection, 56 request_cb=srv._on_request, 57 static_dir=ui_path, 58 autoflush_delay=autoflush_delay, 59 additional_routes=additional_routes) 60 61 try: 62 srv.async_group.spawn(aio.call_on_cancel, exit_stack.close) 63 64 except BaseException: 65 await aio.uncancellable(srv.async_close()) 66 raise 67 68 except BaseException: 69 exit_stack.close() 70 raise 71 72 mlog.debug("web server listening on %s", addr) 73 return srv
Create web server
class
WebServer(hat.aio.group.Resource):
76class WebServer(aio.Resource): 77 78 @property 79 def async_group(self) -> aio.Group: 80 """Async group""" 81 return self._srv.async_group 82 83 async def _on_connection(self, conn): 84 try: 85 mlog.debug("new connection") 86 87 self._locks[conn] = asyncio.Lock() 88 self._filters[conn] = default_filter 89 90 change_queue = aio.Queue() 91 with self._backend.register_change_cb(change_queue.put_nowait): 92 async with self._locks[conn]: 93 prev_filter = self._filters[conn] 94 prev_filter_json = encoder.filter_to_json(prev_filter) 95 96 entries = await self._backend.query(prev_filter) 97 entries_json = [encoder.entry_to_json(entry) 98 for entry in entries] 99 100 conn.state.set([], {'filter': prev_filter_json, 101 'entries': entries_json, 102 'first_id': self._backend.first_id, 103 'last_id': self._backend.last_id}) 104 105 while True: 106 entries = await change_queue.get() 107 108 async with self._locks[conn]: 109 prev_filter = self._filters[conn] 110 prev_filter_json = conn.state.get('filter') 111 prev_entries_json = conn.state.get('entries') 112 113 previous_id = (prev_entries_json[0]['id'] 114 if prev_entries_json else 0) 115 entries = (entry for entry in entries 116 if entry.id > previous_id) 117 entries = _filter_entries(prev_filter, entries) 118 entries_json = [encoder.entry_to_json(entry) 119 for entry in entries] 120 121 if entries_json: 122 new_entries_json = itertools.chain( 123 entries_json, prev_entries_json) 124 new_entries_json = itertools.islice( 125 new_entries_json, prev_filter.max_results) 126 new_entries_json = list(new_entries_json) 127 128 else: 129 new_entries_json = prev_entries_json 130 131 conn.state.set([], {'filter': prev_filter_json, 132 'entries': new_entries_json, 133 'first_id': self._backend.first_id, 134 'last_id': self._backend.last_id}) 135 136 except Exception as e: 137 mlog.error("connection error: %s", e, exc_info=e) 138 139 finally: 140 mlog.debug("closing connection") 141 conn.close() 142 self._locks.pop(conn) 143 self._filters.pop(conn) 144 145 async def _on_request(self, conn, name, data): 146 if name != 'filter': 147 raise Exception('invalid request name') 148 149 new_filter = encoder.filter_from_json(data) 150 new_filter = _sanitize_filter(new_filter) 151 152 async with self._locks[conn]: 153 prev_filter = self._filters[conn] 154 if new_filter == prev_filter: 155 return 156 157 mlog.debug('setting new filter: %s', new_filter) 158 new_filter_json = encoder.filter_to_json(new_filter) 159 160 entries = await self._backend.query(new_filter) 161 entries_json = [encoder.entry_to_json(entry) for entry in entries] 162 163 self._filters[conn] = new_filter 164 conn.state.set([], {'filter': new_filter_json, 165 'entries': entries_json, 166 'first_id': self._backend.first_id, 167 'last_id': self._backend.last_id}) 168 169 async def _backup_handler(self, request): 170 response = aiohttp.web.StreamResponse() 171 response.content_type = 'application/octet-stream' 172 await response.prepare(request) 173 174 max_results = 256 175 last_id = None 176 177 while True: 178 entries = await self._backend.query( 179 common.Filter(max_results=max_results, 180 last_id=last_id)) 181 if not entries: 182 break 183 184 for entry in entries: 185 entry_json = encoder.entry_to_json(entry) 186 entry_str = json.encode(entry_json) 187 entry_bytes = entry_str.encode('utf-8') 188 189 await response.write(entry_bytes + b'\n') 190 191 last_id = entry.id - 1 192 await asyncio.sleep(0) 193 194 await response.write_eof() 195 196 return response
Resource with lifetime control based on Group
.