hat.syslog.server.database
Interface to SQLite database
1"""Interface to SQLite database""" 2 3from pathlib import Path 4import logging 5import sqlite3 6 7from hat import aio 8 9from hat.syslog.server import common 10 11 12mlog: logging.Logger = logging.getLogger(__name__) 13"""Module logger""" 14 15 16async def create_database(path: Path, 17 disable_journal: bool 18 ) -> 'Database': 19 """Create database""" 20 21 async def close(): 22 await executor(_ext_close, conn) 23 mlog.debug('database %s closed', path) 24 25 executor = aio.create_executor(1) 26 conn = await executor(_ext_connect, path, disable_journal) 27 async_group = aio.Group() 28 async_group.spawn(aio.call_on_cancel, close) 29 30 db = Database() 31 db._path = path 32 db._conn = conn 33 db._async_group = async_group 34 db._executor = executor 35 36 mlog.debug('opened database %s', path) 37 return db 38 39 40class Database(aio.Resource): 41 42 @property 43 def async_group(self) -> aio.Group: 44 """Async group""" 45 return self._async_group 46 47 async def get_first_id(self) -> int | None: 48 """Get first entry id""" 49 return await self._async_group.spawn(self._executor, _ext_fist_id, 50 self._conn) 51 52 async def get_last_id(self) -> int | None: 53 """Get last entry id""" 54 return await self._async_group.spawn(self._executor, _ext_last_id, 55 self._conn) 56 57 async def add_msgs(self, 58 msgs: list[tuple[float, common.Msg]] 59 ) -> list[common.Entry]: 60 """Add timestamped messages""" 61 columns = ['entry_timestamp', 'facility', 'severity', 'version', 62 'msg_timestamp', 'hostname', 'app_name', 'procid', 'msgid', 63 'data', 'msg'] 64 values = [(entry_timestamp, msg.facility.value, msg.severity.value, 65 msg.version, msg.timestamp, msg.hostname, msg.app_name, 66 msg.procid, msg.msgid, msg.data, msg.msg) 67 for entry_timestamp, msg in msgs] 68 entry_ids = await self._async_group.spawn( 69 self._executor, _ext_insert, self._conn, columns, values) 70 71 entries = [ 72 common.Entry(id=entry_id, 73 timestamp=entry_timestamp, 74 msg=msg) 75 for entry_id, (entry_timestamp, msg) in zip(entry_ids, msgs)] 76 77 mlog.debug("messages added to database (message count: %s)", 78 len(entries)) 79 return entries 80 81 async def add_entries(self, entries: list[common.Entry]): 82 """Add entries""" 83 columns = ['rowid', 'entry_timestamp', 'facility', 'severity', 84 'version', 'msg_timestamp', 'hostname', 'app_name', 85 'procid', 'msgid', 'data', 'msg'] 86 values = [(entry.id, entry.timestamp, entry.msg.facility.value, 87 entry.msg.severity.value, entry.msg.version, 88 entry.msg.timestamp, entry.msg.hostname, entry.msg.app_name, 89 entry.msg.procid, entry.msg.msgid, entry.msg.data, 90 entry.msg.msg) 91 for entry in entries] 92 entry_ids = await self._async_group.spawn( 93 self._executor, _ext_insert, self._conn, columns, values) 94 mlog.debug("entries added to database (entry count: %s)", 95 len(entry_ids)) 96 97 async def query(self, 98 filter: common.Filter 99 ) -> list[common.Entry]: 100 """Query entries that satisfy filter""" 101 conditions = [] 102 args = {} 103 if filter.last_id is not None: 104 conditions.append('rowid <= :last_id') 105 args['last_id'] = filter.last_id 106 if filter.entry_timestamp_from is not None: 107 conditions.append('entry_timestamp >= :entry_timestamp_from') 108 args['entry_timestamp_from'] = filter.entry_timestamp_from 109 if filter.entry_timestamp_to is not None: 110 conditions.append('entry_timestamp <= :entry_timestamp_to') 111 args['entry_timestamp_to'] = filter.entry_timestamp_to 112 if filter.facility: 113 conditions.append('facility = :facility') 114 args['facility'] = filter.facility.value 115 if filter.severity: 116 conditions.append('severity = :severity') 117 args['severity'] = filter.severity.value 118 if filter.hostname: 119 conditions.append('hostname LIKE :hostname') 120 args['hostname'] = f'%{filter.hostname}%' 121 if filter.app_name: 122 conditions.append('app_name LIKE :app_name') 123 args['app_name'] = f'%{filter.app_name}%' 124 if filter.procid: 125 conditions.append('procid LIKE :procid') 126 args['procid'] = f'%{filter.procid}%' 127 if filter.msgid: 128 conditions.append('msgid LIKE :msgid') 129 args['msgid'] = f'%{filter.msgid}%' 130 if filter.msg: 131 conditions.append('msg LIKE :msg') 132 args['msg'] = f'%{filter.msg}%' 133 134 result = await self._async_group.spawn( 135 self._executor, _ext_query, self._conn, conditions, args, 136 filter.max_results) 137 138 entries = [common.Entry( 139 id=row['rowid'], 140 timestamp=row['entry_timestamp'], 141 msg=common.Msg(facility=common.Facility(row['facility']), 142 severity=common.Severity(row['severity']), 143 version=row['version'], 144 timestamp=row['msg_timestamp'], 145 hostname=row['hostname'], 146 app_name=row['app_name'], 147 procid=row['procid'], 148 msgid=row['msgid'], 149 data=row['data'], 150 msg=row['msg'])) 151 for row in result] 152 153 mlog.debug("query resulted with %s entries", len(entries)) 154 return entries 155 156 async def delete(self, first_id: int): 157 """Delete entries prior to first_id""" 158 entry_count = await self._async_group.spawn( 159 self._executor, _ext_delete, self._conn, first_id) 160 mlog.debug("deleted %s entries", entry_count) 161 162 163_db_columns = [['entry_timestamp', 'REAL'], 164 ['facility', 'INTEGER'], 165 ['severity', 'INTEGER'], 166 ['version', 'INTEGER'], 167 ['msg_timestamp', 'REAL'], 168 ['hostname', 'TEXT'], 169 ['app_name', 'TEXT'], 170 ['procid', 'TEXT'], 171 ['msgid', 'TEXT'], 172 ['data', 'TEXT'], 173 ['msg', 'TEXT']] 174 175_db_query_columns = ['rowid'] + [name for name, _ in _db_columns] 176 177_db_structure = f""" 178 CREATE TABLE IF NOT EXISTS log ( 179 {', '.join(col_name + ' ' + col_type 180 for col_name, col_type in _db_columns)} 181 ); 182 CREATE INDEX IF NOT EXISTS log_entry_timestamp_index ON log ( 183 entry_timestamp DESC) 184 """ 185 186 187def _ext_connect(path, disable_journal): 188 path.parent.mkdir(exist_ok=True, parents=True) 189 conn = sqlite3.connect(f'file:{path}?nolock=1', 190 uri=True, 191 isolation_level=None, 192 detect_types=sqlite3.PARSE_DECLTYPES) 193 try: 194 conn.executescript( 195 ('PRAGMA journal_mode = OFF;\n' if disable_journal else '') + 196 _db_structure) 197 except Exception: 198 conn.close() 199 raise 200 return conn 201 202 203def _ext_close(conn): 204 conn.close() 205 206 207def _ext_fist_id(conn): 208 c = conn.execute("SELECT MIN(rowid) FROM log") 209 result = c.fetchall() 210 return result[0][0] if result else None 211 212 213def _ext_last_id(conn): 214 c = conn.execute("SELECT MAX(rowid) FROM log") 215 result = c.fetchall() 216 return result[0][0] if result else None 217 218 219def _ext_delete(conn, first_id): 220 cmd = "DELETE FROM log" 221 if first_id is not None: 222 cmd += " WHERE rowid < :first_id" 223 c = conn.execute(cmd, {'first_id': first_id}) 224 return c.rowcount 225 226 227def _ext_insert(conn, columns, values): 228 c = conn.executemany(f"INSERT INTO log ({', '.join(columns)}) " 229 f"VALUES ({', '.join('?' * len(columns))})", values) 230 rowcount = c.rowcount 231 if rowcount < 1: 232 return [] 233 last_id = _ext_last_id(conn) 234 return range(last_id - rowcount + 1, last_id + 1) 235 236 237def _ext_query(conn, conditions, args, max_results): 238 c = conn.execute( 239 ' '.join([ 240 "SELECT rowid, *", 241 "FROM log", 242 ('WHERE ' + ' AND '.join(conditions) if conditions else ''), 243 "ORDER BY rowid DESC", 244 ("LIMIT :max_results" if max_results is not None else '')]), 245 dict(args, max_results=max_results)) 246 result = c.fetchall() 247 return [{k: v for k, v in zip(_db_query_columns, i)} 248 for i in result]
Module logger
17async def create_database(path: Path, 18 disable_journal: bool 19 ) -> 'Database': 20 """Create database""" 21 22 async def close(): 23 await executor(_ext_close, conn) 24 mlog.debug('database %s closed', path) 25 26 executor = aio.create_executor(1) 27 conn = await executor(_ext_connect, path, disable_journal) 28 async_group = aio.Group() 29 async_group.spawn(aio.call_on_cancel, close) 30 31 db = Database() 32 db._path = path 33 db._conn = conn 34 db._async_group = async_group 35 db._executor = executor 36 37 mlog.debug('opened database %s', path) 38 return db
Create database
class
Database(hat.aio.group.Resource):
41class Database(aio.Resource): 42 43 @property 44 def async_group(self) -> aio.Group: 45 """Async group""" 46 return self._async_group 47 48 async def get_first_id(self) -> int | None: 49 """Get first entry id""" 50 return await self._async_group.spawn(self._executor, _ext_fist_id, 51 self._conn) 52 53 async def get_last_id(self) -> int | None: 54 """Get last entry id""" 55 return await self._async_group.spawn(self._executor, _ext_last_id, 56 self._conn) 57 58 async def add_msgs(self, 59 msgs: list[tuple[float, common.Msg]] 60 ) -> list[common.Entry]: 61 """Add timestamped messages""" 62 columns = ['entry_timestamp', 'facility', 'severity', 'version', 63 'msg_timestamp', 'hostname', 'app_name', 'procid', 'msgid', 64 'data', 'msg'] 65 values = [(entry_timestamp, msg.facility.value, msg.severity.value, 66 msg.version, msg.timestamp, msg.hostname, msg.app_name, 67 msg.procid, msg.msgid, msg.data, msg.msg) 68 for entry_timestamp, msg in msgs] 69 entry_ids = await self._async_group.spawn( 70 self._executor, _ext_insert, self._conn, columns, values) 71 72 entries = [ 73 common.Entry(id=entry_id, 74 timestamp=entry_timestamp, 75 msg=msg) 76 for entry_id, (entry_timestamp, msg) in zip(entry_ids, msgs)] 77 78 mlog.debug("messages added to database (message count: %s)", 79 len(entries)) 80 return entries 81 82 async def add_entries(self, entries: list[common.Entry]): 83 """Add entries""" 84 columns = ['rowid', 'entry_timestamp', 'facility', 'severity', 85 'version', 'msg_timestamp', 'hostname', 'app_name', 86 'procid', 'msgid', 'data', 'msg'] 87 values = [(entry.id, entry.timestamp, entry.msg.facility.value, 88 entry.msg.severity.value, entry.msg.version, 89 entry.msg.timestamp, entry.msg.hostname, entry.msg.app_name, 90 entry.msg.procid, entry.msg.msgid, entry.msg.data, 91 entry.msg.msg) 92 for entry in entries] 93 entry_ids = await self._async_group.spawn( 94 self._executor, _ext_insert, self._conn, columns, values) 95 mlog.debug("entries added to database (entry count: %s)", 96 len(entry_ids)) 97 98 async def query(self, 99 filter: common.Filter 100 ) -> list[common.Entry]: 101 """Query entries that satisfy filter""" 102 conditions = [] 103 args = {} 104 if filter.last_id is not None: 105 conditions.append('rowid <= :last_id') 106 args['last_id'] = filter.last_id 107 if filter.entry_timestamp_from is not None: 108 conditions.append('entry_timestamp >= :entry_timestamp_from') 109 args['entry_timestamp_from'] = filter.entry_timestamp_from 110 if filter.entry_timestamp_to is not None: 111 conditions.append('entry_timestamp <= :entry_timestamp_to') 112 args['entry_timestamp_to'] = filter.entry_timestamp_to 113 if filter.facility: 114 conditions.append('facility = :facility') 115 args['facility'] = filter.facility.value 116 if filter.severity: 117 conditions.append('severity = :severity') 118 args['severity'] = filter.severity.value 119 if filter.hostname: 120 conditions.append('hostname LIKE :hostname') 121 args['hostname'] = f'%{filter.hostname}%' 122 if filter.app_name: 123 conditions.append('app_name LIKE :app_name') 124 args['app_name'] = f'%{filter.app_name}%' 125 if filter.procid: 126 conditions.append('procid LIKE :procid') 127 args['procid'] = f'%{filter.procid}%' 128 if filter.msgid: 129 conditions.append('msgid LIKE :msgid') 130 args['msgid'] = f'%{filter.msgid}%' 131 if filter.msg: 132 conditions.append('msg LIKE :msg') 133 args['msg'] = f'%{filter.msg}%' 134 135 result = await self._async_group.spawn( 136 self._executor, _ext_query, self._conn, conditions, args, 137 filter.max_results) 138 139 entries = [common.Entry( 140 id=row['rowid'], 141 timestamp=row['entry_timestamp'], 142 msg=common.Msg(facility=common.Facility(row['facility']), 143 severity=common.Severity(row['severity']), 144 version=row['version'], 145 timestamp=row['msg_timestamp'], 146 hostname=row['hostname'], 147 app_name=row['app_name'], 148 procid=row['procid'], 149 msgid=row['msgid'], 150 data=row['data'], 151 msg=row['msg'])) 152 for row in result] 153 154 mlog.debug("query resulted with %s entries", len(entries)) 155 return entries 156 157 async def delete(self, first_id: int): 158 """Delete entries prior to first_id""" 159 entry_count = await self._async_group.spawn( 160 self._executor, _ext_delete, self._conn, first_id) 161 mlog.debug("deleted %s entries", entry_count)
Resource with lifetime control based on Group
.
async_group: hat.aio.group.Group
43 @property 44 def async_group(self) -> aio.Group: 45 """Async group""" 46 return self._async_group
Async group
async def
get_first_id(self) -> int | None:
48 async def get_first_id(self) -> int | None: 49 """Get first entry id""" 50 return await self._async_group.spawn(self._executor, _ext_fist_id, 51 self._conn)
Get first entry id
async def
get_last_id(self) -> int | None:
53 async def get_last_id(self) -> int | None: 54 """Get last entry id""" 55 return await self._async_group.spawn(self._executor, _ext_last_id, 56 self._conn)
Get last entry id
async def
add_msgs( self, msgs: list[tuple[float, hat.syslog.common.Msg]]) -> list[hat.syslog.server.common.Entry]:
58 async def add_msgs(self, 59 msgs: list[tuple[float, common.Msg]] 60 ) -> list[common.Entry]: 61 """Add timestamped messages""" 62 columns = ['entry_timestamp', 'facility', 'severity', 'version', 63 'msg_timestamp', 'hostname', 'app_name', 'procid', 'msgid', 64 'data', 'msg'] 65 values = [(entry_timestamp, msg.facility.value, msg.severity.value, 66 msg.version, msg.timestamp, msg.hostname, msg.app_name, 67 msg.procid, msg.msgid, msg.data, msg.msg) 68 for entry_timestamp, msg in msgs] 69 entry_ids = await self._async_group.spawn( 70 self._executor, _ext_insert, self._conn, columns, values) 71 72 entries = [ 73 common.Entry(id=entry_id, 74 timestamp=entry_timestamp, 75 msg=msg) 76 for entry_id, (entry_timestamp, msg) in zip(entry_ids, msgs)] 77 78 mlog.debug("messages added to database (message count: %s)", 79 len(entries)) 80 return entries
Add timestamped messages
82 async def add_entries(self, entries: list[common.Entry]): 83 """Add entries""" 84 columns = ['rowid', 'entry_timestamp', 'facility', 'severity', 85 'version', 'msg_timestamp', 'hostname', 'app_name', 86 'procid', 'msgid', 'data', 'msg'] 87 values = [(entry.id, entry.timestamp, entry.msg.facility.value, 88 entry.msg.severity.value, entry.msg.version, 89 entry.msg.timestamp, entry.msg.hostname, entry.msg.app_name, 90 entry.msg.procid, entry.msg.msgid, entry.msg.data, 91 entry.msg.msg) 92 for entry in entries] 93 entry_ids = await self._async_group.spawn( 94 self._executor, _ext_insert, self._conn, columns, values) 95 mlog.debug("entries added to database (entry count: %s)", 96 len(entry_ids))
Add entries
async def
query( self, filter: hat.syslog.server.common.Filter) -> list[hat.syslog.server.common.Entry]:
98 async def query(self, 99 filter: common.Filter 100 ) -> list[common.Entry]: 101 """Query entries that satisfy filter""" 102 conditions = [] 103 args = {} 104 if filter.last_id is not None: 105 conditions.append('rowid <= :last_id') 106 args['last_id'] = filter.last_id 107 if filter.entry_timestamp_from is not None: 108 conditions.append('entry_timestamp >= :entry_timestamp_from') 109 args['entry_timestamp_from'] = filter.entry_timestamp_from 110 if filter.entry_timestamp_to is not None: 111 conditions.append('entry_timestamp <= :entry_timestamp_to') 112 args['entry_timestamp_to'] = filter.entry_timestamp_to 113 if filter.facility: 114 conditions.append('facility = :facility') 115 args['facility'] = filter.facility.value 116 if filter.severity: 117 conditions.append('severity = :severity') 118 args['severity'] = filter.severity.value 119 if filter.hostname: 120 conditions.append('hostname LIKE :hostname') 121 args['hostname'] = f'%{filter.hostname}%' 122 if filter.app_name: 123 conditions.append('app_name LIKE :app_name') 124 args['app_name'] = f'%{filter.app_name}%' 125 if filter.procid: 126 conditions.append('procid LIKE :procid') 127 args['procid'] = f'%{filter.procid}%' 128 if filter.msgid: 129 conditions.append('msgid LIKE :msgid') 130 args['msgid'] = f'%{filter.msgid}%' 131 if filter.msg: 132 conditions.append('msg LIKE :msg') 133 args['msg'] = f'%{filter.msg}%' 134 135 result = await self._async_group.spawn( 136 self._executor, _ext_query, self._conn, conditions, args, 137 filter.max_results) 138 139 entries = [common.Entry( 140 id=row['rowid'], 141 timestamp=row['entry_timestamp'], 142 msg=common.Msg(facility=common.Facility(row['facility']), 143 severity=common.Severity(row['severity']), 144 version=row['version'], 145 timestamp=row['msg_timestamp'], 146 hostname=row['hostname'], 147 app_name=row['app_name'], 148 procid=row['procid'], 149 msgid=row['msgid'], 150 data=row['data'], 151 msg=row['msg'])) 152 for row in result] 153 154 mlog.debug("query resulted with %s entries", len(entries)) 155 return entries
Query entries that satisfy filter
async def
delete(self, first_id: int):
157 async def delete(self, first_id: int): 158 """Delete entries prior to first_id""" 159 entry_count = await self._async_group.spawn( 160 self._executor, _ext_delete, self._conn, first_id) 161 mlog.debug("deleted %s entries", entry_count)
Delete entries prior to first_id