Wauplin HF staff commited on
Commit
ec04781
1 Parent(s): 061f8cb

admin section

Browse files
Files changed (3) hide show
  1. .gitignore +2 -2
  2. Makefile +1 -1
  3. user_history.py +140 -34
.gitignore CHANGED
@@ -140,5 +140,5 @@ dmypy.json
140
  cspell.json
141
 
142
  mock
143
- user_history
144
- _history_snapshots
 
140
  cspell.json
141
 
142
  mock
143
+ _user_history
144
+ _user_history_exports
Makefile CHANGED
@@ -3,7 +3,7 @@
3
  quality:
4
  black --check .
5
  ruff .
6
- mypy .
7
 
8
  style:
9
  black .
 
3
  quality:
4
  black --check .
5
  ruff .
6
+ mypy --install-types .
7
 
8
  style:
9
  black .
user_history.py CHANGED
@@ -3,25 +3,21 @@ import os
3
  import shutil
4
  import warnings
5
  from datetime import datetime
 
6
  from pathlib import Path
7
- from typing import Dict, List, Tuple
8
  from uuid import uuid4
9
 
10
  import gradio as gr
11
  import numpy as np
 
12
  from filelock import FileLock
13
  from PIL.Image import Image
14
 
15
 
16
- def setup(
17
- folder_path: str | Path | None = None,
18
- delete_button: bool = True,
19
- export_button: bool = True,
20
- ) -> None:
21
  user_history = _UserHistory()
22
  user_history.folder_path = _resolve_folder_path(folder_path)
23
- user_history.delete_button = delete_button
24
- user_history.export_button = export_button
25
  user_history.initialized = True
26
 
27
 
@@ -33,20 +29,19 @@ def render() -> None:
33
  print("Initializing user history with default config. Use `user_history.setup(...)` to customize.")
34
  setup()
35
 
36
- # deactivate if no persistent storage
37
- if user_history.folder_path is None:
38
- gr.Markdown(
39
- "User history is deactivated as no Persistent Storage volume has been found. Please contact the Space"
40
- " owner to either assign a [Persistent Storage](https://huggingface.co/docs/hub/spaces-storage) or set"
41
- " `folder_path` to a temporary folder."
42
- )
43
- return
44
-
45
  # Render user history tab
46
  gr.Markdown(
47
  "## Your past generations\n\n(Log in to keep a gallery of your previous generations."
48
  " Your history will be saved and available on your next visit.)"
49
  )
 
 
 
 
 
 
 
 
50
  with gr.Row():
51
  gr.LoginButton(min_width=250)
52
  gr.LogoutButton(min_width=250)
@@ -99,6 +94,9 @@ def render() -> None:
99
  queue=False,
100
  )
101
 
 
 
 
102
 
