hat.syslog.server.database

Interface to SQLite database

  1"""Interface to SQLite database"""
  2
  3from pathlib import Path
  4import logging
  5import sqlite3
  6import typing
  7
  8from hat import aio
  9from hat.syslog.server import common
 10
 11
# Logger named after this module; the functions below use it for debug
# messages about opening/closing the database and about executed operations
mlog: logging.Logger = logging.getLogger(__name__)
"""Module logger"""
 14
 15
 16async def create_database(path: Path,
 17                          disable_journal: bool
 18                          ) -> 'Database':
 19    """Create database"""
 20
 21    async def close():
 22        await executor(_ext_close, conn)
 23        mlog.debug('database %s closed', path)
 24
 25    executor = aio.create_executor(1)
 26    conn = await executor(_ext_connect, path, disable_journal)
 27    async_group = aio.Group()
 28    async_group.spawn(aio.call_on_cancel, close)
 29
 30    db = Database()
 31    db._path = path
 32    db._conn = conn
 33    db._async_group = async_group
 34    db._executor = executor
 35
 36    mlog.debug('opened database %s', path)
 37    return db
 38
 39
 40class Database(aio.Resource):
 41
 42    @property
 43    def async_group(self) -> aio.Group:
 44        """Async group"""
 45        return self._async_group
 46
 47    async def get_first_id(self) -> typing.Optional[int]:
 48        """Get first entry id"""
 49        return await self._async_group.spawn(self._executor, _ext_fist_id,
 50                                             self._conn)
 51
 52    async def get_last_id(self) -> typing.Optional[int]:
 53        """Get last entry id"""
 54        return await self._async_group.spawn(self._executor, _ext_last_id,
 55                                             self._conn)
 56
 57    async def add_msgs(self,
 58                       msgs: typing.List[typing.Tuple[float, common.Msg]]
 59                       ) -> typing.List[common.Entry]:
 60        """Add timestamped messages"""
 61        columns = ['entry_timestamp', 'facility', 'severity', 'version',
 62                   'msg_timestamp', 'hostname', 'app_name', 'procid', 'msgid',
 63                   'data', 'msg']
 64        values = [(entry_timestamp, msg.facility.value, msg.severity.value,
 65                   msg.version, msg.timestamp, msg.hostname, msg.app_name,
 66                   msg.procid, msg.msgid, msg.data, msg.msg)
 67                  for entry_timestamp, msg in msgs]
 68        entry_ids = await self._async_group.spawn(
 69            self._executor, _ext_insert, self._conn, columns, values)
 70
 71        entries = [
 72            common.Entry(id=entry_id,
 73                         timestamp=entry_timestamp,
 74                         msg=msg)
 75            for entry_id, (entry_timestamp, msg) in zip(entry_ids, msgs)]
 76
 77        mlog.debug("messages added to database (message count: %s)",
 78                   len(entries))
 79        return entries
 80
 81    async def add_entries(self, entries: typing.List[common.Entry]):
 82        """Add entries"""
 83        columns = ['rowid', 'entry_timestamp', 'facility', 'severity',
 84                   'version', 'msg_timestamp', 'hostname', 'app_name',
 85                   'procid', 'msgid', 'data', 'msg']
 86        values = [(entry.id, entry.timestamp, entry.msg.facility.value,
 87                   entry.msg.severity.value, entry.msg.version,
 88                   entry.msg.timestamp, entry.msg.hostname, entry.msg.app_name,
 89                   entry.msg.procid, entry.msg.msgid, entry.msg.data,
 90                   entry.msg.msg)
 91                  for entry in entries]
 92        entry_ids = await self._async_group.spawn(
 93            self._executor, _ext_insert, self._conn, columns, values)
 94        mlog.debug("entries added to database (entry count: %s)",
 95                   len(entry_ids))
 96
 97    async def query(self,
 98                    filter: common.Filter
 99                    ) -> typing.List[common.Entry]:
