hat.syslog.server.backend

Backend implementation

  1"""Backend implementation"""
  2
  3from collections.abc import Callable
  4from pathlib import Path
  5import asyncio
  6import contextlib
  7import itertools
  8import logging
  9
 10from hat import aio
 11from hat import util
 12
 13from hat.syslog.server import common
 14from hat.syslog.server import database
 15
 16
 17mlog: logging.Logger = logging.getLogger(__name__)
 18"""Module logger"""
 19
 20register_delay: float = 0.1
 21"""Registration delay in seconds"""
 22
 23register_queue_size: int = 50
 24"""Registration queue size"""
 25
 26register_queue_treshold: int = 10
 27"""Registration queue threshold"""
 28
 29
 30async def create_backend(path: Path,
 31                         low_size: int,
 32                         high_size: int,
 33                         enable_archive: bool,
 34                         disable_journal: bool
 35                         ) -> 'Backend':
 36    """Create backend"""
 37    db = await database.create_database(path, disable_journal)
 38    try:
 39        first_id = await db.get_first_id()
 40        last_id = await db.get_last_id()
 41    except BaseException:
 42        await aio.uncancellable(db.async_close())
 43        raise
 44
 45    backend = Backend()
 46    backend._path = path
 47    backend._low_size = low_size
 48    backend._high_size = high_size
 49    backend._enable_archive = enable_archive
 50    backend._disable_journal = disable_journal
 51    backend._db = db
 52    backend._first_id = first_id
 53    backend._last_id = last_id
 54    backend._async_group = aio.Group()
 55    backend._change_cbs = util.CallbackRegistry()
 56    backend._msg_queue = aio.Queue(register_queue_size)
 57    backend._executor = aio.create_executor()
 58
 59    backend._async_group.spawn(aio.call_on_cancel, db.async_close)
 60    backend._async_group.spawn(backend._loop)
 61
 62    mlog.debug('created backend with database %s', path)
 63    return backend
 64
 65
 66class Backend(aio.Resource):
 67
 68    @property
 69    def async_group(self) -> aio.Group:
 70        """Async group"""
 71        return self._async_group
 72
 73    @property
 74    def first_id(self) -> int | None:
 75        """First entry id"""
 76        return self._first_id
 77
 78    @property
 79    def last_id(self) -> int | None:
 80        """Last entry id"""
 81        return self._last_id
 82
 83    def register_change_cb(self,
 84                           cb: Callable[[list[common.Entry]], None]
 85                           ) -> util.RegisterCallbackHandle:
 86        """Register change callback
 87
 88        Callback is called if `first_id` changes and/or `last_id` changes
 89        and/or new entries are available (passed as argument to registered
 90        callback).
 91
 92        """
 93        return self._change_cbs.register(cb)
 94
 95    async def register(self,
 96                       timestamp: float,
 97                       msg: common.Msg):
 98        """Register message
 99
100        Registration adds msg to registration queue. If queue is full, wait
101        until message can be successfully added.
102
103        When message is added to empty queue, registration delay timer is
104        started. Once delay timer expires or if number of messages in queue
105        is greater than threshold, all messages are removed from queue and
106        inserted into sqlite database.
107
108        """
109        await self._msg_queue.put((timestamp, msg))
110
111    async def query(self,
112                    filter: common.Filter
113                    ) -> list[common.Entry]:
114        """Query entries"""
115        return await self._db.query(filter)
116
117    async def _loop(self):
118        try:
119            while True:
120                msgs = await self._get_msgs()
121                await self._process_msgs(msgs)
122
123        except Exception as e:
124            mlog.warn("backend loop error: %s", e, exc_info=e)
125
126        finally:
127            self.close()
128            self._msg_queue.close()
129            mlog.debug('backend loop closed')
130
131    async def _get_msgs(self):
132        loop = asyncio.get_running_loop()
133        msgs = []
134
135        msg = await self._msg_queue.get()
136        msgs.append(msg)
137
138        start = loop.time()
139        while True:
140            while not self._msg_queue.empty():
141                msgs.append(self._msg_queue.get_nowait())
142            timeout = register_delay - (loop.time() - start)
143            if timeout <= 0:
144                break
145            if len(msgs) >= register_queue_treshold:
146                break
147            async_group = aio.Group()
148            try:
149                f = async_group.spawn(self._msg_queue.get)
150                await aio.wait_for(asyncio.shield(f), timeout)
151            except asyncio.TimeoutError:
152                break
153            finally:
154                await aio.uncancellable(async_group.async_close())
155                if not f.cancelled():
156                    msgs.append(f.result())
157
158        while not self._msg_queue.empty():
159            msgs.append(self._msg_queue.get_nowait())
160        return msgs
161
162    async def _process_msgs(self, msgs):
163        mlog.debug("registering new messages (message count: %s)...",
164                   len(msgs))
165        entries = await self._db.add_msgs(msgs)
166        if not entries:
167            return
168        entries = list(reversed(entries))
169
170        self._last_id = entries[0].id
171        if self._first_id is None:
172            self._first_id = entries[-1].id
173
174        mlog.debug("backend state changed (first_id: %s; last_id: %s)",
175                   self._first_id, self._last_id)
176        self._change_cbs.notify(entries)
177
178        if self._high_size <= 0:
179            return
180        if self._last_id - self._first_id + 1 <= self._high_size:
181            return
182
183        mlog.debug("database cleanup starting...")
184        await self._db_cleanup()
185
186    async def _db_cleanup(self):
187        first_id = self._last_id - self._low_size + 1
188        if first_id > self._last_id:
189            first_id = None
190        if first_id <= self._first_id:
191            return
192
193        if self._enable_archive:
194            mlog.debug("archiving database entries...")
195            await self._archive_db(first_id)
196
197        await self._db.delete(first_id)
198        self._first_id = first_id
199        if self._first_id is None:
200            self._last_id = None
201
202        mlog.debug("backend state changed (first_id: %s; last_id: %s)",
203                   self._first_id, self._last_id)
204        self._change_cbs.notify([])
205
206    async def _archive_db(self, first_id):
207        archive_path = await self._async_group.spawn(
208            self._executor, _ext_get_new_archive_path, self._path)
209        archive = await database.create_database(
210            archive_path, self._disable_journal)
211        try:
212            entries = await self._db.query(common.Filter(
213                last_id=first_id - 1 if first_id is not None else None))
214            await archive.add_entries(entries)
215        finally:
216            await aio.uncancellable(archive.async_close())
217
218
def _ext_get_new_archive_path(db_path):
    """Return first unused archive path for database at `db_path`

    Archives are siblings of `db_path` named ``<db file name>.<index>``.
    Returned path uses the lowest index which is greater than every
    existing numeric index and which does not exist on file system.

    Args:
        db_path: database path (`pathlib.Path`)

    Returns:
        `pathlib.Path` of the new archive (file is not created)

    """
    # local import keeps this function independent of module level imports
    import glob

    parent = db_path.parent
    name = db_path.name
    last_index = 0

    # glob metacharacters in file name are escaped - otherwise name such as
    # 'log[1].db' would not match its own archives
    for archive_path in parent.glob(glob.escape(name) + '.*'):
        suffix = archive_path.name.rpartition('.')[2]
        # siblings with non numeric suffix are ignored
        with contextlib.suppress(ValueError):
            last_index = max(last_index, int(suffix))

    for index in itertools.count(last_index + 1):
        new_path = parent / f"{name}.{index}"
        if not new_path.exists():
            return new_path
