1 # SPDX-License-Identifier: LGPL-3-or-later
2 # Copyright 2022 Jacob Lifshay
5 from contextlib
import contextmanager
6 from itertools
import count
8 from pathlib
import Path
10 from tempfile
import NamedTemporaryFile
11 from threading
import local
17 def __try_lock_file(path
):
20 file = path
.open("xb")
21 except FileExistsError
:
33 path
= getattr(__ctrl_path
, "path", None)
35 assert isinstance(path
, str), "invalid state"
37 for i
in range(10000):
39 with
__try_lock_file(f
"crtl{i}.lock") as locked
:
40 if locked
and next(Path(path
).glob(".lock_*"), None) is None:
41 shutil
.rmtree(path
, ignore_errors
=True)
42 Path(path
).mkdir(parents
=True, exist_ok
=True)
43 tmpfile
= NamedTemporaryFile(prefix
=".lock_", dir=path
)
44 __ctrl_path
.tmpfile
= tmpfile
45 __ctrl_path
.path
= path
47 assert False, "can't create crtl* path"