100        """Query entries that satisfy filter"""
101        conditions = []
102        args = {}
103        if filter.last_id is not None:
104            conditions.append('rowid <= :last_id')
105            args['last_id'] = filter.last_id
106        if filter.entry_timestamp_from is not None:
107            conditions.append('entry_timestamp >= :entry_timestamp_from')
108            args['entry_timestamp_from'] = filter.entry_timestamp_from
109        if filter.entry_timestamp_to is not None:
110            conditions.append('entry_timestamp <= :entry_timestamp_to')
111            args['entry_timestamp_to'] = filter.entry_timestamp_to
112        if filter.facility:
113            conditions.append('facility = :facility')
114            args['facility'] = filter.facility.value
115        if filter.severity:
116            conditions.append('severity = :severity')
117            args['severity'] = filter.severity.value
118        if filter.hostname:
119            conditions.append('hostname LIKE :hostname')
120            args['hostname'] = f'%{filter.hostname}%'
121        if filter.app_name:
122            conditions.append('app_name LIKE :app_name')
123            args['app_name'] = f'%{filter.app_name}%'
124        if filter.procid:
125            conditions.append('procid LIKE :procid')
126            args['procid'] = f'%{filter.procid}%'
127        if filter.msgid:
128            conditions.append('msgid LIKE :msgid')
129            args['msgid'] = f'%{filter.msgid}%'
130        if filter.msg:
131            conditions.append('msg LIKE :msg')
132            args['msg'] = f'%{filter.msg}%'
133
134        result = await self._async_group.spawn(
135            self._executor, _ext_query, self._conn, conditions, args,
136            filter.max_results)
137
138        entries = [common.Entry(
139                    id=row['rowid'],
140                    timestamp=row['entry_timestamp'],
141                    msg=common.Msg(facility=common.Facility(row['facility']),
142                                   severity=common.Severity(row['severity']),
143                                   version=row['version'],
144                                   timestamp=row['msg_timestamp'],
145                                   hostname=row['hostname'],
146                                   app_name=row['app_name'],
147                                   procid=row['procid'],
148                                   msgid=row['msgid'],
149                                   data=row['data'],
150                                   msg=row['msg']))
151                   for row in result]
152
153        mlog.debug("query resulted with %s entries", len(entries))
154        return entries
155
156    async def delete(self, first_id: int):
157        """Delete entries prior to first_id"""
158        entry_count = await self._async_group.spawn(
159            self._executor, _ext_delete, self._conn, first_id)
160        mlog.debug("deleted %s entries", entry_count)
161
162
# Columns of the `log` table as [name, SQLite type] pairs, in table order
_db_columns = [
    ['entry_timestamp', 'REAL'],
    ['facility', 'INTEGER'],
    ['severity', 'INTEGER'],
    ['version', 'INTEGER'],
    ['msg_timestamp', 'REAL'],
    ['hostname', 'TEXT'],
    ['app_name', 'TEXT'],
    ['procid', 'TEXT'],
    ['msgid', 'TEXT'],
    ['data', 'TEXT'],
    ['msg', 'TEXT'],
]

# Keys of the dicts produced by `_ext_query` - `rowid` followed by the table
# columns, matching the column order of ``SELECT rowid, *``
_db_query_columns = ['rowid', *(column[0] for column in _db_columns)]

# Script executed by `_ext_connect` - creates the `log` table and the index
# on `entry_timestamp` if they do not exist yet
_db_structure = f"""
    CREATE TABLE IF NOT EXISTS log (
        {', '.join(' '.join(column) for column in _db_columns)}
    );
    CREATE INDEX IF NOT EXISTS log_entry_timestamp_index ON log (
        entry_timestamp DESC)
    """
185
186
def _ext_connect(path, disable_journal):
    """Open connection to the SQLite database and ensure its structure

    Args:
        path: `pathlib.Path` of the database file; missing parent
            directories (any number of levels) are created
        disable_journal: if true, ``PRAGMA journal_mode = OFF`` is executed
            before the structure script

    Returns:
        `sqlite3.Connection` opened with ``isolation_level=None``

    Raises:
        Exception: anything raised while executing the initialization
            script; the connection is closed before it propagates

    """
    # parents=True - the database may be placed more than one level below
    # the deepest existing directory
    path.parent.mkdir(parents=True, exist_ok=True)
    conn = sqlite3.connect(f'file:{path}?nolock=1',
                           uri=True,
                           isolation_level=None,
                           detect_types=sqlite3.PARSE_DECLTYPES)

    script = _db_structure
    if disable_journal:
        script = 'PRAGMA journal_mode = OFF;\n' + script

    try:
        conn.executescript(script)
    except Exception:
        # do not leak the connection if initialization fails
        conn.close()
        raise
    return conn