mlog: logging.Logger = <Logger hat.syslog.server.backend (WARNING)>

Module logger

register_delay: float = 0.1

Registration delay in seconds

register_queue_size: int = 50

Registration queue size

register_queue_treshold: int = 10

Registration queue threshold

async def create_backend( path: pathlib.Path, low_size: int, high_size: int, enable_archive: bool, disable_journal: bool) -> Backend:
async def create_backend(path: Path,
                         low_size: int,
                         high_size: int,
                         enable_archive: bool,
                         disable_journal: bool
                         ) -> 'Backend':
    """Create backend

    Opens database at `path` and reads its current first and last entry
    identifiers. If reading of identifiers fails, database is closed before
    the exception is propagated.

    Args:
        path: database file path (also used as base name for archives)
        low_size: number of entry ids kept in database after cleanup
        high_size: id span which triggers cleanup once exceeded (cleanup is
            disabled if value is not positive)
        enable_archive: if set, entries are copied to a new archive database
            before they are deleted during cleanup
        disable_journal: passed to `database.create_database` for both the
            main and archive databases

    """
    db = await database.create_database(path, disable_journal)
    try:
        # tuple elements are evaluated left to right - first id is read
        # before last id
        id_range = (await db.get_first_id(), await db.get_last_id())
    except BaseException:
        await aio.uncancellable(db.async_close())
        raise
    first_id, last_id = id_range

    backend = Backend()

    # configuration
    backend._path = path
    backend._low_size = low_size
    backend._high_size = high_size
    backend._enable_archive = enable_archive
    backend._disable_journal = disable_journal

    # database state
    backend._db = db
    backend._first_id = first_id
    backend._last_id = last_id

    # runtime resources
    group = aio.Group()
    backend._async_group = group
    backend._change_cbs = util.CallbackRegistry()
    backend._msg_queue = aio.Queue(register_queue_size)
    backend._executor = aio.create_executor()

    # database close is tied to group cancellation (presumably run by
    # `aio.call_on_cancel` once the group is closed)
    group.spawn(aio.call_on_cancel, db.async_close)
    group.spawn(backend._loop)

    mlog.debug('created backend with database %s', path)
    return backend

