hat.syslog.server.backend
Backend implementation
1"""Backend implementation""" 2 3from collections.abc import Callable 4from pathlib import Path 5import asyncio 6import contextlib 7import itertools 8import logging 9 10from hat import aio 11from hat import util 12 13from hat.syslog.server import common 14from hat.syslog.server import database 15 16 17mlog: logging.Logger = logging.getLogger(__name__) 18"""Module logger""" 19 20register_delay: float = 0.1 21"""Registration delay in seconds""" 22 23register_queue_size: int = 50 24"""Registration queue size""" 25 26register_queue_treshold: int = 10 27"""Registration queue threshold""" 28 29 30async def create_backend(path: Path, 31 low_size: int, 32 high_size: int, 33 enable_archive: bool, 34 disable_journal: bool 35 ) -> 'Backend': 36 """Create backend""" 37 db = await database.create_database(path, disable_journal) 38 try: 39 first_id = await db.get_first_id() 40 last_id = await db.get_last_id() 41 except BaseException: 42 await aio.uncancellable(db.async_close()) 43 raise 44 45 backend = Backend() 46 backend._path = path 47 backend._low_size = low_size 48 backend._high_size = high_size 49 backend._enable_archive = enable_archive 50 backend._disable_journal = disable_journal 51 backend._db = db 52 backend._first_id = first_id 53 backend._last_id = last_id 54 backend._async_group = aio.Group() 55 backend._change_cbs = util.CallbackRegistry() 56 backend._msg_queue = aio.Queue(register_queue_size) 57 backend._executor = aio.create_executor() 58 59 backend._async_group.spawn(aio.call_on_cancel, db.async_close) 60 backend._async_group.spawn(backend._loop) 61 62 mlog.debug('created backend with database %s', path) 63 return backend 64 65 66class Backend(aio.Resource): 67 68 @property 69 def async_group(self) -> aio.Group: 70 """Async group""" 71 return self._async_group 72 73 @property 74 def first_id(self) -> int | None: 75 """First entry id""" 76 return self._first_id 77 78 @property 79 def last_id(self) -> int | None: 80 """Last entry id""" 81 return self._last_id 82 83 def register_change_cb(self, 84 cb: Callable[[list[common.Entry]], None] 85 ) -> util.RegisterCallbackHandle: 86 """Register change callback 87 88 Callback is called if `first_id` changes and/or `last_id` changes 89 and/or new entries are available (passed as argument to registered 90 callback). 91 92 """ 93 return self._change_cbs.register(cb) 94 95 async def register(self, 96 timestamp: float, 97 msg: common.Msg): 98 """Register message 99 100 Registration adds msg to registration queue. If queue is full, wait 101 until message can be successfully added. 102 103 When message is added to empty queue, registration delay timer is 104 started. Once delay timer expires or if number of messages in queue 105 is greater than threshold, all messages are removed from queue and 106 inserted into sqlite database. 107 108 """ 109 await self._msg_queue.put((timestamp, msg)) 110 111 async def query(self, 112 filter: common.Filter 113 ) -> list[common.Entry]: 114 """Query entries""" 115 return await self._db.query(filter) 116 117 async def _loop(self): 118 try: 119 while True: 120 msgs = await self._get_msgs() 121 await self._process_msgs(msgs) 122 123 except Exception as e: 124 mlog.warn("backend loop error: %s", e, exc_info=e) 125 126 finally: 127 self.close() 128 self._msg_queue.close() 129 mlog.debug('backend loop closed') 130 131 async def _get_msgs(self): 132 loop = asyncio.get_running_loop() 133 msgs = [] 134 135 msg = await self._msg_queue.get() 136 msgs.append(msg) 137 138 start = loop.time() 139 while True: 140 while not self._msg_queue.empty(): 141 msgs.append(self._msg_queue.get_nowait()) 142 timeout = register_delay - (loop.time() - start) 143 if timeout <= 0: 144 break 145 if len(msgs) >= register_queue_treshold: 146 break 147 async_group = aio.Group() 148 try: 149 f = async_group.spawn(self._msg_queue.get) 150 await aio.wait_for(asyncio.shield(f), timeout) 151 except asyncio.TimeoutError: 152 break 153 finally: 154 await aio.uncancellable(async_group.async_close()) 155 if not f.cancelled(): 156 msgs.append(f.result()) 157 158 while not self._msg_queue.empty(): 159 msgs.append(self._msg_queue.get_nowait()) 160 return msgs 161 162 async def _process_msgs(self, msgs): 163 mlog.debug("registering new messages (message count: %s)...", 164 len(msgs)) 165 entries = await self._db.add_msgs(msgs) 166 if not entries: 167 return 168 entries = list(reversed(entries)) 169 170 self._last_id = entries[0].id 171 if self._first_id is None: 172 self._first_id = entries[-1].id 173 174 mlog.debug("backend state changed (first_id: %s; last_id: %s)", 175 self._first_id, self._last_id) 176 self._change_cbs.notify(entries) 177 178 if self._high_size <= 0: 179 return 180 if self._last_id - self._first_id + 1 <= self._high_size: 181 return 182 183 mlog.debug("database cleanup starting...") 184 await self._db_cleanup() 185 186 async def _db_cleanup(self): 187 first_id = self._last_id - self._low_size + 1 188 if first_id > self._last_id: 189 first_id = None 190 if first_id <= self._first_id: 191 return 192 193 if self._enable_archive: 194 mlog.debug("archiving database entries...") 195 await self._archive_db(first_id) 196 197 await self._db.delete(first_id) 198 self._first_id = first_id 199 if self._first_id is None: 200 self._last_id = None 201 202 mlog.debug("backend state changed (first_id: %s; last_id: %s)", 203 self._first_id, self._last_id) 204 self._change_cbs.notify([]) 205 206 async def _archive_db(self, first_id): 207 archive_path = await self._async_group.spawn( 208 self._executor, _ext_get_new_archive_path, self._path) 209 archive = await database.create_database( 210 archive_path, self._disable_journal) 211 try: 212 entries = await self._db.query(common.Filter( 213 last_id=first_id - 1 if first_id is not None else None)) 214 await archive.add_entries(entries) 215 finally: 216 await aio.uncancellable(archive.async_close()) 217 218 219def _ext_get_new_archive_path(db_path): 220 last_index = 0 221 222 for i in db_path.parent.glob(db_path.name + '.*'): 223 with contextlib.suppress(ValueError): 224 index = int(i.name.split('.')[-1]) 225 if index > last_index: 226 last_index = index 227 228 for i in itertools.count(last_index + 1): 229 new_path = db_path.parent / f"{db_path.name}.{i}" 230 if new_path.exists(): 231 continue 232 return new_path
Module logger
register_delay: float =
0.1
Registration delay in seconds
register_queue_size: int =
50
Registration queue size
register_queue_treshold: int =
10
Registration queue threshold
async def
create_backend( path: pathlib.Path, low_size: int, high_size: int, enable_archive: bool, disable_journal: bool) -> Backend:
31async def create_backend(path: Path, 32 low_size: int, 33 high_size: int, 34 enable_archive: bool, 35 disable_journal: bool 36 ) -> 'Backend': 37 """Create backend""" 38 db = await database.create_database(path, disable_journal) 39 try: 40 first_id = await db.get_first_id() 41 last_id = await db.get_last_id() 42 except BaseException: 43 await aio.uncancellable(db.async_close()) 44 raise 45 46 backend = Backend() 47 backend._path = path 48 backend._low_size = low_size 49 backend._high_size = high_size 50 backend._enable_archive = enable_archive 51 backend._disable_journal = disable_journal 52 backend._db = db 53 backend._first_id = first_id 54 backend._last_id = last_id 55 backend._async_group = aio.Group() 56 backend._change_cbs = util.CallbackRegistry() 57 backend._msg_queue = aio.Queue(register_queue_size) 58 backend._executor = aio.create_executor() 59 60 backend._async_group.spawn(aio.call_on_cancel, db.async_close) 61 backend._async_group.spawn(backend._loop) 62 63 mlog.debug('created backend with database %s', path) 64 return backend
Create backend
class
Backend(hat.aio.group.Resource):
67class Backend(aio.Resource): 68 69 @property 70 def async_group(self) -> aio.Group: 71 """Async group""" 72 return self._async_group 73 74 @property 75 def first_id(self) -> int | None: 76 """First entry id""" 77 return self._first_id 78 79 @property 80 def last_id(self) -> int | None: 81 """Last entry id""" 82 return self._last_id 83 84 def register_change_cb(self, 85 cb: Callable[[list[common.Entry]], None] 86 ) -> util.RegisterCallbackHandle: 87 """Register change callback 88 89 Callback is called if `first_id` changes and/or `last_id` changes 90 and/or new entries are available (passed as argument to registered 91 callback). 92 93 """ 94 return self._change_cbs.register(cb) 95 96 async def register(self, 97 timestamp: float, 98 msg: common.Msg): 99 """Register message 100 101 Registration adds msg to registration queue. If queue is full, wait 102 until message can be successfully added. 103 104 When message is added to empty queue, registration delay timer is 105 started. Once delay timer expires or if number of messages in queue 106 is greater than threshold, all messages are removed from queue and 107 inserted into sqlite database. 108 109 """ 110 await self._msg_queue.put((timestamp, msg)) 111 112 async def query(self, 113 filter: common.Filter 114 ) -> list[common.Entry]: 115 """Query entries""" 116 return await self._db.query(filter) 117 118 async def _loop(self): 119 try: 120 while True: 121 msgs = await self._get_msgs() 122 await self._process_msgs(msgs) 123 124 except Exception as e: 125 mlog.warn("backend loop error: %s", e, exc_info=e) 126 127 finally: 128 self.close() 129 self._msg_queue.close() 130 mlog.debug('backend loop closed') 131 132 async def _get_msgs(self): 133 loop = asyncio.get_running_loop() 134 msgs = [] 135 136 msg = await self._msg_queue.get() 137 msgs.append(msg) 138 139 start = loop.time() 140 while True: 141 while not self._msg_queue.empty(): 142 msgs.append(self._msg_queue.get_nowait()) 143 timeout = register_delay - (loop.time() - start) 144 if timeout <= 0: 145 break 146 if len(msgs) >= register_queue_treshold: 147 break 148 async_group = aio.Group() 149 try: 150 f = async_group.spawn(self._msg_queue.get) 151 await aio.wait_for(asyncio.shield(f), timeout) 152 except asyncio.TimeoutError: 153 break 154 finally: 155 await aio.uncancellable(async_group.async_close()) 156 if not f.cancelled(): 157 msgs.append(f.result()) 158 159 while not self._msg_queue.empty(): 160 msgs.append(self._msg_queue.get_nowait()) 161 return msgs 162 163 async def _process_msgs(self, msgs): 164 mlog.debug("registering new messages (message count: %s)...", 165 len(msgs)) 166 entries = await self._db.add_msgs(msgs) 167 if not entries: 168 return 169 entries = list(reversed(entries)) 170 171 self._last_id = entries[0].id 172 if self._first_id is None: 173 self._first_id = entries[-1].id 174 175 mlog.debug("backend state changed (first_id: %s; last_id: %s)", 176 self._first_id, self._last_id) 177 self._change_cbs.notify(entries) 178 179 if self._high_size <= 0: 180 return 181 if self._last_id - self._first_id + 1 <= self._high_size: 182 return 183 184 mlog.debug("database cleanup starting...") 185 await self._db_cleanup() 186 187 async def _db_cleanup(self): 188 first_id = self._last_id - self._low_size + 1 189 if first_id > self._last_id: 190 first_id = None 191 if first_id <= self._first_id: 192 return 193 194 if self._enable_archive: 195 mlog.debug("archiving database entries...") 196 await self._archive_db(first_id) 197 198 await self._db.delete(first_id) 199 self._first_id = first_id 200 if self._first_id is None: 201 self._last_id = None 202 203 mlog.debug("backend state changed (first_id: %s; last_id: %s)", 204 self._first_id, self._last_id) 205 self._change_cbs.notify([]) 206 207 async def _archive_db(self, first_id): 208 archive_path = await self._async_group.spawn( 209 self._executor, _ext_get_new_archive_path, self._path) 210 archive = await database.create_database( 211 archive_path, self._disable_journal) 212 try: 213 entries = await self._db.query(common.Filter( 214 last_id=first_id - 1 if first_id is not None else None)) 215 await archive.add_entries(entries) 216 finally: 217 await aio.uncancellable(archive.async_close())
Resource with lifetime control based on Group
.
async_group: hat.aio.group.Group
69 @property 70 def async_group(self) -> aio.Group: 71 """Async group""" 72 return self._async_group
Async group
def
register_change_cb( self, cb: Callable[[list[hat.syslog.server.common.Entry]], None]) -> hat.util.callback.RegisterCallbackHandle:
84 def register_change_cb(self, 85 cb: Callable[[list[common.Entry]], None] 86 ) -> util.RegisterCallbackHandle: 87 """Register change callback 88 89 Callback is called if `first_id` changes and/or `last_id` changes 90 and/or new entries are available (passed as argument to registered 91 callback). 92 93 """ 94 return self._change_cbs.register(cb)
96 async def register(self, 97 timestamp: float, 98 msg: common.Msg): 99 """Register message 100 101 Registration adds msg to registration queue. If queue is full, wait 102 until message can be successfully added. 103 104 When message is added to empty queue, registration delay timer is 105 started. Once delay timer expires or if number of messages in queue 106 is greater than threshold, all messages are removed from queue and 107 inserted into sqlite database. 108 109 """ 110 await self._msg_queue.put((timestamp, msg))
Register message
Registration adds msg to registration queue. If queue is full, wait until message can be successfully added.
When message is added to empty queue, registration delay timer is started. Once delay timer expires or if number of messages in queue is greater than threshold, all messages are removed from queue and inserted into sqlite database.
async def
query( self, filter: hat.syslog.server.common.Filter) -> list[hat.syslog.server.common.Entry]:
112 async def query(self, 113 filter: common.Filter 114 ) -> list[common.Entry]: 115 """Query entries""" 116 return await self._db.query(filter)
Query entries