201
202
def _ext_close(conn):
    """Close the database connection `conn`"""
    conn.close()
205
206
def _ext_fist_id(conn):
    """Return the smallest ``rowid`` in the ``log`` table

    ``None`` is returned when the table is empty (``MIN`` then yields NULL).

    """
    # aggregate query without GROUP BY yields exactly one row
    row = conn.execute("SELECT MIN(rowid) FROM log").fetchone()
    if row is None:
        return None
    return row[0]
211
212
def _ext_last_id(conn):
    """Return the greatest ``rowid`` in the ``log`` table

    ``None`` is returned when the table is empty (``MAX`` then yields NULL).

    """
    # aggregate query without GROUP BY yields exactly one row
    row = conn.execute("SELECT MAX(rowid) FROM log").fetchone()
    if row is None:
        return None
    return row[0]
217
218
def _ext_delete(conn, first_id):
    """Delete rows of the ``log`` table and return the number of deleted rows

    Args:
        conn: database connection
        first_id: rows with ``rowid`` lower than this value are deleted;
            if ``None``, all rows are deleted

    """
    if first_id is None:
        statement = "DELETE FROM log"
    else:
        statement = "DELETE FROM log WHERE rowid < :first_id"
    # parameters are passed as mapping in both cases - without a placeholder
    # in the statement nothing is bound
    cursor = conn.execute(statement, {'first_id': first_id})
    return cursor.rowcount
225
226
def _ext_insert(conn, columns, values):
    """Insert rows into the ``log`` table and return ids assigned to them

    Args:
        conn: database connection
        columns: names of the columns being set
        values: one tuple per row, ordered as `columns`

    Returns:
        empty list if no row was inserted; otherwise a ``range`` of as many
        consecutive ids as rows were inserted, ending with the greatest
        ``rowid`` in the table (ids are derived, not read back per row)

    """
    column_list = ', '.join(columns)
    placeholders = ', '.join('?' for _ in columns)
    cursor = conn.executemany(
        f"INSERT INTO log ({column_list}) VALUES ({placeholders})", values)

    inserted = cursor.rowcount
    if inserted < 1:
        return []

    newest_id = _ext_last_id(conn)
    oldest_id = newest_id - inserted + 1
    return range(oldest_id, newest_id + 1)
235
236
def _ext_query(conn, conditions, args, max_results):
    """Select rows of the ``log`` table, newest (greatest ``rowid``) first

    Args:
        conn: database connection
        conditions: SQL conditions combined with ``AND``; may be empty
        args: values of the named parameters used in `conditions`
        max_results: maximum number of returned rows or ``None`` for no limit

    Returns:
        list of dicts keyed by `_db_query_columns`, one per selected row

    """
    where_clause = f"WHERE {' AND '.join(conditions)}" if conditions else ''
    limit_clause = '' if max_results is None else "LIMIT :max_results"
    # empty clauses are kept as list items, so the statement text is the same
    # as if all five parts were always joined
    statement = ' '.join(["SELECT rowid, *",
                          "FROM log",
                          where_clause,
                          "ORDER BY rowid DESC",
                          limit_clause])
    parameters = {**args, 'max_results': max_results}

    cursor = conn.execute(statement, parameters)
    return [dict(zip(_db_query_columns, row))
            for row in cursor.fetchall()]
mlog: logging.Logger = <Logger hat.syslog.server.database (WARNING)>

Module logger