Create backend

class Backend(hat.aio.group.Resource):
 67class Backend(aio.Resource):
 68
 69    @property
 70    def async_group(self) -> aio.Group:
 71        """Async group"""
 72        return self._async_group
 73
 74    @property
 75    def first_id(self) -> int | None:
 76        """First entry id"""
 77        return self._first_id
 78
 79    @property
 80    def last_id(self) -> int | None:
 81        """Last entry id"""
 82        return self._last_id
 83
 84    def register_change_cb(self,
 85                           cb: Callable[[list[common.Entry]], None]
 86                           ) -> util.RegisterCallbackHandle:
 87        """Register change callback
 88
 89        Callback is called if `first_id` changes and/or `last_id` changes
 90        and/or new entries are available (passed as argument to registered
 91        callback).
 92
 93        """
 94        return self._change_cbs.register(cb)
 95
 96    async def register(self,
 97                       timestamp: float,
 98                       msg: common.Msg):
 99        """Register message
100
101        Registration adds msg to registration queue. If queue is full, wait
102        until message can be successfully added.
103
104        When message is added to empty queue, registration delay timer is
105        started. Once delay timer expires or if number of messages in queue
106        is greater than threshold, all messages are removed from queue and
107        inserted into sqlite database.
108
109        """
110        await self._msg_queue.put((timestamp, msg))
111
112    async def query(self,
113                    filter: common.Filter
114                    ) -> list[common.Entry]:
115        """Query entries"""
116        return await self._db.query(filter)
117
118    async def _loop(self):
119        try:
120            while True:
121                msgs = await self._get_msgs()
122                await self._process_msgs(msgs)
123
124        except Exception as e:
125            mlog.warn("backend loop error: %s", e, exc_info=e)
126
127        finally:
128            self.close()
129            self._msg_queue.close()
130            mlog.debug('backend loop closed')
131
132    async def _get_msgs(self):
133        loop = asyncio.get_running_loop()
134        msgs = []
135
136        msg = await self._msg_queue.get()
137        msgs.append(msg)
138
139        start = loop.time()
140        while True:
141            while not self._msg_queue.empty():
142                msgs.append(self._msg_queue.get_nowait())
143            timeout = register_delay - (loop.time() - start)
144            if timeout <= 0:
145                break
146            if len(msgs) >= register_queue_treshold:
147                break
148            async_group = aio.Group()
149            try:
150                f = async_group.spawn(self._msg_queue.get)
151                await aio.wait_for(asyncio.shield(f), timeout)
152            except asyncio.TimeoutError:
153                break
154            finally:
155                await aio.uncancellable(async_group.async_close())
156                if not f.cancelled():
157                    msgs.append(f.result())
158
159        while not self._msg_queue.empty():
160            msgs.append(self._msg_queue.get_nowait())
161        return msgs
162
163    async def _process_msgs(self, msgs):
164        mlog.debug("registering new messages (message count: %s)...",
165                   len(msgs))
166        entries = await self._db.add_msgs(msgs)
167        if not entries:
168            return
169        entries = list(reversed(entries))
170
171        self._last_id = entries[0].id
172        if self._first_id is None:
173            self._first_id = entries[-1].id
174
175        mlog.debug("backend state changed (first_id: %s; last_id: %s)",
176                   self._first_id, self._last_id)
177        self._change_cbs.notify(entries)
178
179        if self._high_size <= 0:
180            return
181        if self._last_id - self._first_id + 1 <= self._high_size:
182            return
183
184        mlog.debug("database cleanup starting...")
185        await self._db_cleanup()
186
187    async def _db_cleanup(self):
188        first_id = self._last_id - self._low_size + 1
189        if first_id > self._last_id:
190            first_id = None
191        if first_id <= self._first_id:
192            return
193
194        if self._enable_archive:
195            mlog.debug("archiving database entries...")
196            await self._archive_db(first_id)
197
198        await self._db.delete(first_id)
199        self._first_id = first_id
200        if self._first_id is None:
201            self._last_id = None
202
203        mlog.debug("backend state changed (first_id: %s; last_id: %s)",
204                   self._first_id, self._last_id)
205        self._change_cbs.notify([])
206
207    async def _archive_db(self, first_id):
208        archive_path = await self._async_group.spawn(
209            self._executor, _ext_get_new_archive_path, self._path)
210        archive = await database.create_database(
211            archive_path, self._disable_journal)
212        try:
213            entries = await self._db.query(common.Filter(
214                last_id=first_id - 1 if first_id is not None else None))
215            await archive.add_entries(entries)
216        finally:
217            await aio.uncancellable(archive.async_close())

