|
from __future__ import annotations |
|
|
|
import contextlib |
|
import logging |
|
import os |
|
import time |
|
import warnings |
|
from abc import ABC, abstractmethod |
|
from threading import Lock |
|
from types import TracebackType |
|
from typing import Any |
|
|
|
from ._error import Timeout |
|
|
|
_LOGGER = logging.getLogger("filelock") |
|
|
|
|
|
|
|
|
|
|
|
class AcquireReturnProxy: |
|
"""A context aware object that will release the lock file when exiting.""" |
|
|
|
def __init__(self, lock: BaseFileLock) -> None: |
|
self.lock = lock |
|
|
|
def __enter__(self) -> BaseFileLock: |
|
return self.lock |
|
|
|
def __exit__( |
|
self, |
|
exc_type: type[BaseException] | None, |
|
exc_value: BaseException | None, |
|
traceback: TracebackType | None, |
|
) -> None: |
|
self.lock.release() |
|
|
|
|
|
class BaseFileLock(ABC, contextlib.ContextDecorator): |
|
"""Abstract base class for a file lock object.""" |
|
|
|
def __init__(self, lock_file: str | os.PathLike[Any], timeout: float = -1) -> None: |
|
""" |
|
Create a new lock object. |
|
|
|
:param lock_file: path to the file |
|
:param timeout: default timeout when acquiring the lock, in seconds. It will be used as fallback value in |
|
the acquire method, if no timeout value (``None``) is given. If you want to disable the timeout, set it |
|
to a negative value. A timeout of 0 means, that there is exactly one attempt to acquire the file lock. |
|
""" |
|
|
|
self._lock_file: str = os.fspath(lock_file) |
|
|
|
|
|
|
|
self._lock_file_fd: int | None = None |
|
|
|
|
|
self._timeout: float = timeout |
|
|
|
|
|
self._thread_lock: Lock = Lock() |
|
|
|
|
|
|
|
self._lock_counter: int = 0 |
|
|
|
@property |
|
def lock_file(self) -> str: |
|
""":return: path to the lock file""" |
|
return self._lock_file |
|
|
|
@property |
|
def timeout(self) -> float: |
|
""" |
|
:return: the default timeout value, in seconds |
|
|
|
.. versionadded:: 2.0.0 |
|
""" |
|
return self._timeout |
|
|
|
@timeout.setter |
|
def timeout(self, value: float | str) -> None: |
|
""" |
|
Change the default timeout value. |
|
|
|
:param value: the new value, in seconds |
|
""" |
|
self._timeout = float(value) |
|
|
|
@abstractmethod |
|
def _acquire(self) -> None: |
|
"""If the file lock could be acquired, self._lock_file_fd holds the file descriptor of the lock file.""" |
|
raise NotImplementedError |
|
|
|
@abstractmethod |
|
def _release(self) -> None: |
|
"""Releases the lock and sets self._lock_file_fd to None.""" |
|
raise NotImplementedError |
|
|
|
@property |
|
def is_locked(self) -> bool: |
|
""" |
|
|
|
:return: A boolean indicating if the lock file is holding the lock currently. |
|
|
|
.. versionchanged:: 2.0.0 |
|
|
|
This was previously a method and is now a property. |
|
""" |
|
return self._lock_file_fd is not None |
|
|
|
def acquire( |
|
self, |
|
timeout: float | None = None, |
|
poll_interval: float = 0.05, |
|
*, |
|
poll_intervall: float | None = None, |
|
blocking: bool = True, |
|
) -> AcquireReturnProxy: |
|
""" |
|
Try to acquire the file lock. |
|
|
|
:param timeout: maximum wait time for acquiring the lock, ``None`` means use the default :attr:`~timeout` is and |
|
if ``timeout < 0``, there is no timeout and this method will block until the lock could be acquired |
|
:param poll_interval: interval of trying to acquire the lock file |
|
:param poll_intervall: deprecated, kept for backwards compatibility, use ``poll_interval`` instead |
|
:param blocking: defaults to True. If False, function will return immediately if it cannot obtain a lock on the |
|
first attempt. Otherwise this method will block until the timeout expires or the lock is acquired. |
|
:raises Timeout: if fails to acquire lock within the timeout period |
|
:return: a context object that will unlock the file when the context is exited |
|
|
|
.. code-block:: python |
|
|
|
# You can use this method in the context manager (recommended) |
|
with lock.acquire(): |
|
pass |
|
|
|
# Or use an equivalent try-finally construct: |
|
lock.acquire() |
|
try: |
|
pass |
|
finally: |
|
lock.release() |
|
|
|
.. versionchanged:: 2.0.0 |
|
|
|
This method returns now a *proxy* object instead of *self*, |
|
so that it can be used in a with statement without side effects. |
|
|
|
""" |
|
|
|
if timeout is None: |
|
timeout = self.timeout |
|
|
|
if poll_intervall is not None: |
|
msg = "use poll_interval instead of poll_intervall" |
|
warnings.warn(msg, DeprecationWarning, stacklevel=2) |
|
poll_interval = poll_intervall |
|
|
|
|
|
with self._thread_lock: |
|
self._lock_counter += 1 |
|
|
|
lock_id = id(self) |
|
lock_filename = self._lock_file |
|
start_time = time.monotonic() |
|
try: |
|
while True: |
|
with self._thread_lock: |
|
if not self.is_locked: |
|
_LOGGER.debug("Attempting to acquire lock %s on %s", lock_id, lock_filename) |
|
self._acquire() |
|
|
|
if self.is_locked: |
|
_LOGGER.debug("Lock %s acquired on %s", lock_id, lock_filename) |
|
break |
|
elif blocking is False: |
|
_LOGGER.debug("Failed to immediately acquire lock %s on %s", lock_id, lock_filename) |
|
raise Timeout(self._lock_file) |
|
elif 0 <= timeout < time.monotonic() - start_time: |
|
_LOGGER.debug("Timeout on acquiring lock %s on %s", lock_id, lock_filename) |
|
raise Timeout(self._lock_file) |
|
else: |
|
msg = "Lock %s not acquired on %s, waiting %s seconds ..." |
|
_LOGGER.debug(msg, lock_id, lock_filename, poll_interval) |
|
time.sleep(poll_interval) |
|
except BaseException: |
|
with self._thread_lock: |
|
self._lock_counter = max(0, self._lock_counter - 1) |
|
raise |
|
return AcquireReturnProxy(lock=self) |
|
|
|
def release(self, force: bool = False) -> None: |
|
""" |
|
Releases the file lock. Please note, that the lock is only completely released, if the lock counter is 0. Also |
|
note, that the lock file itself is not automatically deleted. |
|
|
|
:param force: If true, the lock counter is ignored and the lock is released in every case/ |
|
""" |
|
with self._thread_lock: |
|
|
|
if self.is_locked: |
|
self._lock_counter -= 1 |
|
|
|
if self._lock_counter == 0 or force: |
|
lock_id, lock_filename = id(self), self._lock_file |
|
|
|
_LOGGER.debug("Attempting to release lock %s on %s", lock_id, lock_filename) |
|
self._release() |
|
self._lock_counter = 0 |
|
_LOGGER.debug("Lock %s released on %s", lock_id, lock_filename) |
|
|
|
def __enter__(self) -> BaseFileLock: |
|
""" |
|
Acquire the lock. |
|
|
|
:return: the lock object |
|
""" |
|
self.acquire() |
|
return self |
|
|
|
def __exit__( |
|
self, |
|
exc_type: type[BaseException] | None, |
|
exc_value: BaseException | None, |
|
traceback: TracebackType | None, |
|
) -> None: |
|
""" |
|
Release the lock. |
|
|
|
:param exc_type: the exception type if raised |
|
:param exc_value: the exception value if raised |
|
:param traceback: the exception traceback if raised |
|
""" |
|
self.release() |
|
|
|
def __del__(self) -> None: |
|
"""Called when the lock object is deleted.""" |
|
self.release(force=True) |
|
|
|
|
|
__all__ = [ |
|
"BaseFileLock", |
|
"AcquireReturnProxy", |
|
] |
|
|