async def create_database(path: pathlib.Path, disable_journal: bool) -> hat.syslog.server.database.Database:
async def create_database(path: Path,
                          disable_journal: bool
                          ) -> 'Database':
    """Open the SQLite database at `path` and wrap it in a `Database`

    The connection is opened by `_ext_connect`, called through an executor
    obtained from ``aio.create_executor(1)``. The same executor is stored on
    the returned instance and used for closing the connection.

    Args:
        path: location of the SQLite database file
        disable_journal: forwarded to `_ext_connect`, which switches the
            journal off while initializing the connection when this is true

    Returns:
        database instance owning the connection, executor and async group

    """
    run_in_executor = aio.create_executor(1)
    connection = await run_in_executor(_ext_connect, path, disable_journal)

    async def on_cancel():
        # registered with aio.call_on_cancel below - presumably invoked once
        # the async group is closed (confirm against hat.aio)
        await run_in_executor(_ext_close, connection)
        mlog.debug('database %s closed', path)

    group = aio.Group()
    group.spawn(aio.call_on_cancel, on_cancel)

    database = Database()
    database._path = path
    database._conn = connection
    database._async_group = group
    database._executor = run_in_executor

    mlog.debug('opened database %s', path)
    return database

Create database

class Database(hat.aio.Resource):
 41class Database(aio.Resource):
 42
 43    @property
 44    def async_group(self) -> aio.Group:
 45        """Async group"""
 46        return self._async_group
 47
 48    async def get_first_id(self) -> typing.Optional[int]:
 49        """Get first entry id"""
 50        return await self._async_group.spawn(self._executor, _ext_fist_id,
 51                                             self._conn)
 52
 53    async def get_last_id(self) -> typing.Optional[int]:
 54        """Get last entry id"""
 55        return await self._async_group.spawn(self._executor, _ext_last_id,
 56                                             self._conn)
 57
 58    async def add_msgs(self,
 59                       msgs: typing.List[typing.Tuple[float, common.Msg]]
 60                       ) -> typing.List[common.Entry]:
 61        """Add timestamped messages"""
 62        columns = ['entry_timestamp', 'facility', 'severity', 'version',
 63                   'msg_timestamp', 'hostname', 'app_name', 'procid', 'msgid',
 64                   'data', 'msg']
 65        values = [(entry_timestamp, msg.facility.value, msg.severity.value,
 66                   msg.version, msg.timestamp, msg.hostname, msg.app_name,
 67                   msg.procid, msg.msgid, msg.data, msg.msg)
 68                  for entry_timestamp, msg in msgs]
 69        entry_ids = await self._async_group.spawn(
 70            self._executor, _ext_insert, self._conn, columns, values)
 71
 72        entries = [
 73            common.Entry(id=entry_id,
 74                         timestamp=entry_timestamp,
 75                         msg=msg)
 76            for entry_id, (entry_timestamp, msg) in zip(entry_ids, msgs)]
 77
 78        mlog.debug("messages added to database (message count: %s)",
 79                   len(entries))
 80        return entries
 81
 82    async def add_entries(self, entries: typing.List[common.Entry]):
 83        """Add entries"""
 84        columns = ['rowid', 'entry_timestamp', 'facility', 'severity',
 85                   'version', 'msg_timestamp', 'hostname', 'app_name',
 86                   'procid', 'msgid', 'data', 'msg']
 87        values = [(entry.id, entry.timestamp, entry.msg.facility.value,
 88                   entry.msg.severity.value, entry.msg.version,
 89                   entry.msg.timestamp, entry.msg.hostname, entry.msg.app_name,
 90                   entry.msg.procid, entry.msg.msgid, entry.msg.data,
 91                   entry.msg.msg)
 92                  for entry in entries]
 93        entry_ids = await self._async_group.spawn(
 94            self._executor, _ext_insert, self._conn, columns, values)
 95        mlog.debug("entries added to database (entry count: %s)",
 96                   len(entry_ids))
 97
 98    async def query(self,
 99                    filter: common.Filter
100                    ) -> typing.List[common.Entry]:
101        """Query entries that satisfy filter"""
102        conditions = []
103        args = {}
104        if filter.last_id is not None:
105            conditions.append('rowid <= :last_id')
106            args['last_id'] = filter.last_id
107        if filter.entry_timestamp_from is not None:
108            conditions.append('entry_timestamp >= :entry_timestamp_from')
109            args['entry_timestamp_from'] = filter.entry_timestamp_from
110        if filter.entry_timestamp_to is not None:
111            conditions.append('entry_timestamp <= :entry_timestamp_to')
112            args['entry_timestamp_to'] = filter.entry_timestamp_to
113        if filter.facility:
114            conditions.append('facility = :facility')
115            args['facility'] = filter.facility.value
116        if filter.severity:
117            conditions.append('severity = :severity')
118            args['severity'] = filter.severity.value
119        if filter.hostname:
120            conditions.append('hostname LIKE :hostname')
121            args['hostname'] = f'%{filter.hostname}%'
122        if filter.app_name:
123            conditions.append('app_name LIKE :app_name')
124            args['app_name'] = f'%{filter.app_name}%'
125        if filter.procid:
126            conditions.append('procid LIKE :procid')
127            args['procid'] = f'%{filter.procid}%'
128        if filter.msgid:
129            conditions.append('msgid LIKE :msgid')
130            args['msgid'] = f'%{filter.msgid}%'
131        if filter.msg:
132            conditions.append('msg LIKE :msg')
133            args['msg'] = f'%{filter.msg}%'
134
135        result = await self._async_group.spawn(
136            self._executor, _ext_query, self._conn, conditions, args,
137            filter.max_results)
138
139        entries = [common.Entry(
140                    id=row['rowid'],
141                    timestamp=row['entry_timestamp'],
142                    msg=common.Msg(facility=common.Facility(row['facility']),
143                                   severity=common.Severity(row['severity']),
144                                   version=row['version'],
145                                   timestamp=row['msg_timestamp'],
146                                   hostname=row['hostname'],
147                                   app_name=row['app_name'],
148                                   procid=row['procid'],
149                                   msgid=row['msgid'],
150                                   data=row['data'],
151                                   msg=row['msg']))
152                   for row in result]
153
154        mlog.debug("query resulted with %s entries", len(entries))
155        return entries
156
157    async def delete(self, first_id: int):
158        """Delete entries prior to first_id"""
159        entry_count = await self._async_group.spawn(
160            self._executor, _ext_delete, self._conn, first_id)
161        mlog.debug("deleted %s entries", entry_count)