Resource with lifetime control based on Group.

async_group: hat.aio.group.Group
69    @property
70    def async_group(self) -> aio.Group:
71        """Async group"""
72        return self._async_group

Async group

first_id: int | None
74    @property
75    def first_id(self) -> int | None:
76        """First entry id"""
77        return self._first_id

First entry id

last_id: int | None
79    @property
80    def last_id(self) -> int | None:
81        """Last entry id"""
82        return self._last_id

Last entry id

def register_change_cb( self, cb: Callable[[list[hat.syslog.server.common.Entry]], None]) -> hat.util.callback.RegisterCallbackHandle:
84    def register_change_cb(self,
85                           cb: Callable[[list[common.Entry]], None]
86                           ) -> util.RegisterCallbackHandle:
87        """Register change callback
88
89        Callback is called if `first_id` changes and/or `last_id` changes
90        and/or new entries are available (passed as argument to registered
91        callback).
92
93        """
94        return self._change_cbs.register(cb)

Register change callback

Callback is called if first_id changes and/or last_id changes and/or new entries are available (passed as argument to registered callback).

async def register(self, timestamp: float, msg: hat.syslog.common.Msg):
 96    async def register(self,
 97                       timestamp: float,
 98                       msg: common.Msg):
 99        """Register message
100
101        Registration adds msg to registration queue. If queue is full, wait
102        until message can be successfully added.
103
104        When message is added to empty queue, registration delay timer is
105        started. Once delay timer expires or if number of messages in queue
106        is greater than threshold, all messages are removed from queue and
107        inserted into sqlite database.
108
109        """
110        await self._msg_queue.put((timestamp, msg))

Register message

Registration adds msg to registration queue. If queue is full, wait until message can be successfully added.

When a message is added to an empty queue, the registration delay timer is started. Once the delay timer expires, or once the number of collected messages reaches the threshold, all messages are removed from the queue and inserted into the sqlite database.

async def query( self, filter: hat.syslog.server.common.Filter) -> list[hat.syslog.server.common.Entry]:
112    async def query(self,
113                    filter: common.Filter
114                    ) -> list[common.Entry]:
115        """Query entries"""
116        return await self._db.query(filter)

Query entries