103
  def save_image(
104
  profile: gr.OAuthProfile | None,
@@ -142,10 +140,7 @@ def save_image(
142
  class _UserHistory(object):
143
  _instance = None
144
  initialized: bool = False
145
-
146
- folder_path: Path | None
147
- delete_button: bool
148
- export_button: bool
149
 
150
  def __new__(cls):
151
  # Using singleton pattern => we don't want to expose an object (more complex to use) but still want to keep
@@ -155,16 +150,12 @@ class _UserHistory(object):
155
  return cls._instance
156
 
157
  def _user_path(self, username: str) -> Path:
158
- if self.folder_path is None:
159
- raise Exception("User history is deactivated.")
160
  path = self.folder_path / username
161
  path.mkdir(parents=True, exist_ok=True)
162
  return path
163
 
164
  def _user_lock(self, username: str) -> FileLock:
165
  """Ensure history is not corrupted if concurrent calls."""
166
- if self.folder_path is None:
167
- raise Exception("User history is deactivated.")
168
  return FileLock(self.folder_path / f"{username}.lock") # lock outside of folder => better when exporting ZIP
169
 
170
  def _user_jsonl_path(self, username: str) -> Path:
@@ -265,22 +256,137 @@ def _copy_image(image: Image | np.ndarray | str | Path, dst_folder: Path) -> Pat
265
  raise ValueError(f"Unsupported image type: {type(image)}")
266
 
267
 
268
- def _resolve_folder_path(folder_path: str | Path | None) -> Path | None:
269
  if folder_path is not None:
270
  return Path(folder_path).expanduser().resolve()
271
 
272
- if os.getenv("SYSTEM") == "spaces":
273
- if os.path.exists("/data"): # Persistent storage is enabled!
274
- return Path("/data") / "user_history"
275
- else:
276
- return None # No persistent storage => no user history
277
 
278
- # Not in a Space => local folder
279
- return Path(__file__).parent / "user_history"
280
 
281
 
282
  def _archives_path() -> Path:
283
  # Doesn't have to be on persistent storage as it's only used for download
284
- path = Path(__file__).parent / "_history_snapshots"
285
  path.mkdir(parents=True, exist_ok=True)
286
  return path
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3
  import shutil
4
  import warnings
5
  from datetime import datetime
6
+ from functools import cache
7
  from pathlib import Path
8
+ from typing import Callable, Dict, List, Tuple
9
  from uuid import uuid4
10
 
11
  import gradio as gr
12
  import numpy as np
13
+ import requests
14
  from filelock import FileLock
15
  from PIL.Image import Image
16
 
17
 
18
+ def setup(folder_path: str | Path | None = None) -> None:
 
 
 
 
19
  user_history = _UserHistory()
20
  user_history.folder_path = _resolve_folder_path(folder_path)
 
 
21
  user_history.initialized = True
22
 
23
 
 
29
  print("Initializing user history with default config. Use `user_history.setup(...)` to customize.")
30
  setup()
31
 
 
 
 
 
 
 
 
 
 
32
  # Render user history tab
33
  gr.Markdown(
34
  "## Your past generations\n\n(Log in to keep a gallery of your previous generations."
35
  " Your history will be saved and available on your next visit.)"
36
  )
37
+
38
+ if os.getenv("SYSTEM") == "spaces" and not os.path.exists("/data"):
39
+ gr.Markdown(
40
+ "**⚠️ Persistent storage is disabled, meaning your history will be lost if the Space gets restarted."
41
+ " Only the Space owner can setup a Persistent Storage. If you are not the Space owner, consider"
42
+ " duplicating this Space to set your own storage.⚠️**"
43
+ )
44
+
45
  with gr.Row():
46
  gr.LoginButton(min_width=250)
47
  gr.LogoutButton(min_width=250)
 
94
  queue=False,
95
  )
96
 
97
+ # Admin section (only shown locally or when logged in as Space owner)
98
+ _admin_section()
99
+
100
 
101
  def save_image(
102
  profile: gr.OAuthProfile | None,
 
140
  class _UserHistory(object):
141
  _instance = None
142
  initialized: bool = False
143
+ folder_path: Path
 
 
 
144
 
145
  def __new__(cls):
146
  # Using singleton pattern => we don't want to expose an object (more complex to use) but still want to keep
 
150
  return cls._instance
151
 
152
  def _user_path(self, username: str) -> Path:
 
 
153
  path = self.folder_path / username
154
  path.mkdir(parents=True, exist_ok=True)
155
  return path
156
 
157
  def _user_lock(self, username: str) -> FileLock:
158
  """Ensure history is not corrupted if concurrent calls."""
 
 
159
  return FileLock(self.folder_path / f"{username}.lock") # lock outside of folder => better when exporting ZIP
160
 
161
  def _user_jsonl_path(self, username: str) -> Path:
 
256
  raise ValueError(f"Unsupported image type: {type(image)}")
257
 
258
 
259
+ def _resolve_folder_path(folder_path: str | Path | None) -> Path:
260
  if folder_path is not None:
261
  return Path(folder_path).expanduser().resolve()
262
 
263
+ if os.getenv("SYSTEM") == "spaces" and os.path.exists("/data"): # Persistent storage is enabled!
264
+ return Path("/data") / "_user_history"
 
 
 
265
 
266
+ # Not in a Space or Persistent storage not enabled => local folder
267
+ return Path(__file__).parent / "_user_history"
268
 
269
 
270
  def _archives_path() -> Path:
271
  # Doesn't have to be on persistent storage as it's only used for download
272
+ path = Path(__file__).parent / "_user_history_exports"
273
  path.mkdir(parents=True, exist_ok=True)
274
  return path
275
+
276
+
277
+ #################
278
+ # Admin section #
279
+ #################
280
+
281
+
282
+ def _admin_section() -> None:
283
+ title = gr.Markdown()
284
+ title.attach_load_event(_display_if_admin(), every=None)
285
+
286
+
287
+ def _display_if_admin() -> Callable:
288
+ def _inner(profile: gr.OAuthProfile | None) -> str:
289
+ if profile is None:
290
+ return ""
291
+ if profile["preferred_username"] in _fetch_admins():
292
+ return _admin_content()
293
+ return ""
294
+
295
+ return _inner
296
+
297
+
298
+ def _admin_content() -> str:
299
+ return f"""
300
+ ## Admin section
301
+
302
+ Running on **{os.getenv("SYSTEM", "local")}** (id: {os.getenv("SPACE_ID")}). {_get_msg_is_persistent_storage_enabled()}
303
+
304
+ Admins: {', '.join(_fetch_admins())}
305
+
306
+ {_get_nb_users()} user(s), {_get_nb_images()} image(s)
307
+
308
+ ### Configuration
309
+
310
+ History folder: *{_UserHistory().folder_path}*
311
+
312
+ Exports folder: *{_archives_path()}*
313
+
314
+ ### Disk usage
315
+
316
+ {_disk_space_warning_message()}
317
+ """
318
+
319
+
320
+ def _get_nb_users() -> int:
321
+ user_history = _UserHistory()
322
+ if not user_history.initialized:
323
+ return 0
324
+ if user_history.folder_path is not None:
325
+ return len([path for path in user_history.folder_path.iterdir() if path.is_dir()])
326
+ return 0
327
+
328
+
329
+ def _get_nb_images() -> int:
330
+ user_history = _UserHistory()
331
+ if not user_history.initialized:
332
+ return 0
333
+ if user_history.folder_path is not None:
334
+ return len([path for path in user_history.folder_path.glob("*/images/*")])
335
+ return 0
336
+
337
+
338
+ def _get_msg_is_persistent_storage_enabled() -> str:
339
+ if os.getenv("SYSTEM") == "spaces":
340
+ if os.path.exists("/data"):
341
+ return "Persistent storage is enabled."
342
+ else:
343
+ return (
344
+ "Persistent storage is not enabled. This means that user histories will be deleted when the Space is"
345
+ " restarted. Consider adding a Persistent Storage in your Space settings."
346
+ )
347
+ return ""
348
+
349
+
350
+ def _disk_space_warning_message() -> str:
351
+ user_history = _UserHistory()
352
+ if not user_history.initialized:
353
+ return ""
354
+
355
+ message = ""
356
+ if user_history.folder_path is not None:
357
+ total, used, _ = _get_disk_usage(user_history.folder_path)
358
+ message += f"History folder: **{used / 1e9 :.0f}/{total / 1e9 :.0f}GB** used ({100*used/total :.0f}%)."
359
+
360
+ total, used, _ = _get_disk_usage(_archives_path())
361
+ message += f"\n\nExports folder: **{used / 1e9 :.0f}/{total / 1e9 :.0f}GB** used ({100*used/total :.0f}%)."
362
+
363
+ return f"{message.strip()}"
364
+
365
+
366
+ def _get_disk_usage(path: Path) -> Tuple[int, int, int]:
367
+ for path in [path] + list(path.parents): # first check target_dir, then each parents one by one
368
+ try:
369
+ return shutil.disk_usage(path)
370
+ except OSError: # if doesn't exist or can't read => fail silently and try parent one
371
+ pass
372
+ return 0, 0, 0
373
+
374
+
375
+ @cache
376
+ def _fetch_admins() -> List[str]:
377
+ # Running locally => fake user is admin
378
+ if os.getenv("SYSTEM") != "spaces":
379
+ return ["FakeGradioUser"]
380
+
381
+ # Running in Space but no space_id => ???
382
+ space_id = os.getenv("SPACE_ID")
383
+ if space_id is None:
384
+ return ["Unknown"]
385
+
386
+ # Running in Space => try to fetch organization members
387
+ # Otherwise, it's not an organization => namespace is the user
388
+ namespace = space_id.split("/")[0]
389
+ response = requests.get("https://huggingface.co/api/organizations/{namespace}/members")
390
+ if response.status_code == 200:
391
+ return sorted(member["user"] for member in response.json())
392
+ return [namespace]