Resource with lifetime control based on Group.

Database()
async_group: hat.aio.Group

Async group

async def get_first_id(self) -> Optional[int]:
48    async def get_first_id(self) -> typing.Optional[int]:
49        """Get first entry id"""
50        return await self._async_group.spawn(self._executor, _ext_fist_id,
51                                             self._conn)

Get first entry id

async def get_last_id(self) -> Optional[int]:
53    async def get_last_id(self) -> typing.Optional[int]:
54        """Get last entry id"""
55        return await self._async_group.spawn(self._executor, _ext_last_id,
56                                             self._conn)

Get last entry id

async def add_msgs(self, msgs: List[Tuple[float, hat.syslog.common.Msg]]) -> List[hat.syslog.server.common.Entry]:
58    async def add_msgs(self,
59                       msgs: typing.List[typing.Tuple[float, common.Msg]]
60                       ) -> typing.List[common.Entry]:
61        """Add timestamped messages"""
62        columns = ['entry_timestamp', 'facility', 'severity', 'version',
63                   'msg_timestamp', 'hostname', 'app_name', 'procid', 'msgid',
64                   'data', 'msg']
65        values = [(entry_timestamp, msg.facility.value, msg.severity.value,
66                   msg.version, msg.timestamp, msg.hostname, msg.app_name,
67                   msg.procid, msg.msgid, msg.data, msg.msg)
68                  for entry_timestamp, msg in msgs]
69        entry_ids = await self._async_group.spawn(
70            self._executor, _ext_insert, self._conn, columns, values)
71
72        entries = [
73            common.Entry(id=entry_id,
74                         timestamp=entry_timestamp,
75                         msg=msg)
76            for entry_id, (entry_timestamp, msg) in zip(entry_ids, msgs)]
77
78        mlog.debug("messages added to database (message count: %s)",
79                   len(entries))
80        return entries

