#
# The internetarchive module is a Python/CLI interface to Archive.org.
#
# Copyright (C) 2012-2024 Internet Archive
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
internetarchive.iarequest
~~~~~~~~~~~~~~~~~~~~~~~~~
:copyright: (C) 2012-2025 by Internet Archive.
:license: AGPL 3, see LICENSE for more details.
"""
import copy
import logging
import re
from urllib.parse import quote

import requests
import requests.models
from jsonpatch import make_patch

from internetarchive import __version__, auth
from internetarchive.exceptions import ItemLocateError
from internetarchive.utils import delete_items_from_dict, json, needs_quote

logger = logging.getLogger(__name__)


class S3Request(requests.models.Request):
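    """A :class:`requests.Request` subclass for the Archive.org S3-like
    upload API, carrying item and file metadata, derive queueing, and
    S3 credentials alongside the standard request fields.

    A minimal usage sketch (the identifier, filename, and keys are
    placeholders)::

        >>> r = S3Request(method='PUT',
        ...               url='https://s3.us.archive.org/my-item/my-file.txt',
        ...               metadata={'title': 'My Item'},
        ...               access_key='KEY', secret_key='SECRET')
        >>> p = r.prepare()  # an S3PreparedRequest with x-archive-* headers
    """
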
def __init__(self,
metadata=None,
file_metadata=None,
queue_derive=True,
access_key=None,
secret_key=None,
set_scanner=True,
**kwargs):
super().__init__(**kwargs)
self.auth = self.auth or auth.S3Auth(access_key, secret_key)
self.metadata = metadata or {}
self.file_metadata = file_metadata or {}
self.queue_derive = queue_derive
self.set_scanner = set_scanner

    def prepare(self):
p = S3PreparedRequest()
p.prepare(
method=self.method,
url=self.url,
headers=self.headers,
files=self.files,
data=self.data,
params=self.params,
auth=self.auth,
cookies=self.cookies,
hooks=self.hooks,
# S3Request kwargs.
metadata=self.metadata,
file_metadata=self.file_metadata,
queue_derive=self.queue_derive,
set_scanner=self.set_scanner,
)
return p


class S3PreparedRequest(requests.models.PreparedRequest):
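    """A :class:`requests.PreparedRequest` that serializes item and file
    metadata into ``x-archive-meta*`` and ``x-archive-filemeta*`` headers
    and sets the auto-make-bucket and queue-derive flags.
    """
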
def prepare(self, method=None, url=None, headers=None, files=None, data=None,
params=None, auth=None, cookies=None, hooks=None, queue_derive=None,
metadata=None, file_metadata=None, set_scanner=None):
self.prepare_method(method)
self.prepare_url(url, params)
self.prepare_headers(headers, metadata, file_metadata, queue_derive, set_scanner)
self.prepare_cookies(cookies)
self.prepare_body(data, files)
        # prepare_auth comes after the body so that authentication
        # schemes such as OAuth can work on a fully prepared request.
        self.prepare_auth(auth, url)
        # Hooks are prepared last: authenticators could add a hook.
        self.prepare_hooks(hooks)

    def prepare_headers(self, headers, metadata, file_metadata, queue_derive,
set_scanner):
headers = headers.copy() if headers else {}
metadata = metadata.copy() if metadata else {}
file_metadata = file_metadata.copy() if file_metadata else {}
if set_scanner:
scanner_value = f'Internet Archive Python library {__version__}'
existing_scanner = metadata.get('scanner', [])
if not isinstance(existing_scanner, list):
existing_scanner = [existing_scanner]
existing_scanner.append(scanner_value)
metadata['scanner'] = existing_scanner
prepared_metadata = prepare_metadata(metadata)
prepared_file_metadata = prepare_metadata(file_metadata)
headers.setdefault('x-archive-auto-make-bucket', '1')
headers['x-archive-queue-derive'] = '0' if queue_derive is False else '1'
self._add_metadata_headers(headers, prepared_metadata, 'meta')
self._add_metadata_headers(headers, prepared_file_metadata, 'filemeta')
super().prepare_headers(headers)

    def _add_metadata_headers(self, headers, prepared_metadata, meta_type):
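        """Expand prepared metadata into ``x-archive-{meta_type}NN-{key}``
        headers, one header per value; underscores in the header name are
        replaced with ``--``, and values that need quoting are wrapped as
        ``uri(...)``.
        """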
for key, values in prepared_metadata.items():
if not isinstance(values, list):
values = [values]
for idx, value in enumerate(values):
if not value:
continue
header_key = f'x-archive-{meta_type}{idx:02d}-{key}'.replace('_', '--')
if isinstance(value, str) and needs_quote(value):
value = f'uri({quote(value)})'
headers[header_key] = value


class MetadataRequest(requests.models.Request):
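    """A :class:`requests.Request` subclass for the Archive.org metadata
    write API. The desired metadata is diffed against the item's current
    metadata to produce a JSON Patch, which is posted as form data.

    A minimal usage sketch (the identifier and keys are placeholders)::

        >>> r = MetadataRequest(method='POST',
        ...                     url='https://archive.org/metadata/my-item',
        ...                     metadata={'title': 'New Title'},
        ...                     access_key='KEY', secret_key='SECRET')
        >>> p = r.prepare()  # fetches current metadata, builds the patch
    """
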
def __init__(self,
metadata=None,
source_metadata=None,
target=None,
priority=None,
access_key=None,
secret_key=None,
append=None,
expect=None,
append_list=None,
insert=None,
reduced_priority=None,
**kwargs):
super().__init__(**kwargs)
self.auth = self.auth or auth.S3PostAuth(access_key, secret_key)
self.metadata = metadata or {}
self.source_metadata = source_metadata
self.target = target
self.priority = priority
self.append = append
self.expect = expect or {}
self.append_list = append_list
self.insert = insert
self.reduced_priority = reduced_priority

    def prepare(self):
p = MetadataPreparedRequest()
p.prepare(
method=self.method,
url=self.url,
headers=self.headers,
files=self.files,
data=self.data,
params=self.params,
auth=self.auth,
cookies=self.cookies,
hooks=self.hooks,
# MetadataRequest kwargs.
metadata=self.metadata,
priority=self.priority,
source_metadata=self.source_metadata,
target=self.target,
append=self.append,
expect=self.expect,
append_list=self.append_list,
insert=self.insert,
reduced_priority=self.reduced_priority,
)
return p


class MetadataPreparedRequest(requests.models.PreparedRequest):
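    """A :class:`requests.PreparedRequest` that builds the ``-patch`` /
    ``-target`` (or ``-changes``) form body expected by the metadata
    write API.
    """
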
def prepare(self, method=None, url=None, headers=None, files=None, data=None,
params=None, auth=None, cookies=None, hooks=None, metadata=None,
source_metadata=None, target=None, priority=None, append=None,
expect=None, append_list=None, insert=None, reduced_priority=None):
# First handle our custom headers
if reduced_priority:
headers = headers.copy() if headers else {}
headers['X-Accept-Reduced-Priority'] = '1'
# Now run full parent preparation
super().prepare(
method=method,
url=url,
headers=headers,
files=files,
data=data,
params=params,
auth=auth,
cookies=cookies,
hooks=hooks,
)
# Now add our custom handling
self.identifier = self.url.split('?')[0].split('/')[-1]
self._prepare_request_body(
metadata,
source_metadata,
target,
priority,
append,
append_list,
insert,
expect,
)
        # Re-apply auth and hooks last: the body was rebuilt above, so
        # authentication schemes that sign the request must see its final
        # form, and authenticators could add a hook.
        self.prepare_auth(auth, url)
        self.prepare_hooks(hooks)

    def _prepare_request_body(self, metadata, source_metadata, target, priority,
append, append_list, insert, expect):
if not source_metadata:
r = requests.get(self.url, timeout=10)
source_metadata = r.json()
if self._is_multi_target(metadata):
changes = self._prepare_multi_target_changes(
metadata,
source_metadata,
target,
append,
expect,
append_list,
insert,
)
self.data = {'-changes': json.dumps(changes), 'priority': priority or -5}
else:
self._prepare_single_target_body(
metadata,
source_metadata,
target,
append,
append_list,
insert,
expect,
priority,
)
logger.debug(f'submitting metadata request: {self.data}')
super().prepare_body(self.data, None)

    def _is_multi_target(self, metadata):
return (
isinstance(metadata, list)
or any('/' in k for k in metadata)
or all(isinstance(v, dict) for v in metadata.values())
)

    def _prepare_multi_target_changes(self, metadata, source_metadata, target,
append, expect, append_list, insert):
changes = []
if target:
metadata = {target: metadata}
for key in metadata:
patch = self._get_patch_for_target(
key,
metadata[key],
source_metadata,
append,
expect,
append_list,
insert,
)
changes.append({'target': key, 'patch': patch})
return changes
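
    def _get_patch_for_target(self, key, metadata, source_metadata, append,
                              expect, append_list, insert):
        # NOTE: this method is called by _prepare_multi_target_changes()
        # above but is missing from this excerpt of the file. The body
        # below is a sketch reconstructed by analogy with
        # _prepare_single_target_body(), dispatching on the target key
        # to the matching patch builder; treat it as an assumption, not
        # the library's actual implementation.
        if key == 'metadata':
            return prepare_patch(metadata, source_metadata.get('metadata', {}),
                                 append, expect, append_list, insert)
        if key.startswith('files/'):
            return prepare_files_patch(metadata, source_metadata.get('files', []),
                                       key, append, append_list, insert, expect)
        return prepare_target_patch({key: metadata}, source_metadata, append,
                                    key, append_list, key, insert, expect)
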
def _prepare_single_target_body(self, metadata, source_metadata, target, append,
append_list, insert, expect, priority):
target = target or 'metadata'
if target == 'metadata':
try:
patch = prepare_patch(
metadata,
source_metadata['metadata'],
append,
expect,
append_list,
insert,
)
except KeyError:
raise ItemLocateError(
f'{self.identifier} cannot be located '
'because it is dark or does not exist.'
)
elif target.startswith('files/'):
patch = prepare_files_patch(
metadata,
source_metadata['files'],
target,
append,
append_list,
insert,
expect,
)
else:
patch = prepare_target_patch(
{target: metadata},
source_metadata,
append,
target,
append_list,
target,
insert,
expect,
)
self.data = {
'-patch': json.dumps(patch),
'-target': target,
'priority': priority or -5,
}


def prepare_patch(metadata, source_metadata, append, expect=None,
append_list=None, insert=None):
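    """Build a JSON Patch (a list of RFC 6902 operations) transforming
    ``source_metadata`` into the prepared metadata, with any ``expect``
    conditions prepended as ``test`` operations.
    """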
destination = source_metadata.copy()
if isinstance(metadata, list):
prepared_metadata = metadata
if not destination:
destination = []
else:
prepared_metadata = prepare_metadata(
metadata,
source_metadata,
append,
append_list,
insert,
)
    if isinstance(destination, dict):
        destination.update(prepared_metadata)
    elif isinstance(metadata, list):
        destination = prepared_metadata
    else:
        destination = (prepared_metadata if isinstance(prepared_metadata, list)
                       else [prepared_metadata])
destination = delete_items_from_dict(destination, 'REMOVE_TAG')
patch = make_patch(source_metadata, destination).patch
patch_tests = _create_patch_tests(expect)
return patch_tests + patch


def _create_patch_tests(expect):
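    """Translate ``expect`` into JSON Patch ``test`` operations;
    ``key[idx]``-style keys become ``/key/idx`` paths.
    """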
tests = []
for key, value in (expect or {}).items():
if '[' in key:
parts = key.split('[')
idx = int(parts[1].strip(']'))
path = f'/{parts[0]}/{idx}'
else:
path = f'/{key}'
tests.append({'op': 'test', 'path': path, 'value': value})
return tests


def prepare_target_patch(metadata, source_metadata, append, target,
append_list, key, insert, expect):
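    """Build a patch for an arbitrary metadata target; ``key`` is the
    slash-delimited path into ``source_metadata`` (callers pass the
    target for both ``target`` and ``key``).
    """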
nested_dict = _create_nested_dict(metadata)
current = source_metadata
for part in key.split('/'):
current = current.get(part, {})
patch = prepare_patch(nested_dict, current, append, expect, append_list, insert)
return patch


def _create_nested_dict(metadata):
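    """Expand slash-delimited key paths into a nested dict, e.g.
    ``{'a/b': 1}`` becomes ``{'a': {'b': 1}}``.
    """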
nested = {}
for key_path, value in metadata.items():
parts = key_path.split('/')
current = nested
for part in parts[:-1]:
current = current.setdefault(part, {})
current[parts[-1]] = value
return nested


def prepare_files_patch(metadata, files_metadata, target, append,
append_list, insert, expect):
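    """Build a patch against the file entry named by ``target``
    (``files/<name>``); returns an empty patch if no file matches.
    """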
filename = target.split('/')[1]
for file_meta in files_metadata:
if file_meta.get('name') == filename:
return prepare_patch(
metadata,
file_meta,
append,
expect,
append_list,
insert,
)
return []


def prepare_metadata(metadata, source_metadata=None, append=False,
append_list=False, insert=False):
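    """Normalize metadata for submission: resolve indexed keys such as
    ``subject[0]`` against the source values, coerce numbers to strings,
    and apply the ``append``, ``append_list``, and ``insert`` merge modes.
    """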
source = copy.deepcopy(source_metadata) if source_metadata else {}
prepared = {}
indexed_keys = _process_indexed_keys(metadata, source, prepared)
_process_non_indexed_keys(metadata, source, prepared, append, append_list, insert)
_cleanup_indexed_keys(prepared, indexed_keys, metadata)
return prepared


def _process_non_indexed_keys(metadata, source, prepared, append, append_list, insert):
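    """Merge plain keys into ``prepared``: ``append_list`` extends the
    existing value as a list, ``append`` concatenates strings, and
    ``insert`` prepends to the existing list.
    """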
for key, value in metadata.items():
current_key = key
if isinstance(value, (int, float, complex)) and not isinstance(value, bool):
value = str(value)
if append_list and source.get(current_key):
existing = source[current_key]
if not isinstance(existing, list):
existing = [existing]
prepared[current_key] = existing + [value]
elif append and source.get(current_key):
prepared[current_key] = f'{source[current_key]} {value}'
elif insert and source.get(current_key):
existing = source[current_key]
if not isinstance(existing, list):
existing = [existing]
existing.insert(0, value)
prepared[current_key] = [v for v in existing if v]
else:
prepared[current_key] = value


def _cleanup_indexed_keys(prepared, indexed_keys, metadata):
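    """Drop the ``None`` placeholders left in indexed lists and delete
    entries whose incoming value was ``REMOVE_TAG``.
    """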
for base in indexed_keys:
if base in prepared:
prepared[base] = [v for v in prepared[base] if v is not None]
indexes = [
i for i, k in enumerate(metadata)
if _get_base_key(k) == base and metadata[k] == 'REMOVE_TAG'
]
for i in reversed(indexes):
if i < len(prepared[base]):
del prepared[base][i]


def _process_indexed_keys(metadata, source, prepared):
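    """Resolve ``key[idx]`` entries: seed ``prepared[base]`` from the
    source values (padded with ``None``), write each indexed value into
    place, and drop the indexed key from ``metadata``. Returns a map of
    base key to the original source-value count.
    """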
indexed_keys = {}
for key in list(metadata.keys()):
if _is_indexed_key(key):
base = _get_base_key(key)
idx = _get_index(key)
if base not in indexed_keys:
source_list = source.get(base, [])
if not isinstance(source_list, list):
source_list = [source_list]
indexed_keys[base] = len(source_list)
current_metadata_length = len(metadata)
prepared[base] = source_list + [None] * (
current_metadata_length - len(source_list)
)
while len(prepared[base]) <= idx:
prepared[base].append(None)
prepared[base][idx] = metadata[key]
del metadata[key]
return indexed_keys


def _get_base_key(key):
return key.split('[')[0]


def _is_indexed_key(key):
return '[' in key and ']' in key


def _get_index(key):
match = re.search(r'(?<=\[)\d+(?=\])', key)
return int(match.group()) if match else None