|
7 | 7 |
|
8 | 8 | from h import storage
|
9 | 9 | from h.search.util import add_default_scheme, wildcard_uri_is_valid
|
| 10 | +from h.security.permissions import Permission |
10 | 11 | from h.util import uri
|
11 | 12 |
|
12 | 13 | LIMIT_DEFAULT = 20
|
@@ -207,23 +208,62 @@ def __call__(self, search, params): # noqa: ARG002
|
207 | 208 | return search.filter("term", shared=True)
|
208 | 209 |
|
209 | 210 |
|
210 |
| -class GroupFilter: |
211 |
| - """ |
212 |
| - Filter that limits which groups annotations are returned from. |
213 |
| -
|
214 |
| - This excludes annotations from groups that the user is not authorized to |
215 |
| - read or which are explicitly excluded by the search query. |
216 |
| - """ |
217 |
| - |
| 211 | +class GroupAndModerationFilter: |
218 | 212 | def __init__(self, request):
|
219 | 213 | self.user = request.user
|
220 | 214 | self.group_service = request.find_service(name="group")
|
221 | 215 |
|
222 | 216 | def __call__(self, search, params):
|
223 |
| - # Remove parameter if passed, preventing it being passed to default query |
224 | 217 | group_ids = popall(params, "group") or None
|
225 |
| - groups = self.group_service.groupids_readable_by(self.user, group_ids) |
226 |
| - return search.filter("terms", group=groups) |
| 218 | + groups = self.group_service.groups_readable_by(self.user, group_ids) |
| 219 | + |
| 220 | + not_nipsa = ~Q("term", nipsa=True) |
| 221 | + not_hidden = ~Q("term", hidden=True) |
| 222 | + |
| 223 | + if not self.user: |
| 224 | + # If there not a logged in user |
| 225 | + if groups: |
| 226 | + # Apply the group filter as it is |
| 227 | + search = search.filter("terms", group=[g.pubid for g in groups]) |
| 228 | + # And don't show any hidden or NIPSA'd annotations |
| 229 | + return search.filter(not_nipsa & not_hidden) |
| 230 | + |
| 231 | + if not groups: |
| 232 | + # If the user is logged in, hide hidden annos and NIPSA'd annos except the ones authored by the user |
| 233 | + return search.filter( |
| 234 | + (not_hidden & not_nipsa) | Q("term", user=self.user.userid.lower()) |
| 235 | + ) |
| 236 | + |
| 237 | + query_clauses = [] |
| 238 | + |
| 239 | + from h.security import Identity, identity_permits |
| 240 | + from h.traversal import GroupContext |
| 241 | + |
| 242 | + # If t he user is logged in and we are filtering by groups |
| 243 | + # we'll check for each group if we are a moderator |
| 244 | + for group in groups: |
| 245 | + user_is_moderator = identity_permits( |
| 246 | + identity=Identity.from_models(user=self.user), |
| 247 | + context=GroupContext(group), |
| 248 | + permission=Permission.Group.MODERATE, |
| 249 | + ) |
| 250 | + if user_is_moderator: |
| 251 | + # For modereators: |
| 252 | + # - We show all their annos |
| 253 | + # - We don't filter out hideden annos |
| 254 | + # - We do hide NIPSA'd annos |
| 255 | + query_clauses = Q("term", group=group.pubid) & not_nipsa |
| 256 | + |
| 257 | + else: |
| 258 | + # For non moderators: |
| 259 | + # - We show all their annos |
| 260 | + # - We hide hidden annos |
| 261 | + # - We hide NIPSA'd annos |
| 262 | + query_clauses = Q("term", group=group.pubid) & ( |
| 263 | + not_hidden & not_nipsa |
| 264 | + ) | Q("term", user=self.user.userid.lower()) |
| 265 | + |
| 266 | + return search.filter(Q("bool", should=query_clauses)) |
227 | 267 |
|
228 | 268 |
|
229 | 269 | class UriCombinedWildcardFilter:
|
@@ -347,29 +387,6 @@ def __call__(self, search, _):
|
347 | 387 | return search.exclude("exists", field="deleted")
|
348 | 388 |
|
349 | 389 |
|
350 |
| -class HiddenFilter: |
351 |
| - """Return an Elasticsearch filter for filtering out moderated or NIPSA'd annotations.""" |
352 |
| - |
353 |
| - def __init__(self, request): |
354 |
| - self.group_service = request.find_service(name="group") |
355 |
| - self.user = request.user |
356 |
| - |
357 |
| - def __call__(self, search, _): |
358 |
| - """Filter out all hidden and NIPSA'd annotations except the current user's.""" |
359 |
| - # If any one of these "should" clauses is true then the annotation will |
360 |
| - # get through the filter. |
361 |
| - should_clauses = [ |
362 |
| - Q("bool", must_not=[Q("term", nipsa=True), Q("term", hidden=True)]) |
363 |
| - ] |
364 |
| - |
365 |
| - if self.user is not None: |
366 |
| - # Always show the logged-in user's annotations even if they have |
367 |
| - # been hidden or the user has been NIPSA'd |
368 |
| - should_clauses.append(Q("term", user=self.user.userid.lower())) |
369 |
| - |
370 |
| - return search.filter(Q("bool", should=should_clauses)) |
371 |
| - |
372 |
| - |
373 | 390 | class AnyMatcher:
|
374 | 391 | """Match the contents of a selection of fields against the `any` parameter."""
|
375 | 392 |
|
|
0 commit comments