Add timestamped messages

async def add_entries(self, entries: List[hat.syslog.server.common.Entry]):
82    async def add_entries(self, entries: typing.List[common.Entry]):
83        """Add entries"""
84        columns = ['rowid', 'entry_timestamp', 'facility', 'severity',
85                   'version', 'msg_timestamp', 'hostname', 'app_name',
86                   'procid', 'msgid', 'data', 'msg']
87        values = [(entry.id, entry.timestamp, entry.msg.facility.value,
88                   entry.msg.severity.value, entry.msg.version,
89                   entry.msg.timestamp, entry.msg.hostname, entry.msg.app_name,
90                   entry.msg.procid, entry.msg.msgid, entry.msg.data,
91                   entry.msg.msg)
92                  for entry in entries]
93        entry_ids = await self._async_group.spawn(
94            self._executor, _ext_insert, self._conn, columns, values)
95        mlog.debug("entries added to database (entry count: %s)",
96                   len(entry_ids))

Add entries

async def query(self, filter: hat.syslog.server.common.Filter) -> List[hat.syslog.server.common.Entry]:
 98    async def query(self,
 99                    filter: common.Filter
100                    ) -> typing.List[common.Entry]:
101        """Query entries that satisfy filter"""
102        conditions = []
103        args = {}
104        if filter.last_id is not None:
105            conditions.append('rowid <= :last_id')
106            args['last_id'] = filter.last_id
107        if filter.entry_timestamp_from is not None:
108            conditions.append('entry_timestamp >= :entry_timestamp_from')
109            args['entry_timestamp_from'] = filter.entry_timestamp_from
110        if filter.entry_timestamp_to is not None:
111            conditions.append('entry_timestamp <= :entry_timestamp_to')
112            args['entry_timestamp_to'] = filter.entry_timestamp_to
113        if filter.facility:
114            conditions.append('facility = :facility')
115            args['facility'] = filter.facility.value
116        if filter.severity:
117            conditions.append('severity = :severity')
118            args['severity'] = filter.severity.value
119        if filter.hostname:
120            conditions.append('hostname LIKE :hostname')
121            args['hostname'] = f'%{filter.hostname}%'
122        if filter.app_name:
123            conditions.append('app_name LIKE :app_name')
124            args['app_name'] = f'%{filter.app_name}%'
125        if filter.procid:
126            conditions.append('procid LIKE :procid')
127            args['procid'] = f'%{filter.procid}%'
128        if filter.msgid:
129            conditions.append('msgid LIKE :msgid')
130            args['msgid'] = f'%{filter.msgid}%'
131        if filter.msg:
132            conditions.append('msg LIKE :msg')
133            args['msg'] = f'%{filter.msg}%'
134
135        result = await self._async_group.spawn(
136            self._executor, _ext_query, self._conn, conditions, args,
137            filter.max_results)
138
139        entries = [common.Entry(
140                    id=row['rowid'],
141                    timestamp=row['entry_timestamp'],
142                    msg=common.Msg(facility=common.Facility(row['facility']),
143                                   severity=common.Severity(row['severity']),
144                                   version=row['version'],
145                                   timestamp=row['msg_timestamp'],
146                                   hostname=row['hostname'],
147                                   app_name=row['app_name'],
148                                   procid=row['procid'],
149                                   msgid=row['msgid'],
150                                   data=row['data'],
151                                   msg=row['msg']))
152                   for row in result]
153
154        mlog.debug("query resulted with %s entries", len(entries))
155        return entries

Query entries that satisfy filter

async def delete(self, first_id: int):
157    async def delete(self, first_id: int):
158        """Delete entries prior to first_id"""
159        entry_count = await self._async_group.spawn(
160            self._executor, _ext_delete, self._conn, first_id)
161        mlog.debug("deleted %s entries", entry_count)

Delete entries whose id is lower than first_id (all entries are deleted if first_id is None)

Inherited Members
hat.aio.Resource
is_open
is_closing
is_closed
wait_closing
wait_closed
close
async_close