# The internetarchive module is a Python/CLI interface to Archive.org.
#
# Copyright (C) 2012-2024 Internet Archive
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
internetarchive.item
~~~~~~~~~~~~~~~~~~~~

:copyright: (C) 2012-2024 by Internet Archive.
:license: AGPL 3, see LICENSE for more details.
"""
from __future__ import annotations

import io
import math
import os
import sys
from copy import deepcopy
from fnmatch import fnmatch
from functools import total_ordering
from logging import getLogger
from time import sleep
from typing import Mapping, MutableMapping, Optional
from urllib.parse import quote
from xml.parsers.expat import ExpatError

from requests import Request, Response
from requests.exceptions import HTTPError
from tqdm import tqdm

from internetarchive import catalog
from internetarchive.auth import S3Auth
from internetarchive.files import File
from internetarchive.iarequest import MetadataRequest, S3Request
from internetarchive.utils import (
    IdentifierListAsItems,
    IterableToFileAdapter,
    chunk_generator,
    get_file_size,
    get_md5,
    get_s3_xml_text,
    is_dir,
    iter_directory,
    json,
    norm_filepath,
    recursive_file_count_and_size,
    validate_s3_identifier,
)

log = getLogger(__name__)


@total_ordering
class BaseItem:

    EXCLUDED_ITEM_METADATA_KEYS = ('workable_servers', 'server')

    def __init__(
        self,
        identifier: str | None = None,
        item_metadata: Mapping | None = None,
    ):
        # Default attributes.
        self.identifier = identifier
        self.item_metadata = item_metadata or {}
        self.exists = False

        # Archive.org metadata attributes.
        self.metadata: dict = {}
        self.files: list[dict] = []
        self.created = None
        self.d1 = None
        self.d2 = None
        self.dir = None
        self.files_count = None
        self.item_size = None
        self.reviews: list = []
        self.server = None
        self.uniq = None
        self.updated = None
        self.tasks = None
        self.is_dark = None

        # Load item.
        self.load()

    def __repr__(self) -> str:
        notloaded = ', item_metadata={}' if not self.exists else ''
        return f'{self.__class__.__name__}(identifier={self.identifier!r}{notloaded})'

    def load(self, item_metadata: Mapping | None = None) -> None:
        if item_metadata:
            self.item_metadata = item_metadata

        self.exists = bool(self.item_metadata)

        for key in self.item_metadata:
            setattr(self, key, self.item_metadata[key])

        if not self.identifier:
            self.identifier = self.metadata.get('identifier')

        mc = self.metadata.get('collection', [])
        # TODO: The `type: ignore` on the following line should be removed. See #518
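        # IdentifierListAsItems lazily resolves each collection identifier
        # in `mc` to an Item via this session when accessed.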
        self.collection = IdentifierListAsItems(mc, self.session)  # type: ignore

    def __eq__(self, other) -> bool:
        return (self.item_metadata == other.item_metadata
                or (self.item_metadata.keys() == other.item_metadata.keys()
                    and all(self.item_metadata[x] == other.item_metadata[x]
                            for x in self.item_metadata
                            if x not in self.EXCLUDED_ITEM_METADATA_KEYS)))

    def __le__(self, other) -> bool:
        return self.identifier <= other.identifier

    def __hash__(self) -> int:
        without_excluded_keys = {
            k: v for k, v in self.item_metadata.items()
            if k not in self.EXCLUDED_ITEM_METADATA_KEYS}
        return hash(json.dumps(without_excluded_keys,
                               sort_keys=True,
                               check_circular=False))  # type: ignore


class Item(BaseItem):
    """This class represents an archive.org item. Generally this class
    should not be used directly, but rather via the
    ``internetarchive.get_item()`` function::

        >>> from internetarchive import get_item
        >>> item = get_item('stairs')
        >>> print(item.metadata)

    Or to modify the metadata for an item::

        >>> metadata = {'title': 'The Stairs'}
        >>> item.modify_metadata(metadata)
        >>> print(item.metadata['title'])
        'The Stairs'

    This class also uses IA's S3-like interface to upload files to an
    item. You need to supply your IAS3 credentials in environment
    variables in order to upload::

        >>> item.upload('myfile.tar', access_key='Y6oUrAcCEs4sK8ey',
        ...             secret_key='youRSECRETKEYzZzZ')
        True

    You can retrieve S3 keys here:
    `https://archive.org/account/s3.php <https://archive.org/account/s3.php>`__
    """

    def __init__(
        self,
        archive_session,
        identifier: str,
        item_metadata: Mapping | None = None,
    ):
        """
        :param archive_session: :class:`ArchiveSession <ArchiveSession>`

        :param identifier: The globally unique Archive.org identifier for this item.

                           An identifier is composed of any unique combination of
                           alphanumeric characters, underscore ( _ ) and dash ( - ).
                           While there are no official limits it is strongly suggested
                           that they be between 5 and 80 characters in length.

                           Identifiers must be unique across the entirety of Internet
                           Archive, not simply unique within a single collection.

                           Once defined an identifier can not be changed. It will
                           travel with the item or object and is involved in every
                           manner of accessing or referring to the item.

        :param item_metadata: The Archive.org item metadata used to initialize this
                              item. If no item metadata is provided, it will be
                              retrieved from Archive.org using the provided identifier.
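
        Usage (an illustrative sketch; ``nasa`` is an arbitrary example
        identifier, and :func:`internetarchive.get_item` is usually preferred
        over constructing an Item directly)::

            >>> from internetarchive import get_session
            >>> session = get_session()
            >>> item = Item(session, 'nasa')
            >>> item.exists
            True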
""" self.session = archive_session super().__init__(identifier, item_metadata) self.urls = Item.URLs(self) if self.metadata.get('title'): # A copyable link to the item, in MediaWiki format details = self.urls.details # type: ignore self.wikilink = f'* [{details} {self.identifier}] -- {self.metadata["title"]}' class URLs: def __init__(self, itm_obj): self._itm_obj = itm_obj self._paths = [] self._make_URL('details') self._make_URL('metadata') self._make_URL('download') self._make_URL('history') self._make_URL('edit') self._make_URL('editxml') self._make_URL('manage') if self._itm_obj.metadata.get('mediatype') == 'collection': self._make_tab_URL('about') self._make_tab_URL('collection') def _make_tab_URL(self, tab: str) -> None: """Make URLs for the separate tabs of Collections details page.""" self._make_URL(tab, self.details + f'&tab={tab}') # type: ignore DEFAULT_URL_FORMAT = ('{0.session.protocol}//{0.session.host}' '/{path}/{0.identifier}') def _make_URL(self, path: str, url_format: str = DEFAULT_URL_FORMAT) -> None: setattr(self, path, url_format.format(self._itm_obj, path=path)) self._paths.append(path) def __str__(self) -> str: return f'URLs ({", ".join(self._paths)}) for {self._itm_obj.identifier}' def refresh(self, item_metadata: Mapping | None = None, **kwargs) -> None: if not item_metadata: item_metadata = self.session.get_metadata(self.identifier, **kwargs) self.load(item_metadata) def identifier_available(self) -> bool: """Check if the item identifier is available for creating a new item. :return: `True` if identifier is available, or `False` if it is not available. """ url = f'{self.session.protocol}//{self.session.host}/services/check_identifier.php' params = {'output': 'json', 'identifier': self.identifier} response = self.session.get(url, params=params) availability = response.json()['code'] return availability == 'available' def get_task_summary( self, params: Mapping | None = None, request_kwargs: Mapping | None = None, ) -> dict: """Get a summary of the item's pending tasks. :param params: Params to send with your request. :returns: A summary of the item's pending tasks. """ return self.session.get_tasks_summary(self.identifier, params, request_kwargs) def no_tasks_pending( self, params: Mapping | None = None, request_kwargs: Mapping | None = None, ) -> bool: """Check if there is any pending task for the item. :param params: Params to send with your request. :returns: `True` if no tasks are pending, otherwise `False`. """ return all(x == 0 for x in self.get_task_summary(params, request_kwargs).values()) def get_all_item_tasks( self, params: dict | None = None, request_kwargs: Mapping | None = None, ) -> list[catalog.CatalogTask]: """Get a list of all tasks for the item, pending and complete. :param params: Query parameters, refer to `Tasks API `_ for available parameters. :param request_kwargs: Keyword arguments that :py:func:`requests.get` takes. :returns: A list of all tasks for the item, pending and complete. """ params = params or {} params.update({'catalog': 1, 'history': 1}) return self.session.get_tasks(self.identifier, params, request_kwargs) def get_history( self, params: Mapping | None = None, request_kwargs: Mapping | None = None, ) -> list[catalog.CatalogTask]: """Get a list of completed catalog tasks for the item. :param params: Params to send with your request. :returns: A list of completed catalog tasks for the item. 
""" return list(self.session.iter_history(self.identifier, params, request_kwargs)) def get_catalog( self, params: Mapping | None = None, request_kwargs: Mapping | None = None, ) -> list[catalog.CatalogTask]: """Get a list of pending catalog tasks for the item. :param params: Params to send with your request. :returns: A list of pending catalog tasks for the item. """ return list(self.session.iter_catalog(self.identifier, params, request_kwargs)) def derive(self, priority: int = 0, remove_derived: str | None = None, reduced_priority: bool = False, data: MutableMapping | None = None, headers: Mapping | None = None, request_kwargs: Mapping | None = None) -> Response: """Derive an item. :param priority: Task priority from 10 to -10 [default: 0] :param remove_derived: You can use wildcards ("globs") to only remove *some* prior derivatives. For example, "*" (typed without the quotation marks) specifies that all derivatives (in the item's top directory) are to be rebuilt. "*.mp4" specifies that all "*.mp4" deriviatives are to be rebuilt. "{*.gif,*thumbs/*.jpg}" specifies that all GIF and thumbs are to be rebuilt. :param reduced_priority: Submit your derive at a lower priority. This option is helpful to get around rate-limiting. Your task will more likely be accepted, but it might not run for a long time. Note that you still may be subject to rate-limiting. :returns: :class:`requests.Response` """ data = data or {} if remove_derived is not None: if not data.get('args'): data['args'] = {'remove_derived': remove_derived} else: data['args'].update({'remove_derived': remove_derived}) r = self.session.submit_task(self.identifier, 'derive.php', priority=priority, data=data, headers=headers, reduced_priority=reduced_priority, request_kwargs=request_kwargs) r.raise_for_status() return r def fixer(self, ops: list | str | None = None, priority: int | str | None = None, reduced_priority: bool = False, data: MutableMapping | None = None, headers: Mapping | None = None, request_kwargs: Mapping | None = None) -> Response: """Submit a fixer task on an item. :param ops: The fixer operation(s) to run on the item [default: noop]. :param priority: The task priority. :param reduced_priority: Submit your derive at a lower priority. This option is helpful to get around rate-limiting. Your task will more likely be accepted, but it might not run for a long time. Note that you still may be subject to rate-limiting. This is different than ``priority`` in that it will allow you to possibly avoid rate-limiting. :param data: Additional parameters to submit with the task. :returns: :class:`requests.Response` """ data = data or {} ops = ops or ['noop'] if not isinstance(ops, (list, tuple, set)): ops = [ops] data['args'] = data.get('args') or {} for op in ops: data['args'][op] = '1' r = self.session.submit_task(self.identifier, 'fixer.php', priority=priority, data=data, headers=headers, reduced_priority=reduced_priority, request_kwargs=request_kwargs) r.raise_for_status() return r def undark(self, comment: str, priority: int | str | None = None, reduced_priority: bool = False, data: Mapping | None = None, request_kwargs: Mapping | None = None) -> Response: """Undark the item. :param comment: The curation comment explaining reason for undarking item :param priority: The task priority. :param reduced_priority: Submit your derive at a lower priority. This option is helpful to get around rate-limiting. Your task will more likely be accepted, but it might not run for a long time. Note that you still may be subject to rate-limiting. 
                                 This is different than ``priority`` in that it will
                                 allow you to possibly avoid rate-limiting.

        :param data: Additional parameters to submit with the task.

        :returns: :class:`requests.Response`
        """
        r = self.session.submit_task(self.identifier,
                                     'make_undark.php',
                                     comment=comment,
                                     priority=priority,
                                     data=data,
                                     reduced_priority=reduced_priority,
                                     request_kwargs=request_kwargs)
        r.raise_for_status()
        return r

    # TODO: dark and undark have different parameter order for data and
    # reduced_priority.
    def dark(self,
             comment: str,
             priority: int | str | None = None,
             data: Mapping | None = None,
             reduced_priority: bool = False,
             request_kwargs: Mapping | None = None) -> Response:
        """Dark the item.

        :param comment: The curation comment explaining the reason for
                        darking the item.

        :param priority: The task priority.

        :param reduced_priority: Submit your dark task at a lower priority.
                                 This option is helpful to get around rate-limiting.
                                 Your task will more likely be accepted, but it might
                                 not run for a long time. Note that you still may be
                                 subject to rate-limiting. This is different than
                                 ``priority`` in that it will allow you to possibly
                                 avoid rate-limiting.

        :param data: Additional parameters to submit with the task.

        :returns: :class:`requests.Response`
        """
        r = self.session.submit_task(self.identifier,
                                     'make_dark.php',
                                     comment=comment,
                                     priority=priority,
                                     data=data,
                                     reduced_priority=reduced_priority,
                                     request_kwargs=request_kwargs)
        r.raise_for_status()
        return r

    def get_review(self) -> Response:
        u = f'{self.session.protocol}//{self.session.host}/services/reviews.php'
        p = {'identifier': self.identifier}
        a = S3Auth(self.session.access_key, self.session.secret_key)
        r = self.session.get(u, params=p, auth=a)
        r.raise_for_status()
        return r

    def index_review(self, username=None, screenname=None, itemname=None) -> Response:
        u = f'{self.session.protocol}//{self.session.host}/services/reviews.php'
        p = {'identifier': self.identifier}
        d = {'noindex': '0'}
        if username:
            d['username'] = username
        elif screenname:
            d['screenname'] = screenname
        elif itemname:
            d['itemname'] = itemname
        a = S3Auth(self.session.access_key, self.session.secret_key)
        r = self.session.put(u, params=p, data=d, auth=a)
        r.raise_for_status()
        return r

    def noindex_review(self, username=None, screenname=None, itemname=None) -> Response:
        u = f'{self.session.protocol}//{self.session.host}/services/reviews.php'
        p = {'identifier': self.identifier}
        d = {'noindex': '1'}
        if username:
            d['username'] = username
        elif screenname:
            d['screenname'] = screenname
        elif itemname:
            d['itemname'] = itemname
        a = S3Auth(self.session.access_key, self.session.secret_key)
        r = self.session.put(u, params=p, data=d, auth=a)
        r.raise_for_status()
        return r

    def delete_review(self, username=None, screenname=None, itemname=None) -> Response:
        u = f'{self.session.protocol}//{self.session.host}/services/reviews.php'
        p = {'identifier': self.identifier}
        d = None
        if username:
            d = {'username': username}
        elif screenname:
            d = {'screenname': screenname}
        elif itemname:
            d = {'itemname': itemname}
        a = S3Auth(self.session.access_key, self.session.secret_key)
        r = self.session.delete(u, params=p, data=d, auth=a)
        r.raise_for_status()
        return r

    def review(self, title, body, stars=None) -> Response:
        u = f'{self.session.protocol}//{self.session.host}/services/reviews.php'
        p = {'identifier': self.identifier}
        d = {'title': title, 'body': body}
        if stars:
            d['stars'] = stars
        a = S3Auth(self.session.access_key, self.session.secret_key)
        r = self.session.post(u, params=p, data=json.dumps(d), auth=a)
        r.raise_for_status()
        return r

    def get_file(self, file_name: str,
                 file_metadata: Mapping | None = None) -> File:
        """Get a :class:`File <File>` object for the named file.

        :param file_metadata: a dict of metadata for the given file.

        :returns: An :class:`internetarchive.File <File>` object.
        """
        return File(self, file_name, file_metadata)

    def get_files(self,
                  files: File | list[File] | None = None,
                  formats: str | list[str] | None = None,
                  glob_pattern: str | list[str] | None = None,
                  exclude_pattern: str | list[str] | None = None,
                  on_the_fly: bool = False):
        files = files or []
        formats = formats or []
        exclude_pattern = exclude_pattern or ''
        on_the_fly = bool(on_the_fly)

        if not isinstance(files, (list, tuple, set)):
            files = [files]
        if not isinstance(formats, (list, tuple, set)):
            formats = [formats]

        item_files = deepcopy(self.files)
        # Add support for on-the-fly files (e.g. EPUB).
        if on_the_fly:
            otf_files = [
                ('EPUB', f'{self.identifier}.epub'),
                ('MOBI', f'{self.identifier}.mobi'),
                ('DAISY', f'{self.identifier}_daisy.zip'),
                ('MARCXML', f'{self.identifier}_archive_marc.xml'),
            ]
            for format, file_name in otf_files:
                item_files.append({'name': file_name, 'format': format, 'otf': True})

        # With no filters given, yield every file in the item.
        if not any(k for k in [files, formats, glob_pattern]):
            for f in item_files:
                yield self.get_file(str(f.get('name')), file_metadata=f)

        for f in item_files:
            if f.get('name') in files:
                yield self.get_file(str(f.get('name')))
            elif f.get('format') in formats:
                yield self.get_file(str(f.get('name')))
            elif glob_pattern:
                if not isinstance(glob_pattern, list):
                    patterns = glob_pattern.split('|')
                else:
                    patterns = glob_pattern
                if not isinstance(exclude_pattern, list):
                    exclude_patterns = exclude_pattern.split('|')
                else:
                    exclude_patterns = exclude_pattern
                for p in patterns:
                    if fnmatch(f.get('name', ''), p):
                        if not any(fnmatch(f.get('name', ''), e)
                                   for e in exclude_patterns):
                            yield self.get_file(str(f.get('name')))

    def download(self,
                 files: File | list[File] | None = None,
                 formats: str | list[str] | None = None,
                 glob_pattern: str | None = None,
                 exclude_pattern: str | None = None,
                 dry_run: bool = False,
                 verbose: bool = False,
                 ignore_existing: bool = False,
                 checksum: bool = False,
                 checksum_archive: bool = False,
                 destdir: str | None = None,
                 no_directory: bool = False,
                 retries: int | None = None,
                 item_index: int | None = None,
                 ignore_errors: bool = False,
                 on_the_fly: bool = False,
                 return_responses: bool = False,
                 no_change_timestamp: bool = False,
                 ignore_history_dir: bool = False,
                 source: str | list[str] | None = None,
                 exclude_source: str | list[str] | None = None,
                 stdout: bool = False,
                 params: Mapping | None = None,
                 timeout: float | tuple[int, float] | None = None
                 ) -> list[Request | Response]:
        """Download files from an item.

        :param files: Only download files matching given file names.

        :param formats: Only download files matching the given formats.

        :param glob_pattern: Only download files matching the given glob pattern.

        :param exclude_pattern: Exclude files whose filename matches the given
                                glob pattern.

        :param dry_run: Output download URLs to stdout, don't download anything.

        :param verbose: Turn on verbose output.

        :param ignore_existing: Skip files that already exist locally.

        :param checksum: Skip downloading file based on checksum.

        :param checksum_archive: Skip downloading file based on checksum, and skip
                                 checksum validation if it already succeeded
                                 (will create and use _checksum_archive.txt).

        :param destdir: The directory to download files to.

        :param no_directory: Download files to current working directory rather
                             than creating an item directory.

        :param retries: The number of times to retry on failed requests.
        :param item_index: The index of the item for displaying progress in
                           bulk downloads.

        :param ignore_errors: Don't fail if a single file fails to download,
                              continue to download other files.

        :param on_the_fly: Download on-the-fly files (i.e. derivative EPUB, MOBI,
                           DAISY files).

        :param return_responses: Rather than downloading files to disk, return a list
                                 of response objects.

        :param no_change_timestamp: If True, leave the time stamp as the current
                                    time instead of changing it to that given in
                                    the original archive.

        :param source: Filter files based on their source value in files.xml
                       (i.e. `original`, `derivative`, `metadata`).

        :param exclude_source: Exclude files based on their source value in files.xml
                               (i.e. `original`, `derivative`, `metadata`).

        :param params: URL parameters to send with download request (e.g. `cnt=0`).

        :param ignore_history_dir: Do not download any files from the history dir.
                                   This param defaults to ``False``.

        :returns: A list of errors (or response objects if ``return_responses``
                  is True); an empty list means all files downloaded successfully.
        """
        dry_run = bool(dry_run)
        verbose = bool(verbose)
        ignore_existing = bool(ignore_existing)
        ignore_errors = bool(ignore_errors)
        checksum = bool(checksum)
        checksum_archive = bool(checksum_archive)
        no_directory = bool(no_directory)
        return_responses = bool(return_responses)
        no_change_timestamp = bool(no_change_timestamp)
        ignore_history_dir = bool(ignore_history_dir)
        params = params or None

        if source:
            if not isinstance(source, list):
                source = [source]
        if exclude_source:
            if not isinstance(exclude_source, list):
                exclude_source = [exclude_source]

        if stdout:
            fileobj = os.fdopen(sys.stdout.fileno(), "wb", closefd=False)
            verbose = False
        else:
            fileobj = None

        if not dry_run:
            if item_index and verbose:
                print(f'{self.identifier} ({item_index}):', file=sys.stderr)
            elif item_index is None and verbose:
                print(f'{self.identifier}:', file=sys.stderr)

        if self.is_dark:
            msg = f'skipping {self.identifier}, item is dark'
            log.warning(msg)
            if verbose:
                print(f' {msg}', file=sys.stderr)
            return []
        elif self.metadata == {}:
            msg = f'skipping {self.identifier}, item does not exist.'
            log.warning(msg)
            if verbose:
                print(f' {msg}', file=sys.stderr)
            return []

        if files:
            files = self.get_files(files, on_the_fly=on_the_fly)
        else:
            files = self.get_files(on_the_fly=on_the_fly)
        if formats:
            files = self.get_files(formats=formats, on_the_fly=on_the_fly)
        if glob_pattern:
            files = self.get_files(
                glob_pattern=glob_pattern,
                exclude_pattern=exclude_pattern,
                on_the_fly=on_the_fly
            )

        if stdout:
            files = list(files)  # type: ignore

        errors = []
        downloaded = 0
        responses = []
        file_count = 0

        for f in files:  # type: ignore
            if ignore_history_dir is True:
                if f.name.startswith('history/'):
                    continue
            if source and not any(f.source == x for x in source):
                continue
            if exclude_source and any(f.source == x for x in exclude_source):
                continue
            file_count += 1
            if no_directory:
                path = f.name
            else:
                path = os.path.join(str(self.identifier), f.name)
            if dry_run:
                print(f.url)
                continue
            # When streaming to stdout, emit a record separator (ors) between
            # files for all but the last one.
            if stdout and file_count < len(files):  # type: ignore
                ors = True
            else:
                ors = False
            r = f.download(path, verbose, ignore_existing, checksum, checksum_archive,
                           destdir, retries, ignore_errors, fileobj, return_responses,
                           no_change_timestamp, params, None, stdout, ors, timeout)
            if return_responses:
                responses.append(r)

            if r is False:
                errors.append(f.name)
            else:
                downloaded += 1

        if file_count == 0:
            msg = f'skipping {self.identifier}, no matching files found.'
            log.info(msg)
            if verbose:
                print(f' {msg}', file=sys.stderr)
            return []

        return responses if return_responses else errors

    def modify_metadata(self, metadata: Mapping,
                        target: str | None = None,
                        append: bool = False,
                        expect: Mapping | None = None,
                        append_list: bool = False,
                        insert: bool = False,
                        priority: int = 0,
                        access_key: str | None = None,
                        secret_key: str | None = None,
                        debug: bool = False,
                        headers: Mapping | None = None,
                        reduced_priority: bool = False,
                        request_kwargs: Mapping | None = None,
                        timeout: float | None = None,
                        refresh: bool = True) -> Request | Response:
        """Modify the metadata of an existing item on Archive.org.

        Note: The Metadata Write API does not yet comply with the
        latest JSON-Patch standard. It currently complies with `version 02
        <https://tools.ietf.org/html/draft-ietf-appsawg-json-patch-02>`__.

        :param metadata: Metadata used to update the item.

        :param target: Set the metadata target to update.

        :param priority: Set task priority.

        :param append: Append value to an existing multi-value metadata field.

        :param expect: Provide a dict of expectations to be tested server-side
                       before applying patch to item metadata.

        :param append_list: Append values to an existing multi-value metadata field.
                            No duplicate values will be added.

        :param refresh: Refresh the item metadata after the request.

        :param reduced_priority: Submit your task at a lower priority.
                                 This option is helpful to get around rate-limiting.
                                 Your task will more likely be accepted, but it might
                                 not run for a long time. Note that you still may be
                                 subject to rate-limiting.

        :returns: A Request if debug else a Response.

        Usage::

            >>> import internetarchive
            >>> item = internetarchive.Item('mapi_test_item1')
            >>> md = {'new_key': 'new_value', 'foo': ['bar', 'bar2']}
            >>> item.modify_metadata(md)
        """
        append = bool(append)
        access_key = access_key or self.session.access_key
        secret_key = secret_key or self.session.secret_key
        debug = bool(debug)
        headers = headers or {}
        expect = expect or {}
        request_kwargs = request_kwargs or {}
        if timeout:
            request_kwargs["timeout"] = float(timeout)  # type: ignore
        else:
            request_kwargs["timeout"] = 60  # type: ignore

        _headers = self.session.headers.copy()
        _headers.update(headers)

        url = f'{self.session.protocol}//{self.session.host}/metadata/{self.identifier}'
        # TODO: currently files and metadata targets do not support dicts,
        # but they might someday?? refactor this check.
        source_metadata = self.item_metadata
        request = MetadataRequest(
            method='POST',
            url=url,
            metadata=metadata,
            headers=_headers,
            source_metadata=source_metadata,
            target=target,
            priority=priority,
            access_key=access_key,
            secret_key=secret_key,
            append=append,
            expect=expect,
            append_list=append_list,
            insert=insert,
            reduced_priority=reduced_priority)
        # Must use Session.prepare_request to make sure session settings
        # are used on request!
        prepared_request = request.prepare()
        if debug:
            return prepared_request
        resp = self.session.send(prepared_request, **request_kwargs)
        # Re-initialize the Item object with the updated metadata.
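        # (refresh() re-fetches the item metadata from Archive.org, so this
        # object reflects the change that was just submitted.)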
        if refresh:
            self.refresh()
        return resp

    def delete_flag(
        self,
        category: str,
        user: Optional[str] = None,  # noqa: UP007
    ) -> Response:
        if user is None:
            user = f"@{self.session.config.get('general', {}).get('screenname')}"
        url = f'{self.session.protocol}//{self.session.host}/services/flags/admin.php'
        headers = {'Accept': 'text/json'}  # must be text/json specifically
        params = {'identifier': self.identifier, 'category': category, 'user': user}
        r = self.session.delete(url, headers=headers, params=params)
        return r

    def add_flag(
        self,
        category: str,
        user: Optional[str] = None,  # noqa: UP007
    ) -> Response:
        if user is None:
            user = f"@{self.session.config.get('general', {}).get('screenname')}"
        url = f'{self.session.protocol}//{self.session.host}/services/flags/admin.php'
        headers = {'Accept': 'text/json'}  # must be text/json specifically
        params = {'identifier': self.identifier, 'category': category, 'user': user}
        r = self.session.put(url, headers=headers, params=params)
        return r

    def get_flags(self) -> Response:
        url = f'{self.session.protocol}//{self.session.host}/services/flags/admin.php'
        headers = {'Accept': 'text/json'}  # must be text/json specifically
        params = {'identifier': self.identifier}
        r = self.session.get(url, headers=headers, params=params)
        return r

    # TODO: `list` parameter name shadows the Python builtin.
    def remove_from_simplelist(self, parent, list) -> Response:
        """Remove item from a simplelist.

        :returns: :class:`requests.Response`
        """
        patch = {
            'op': 'delete',
            'parent': parent,
            'list': list,
        }
        data = {
            '-patch': json.dumps(patch),
            '-target': 'simplelists',
        }
        r = self.session.post(self.urls.metadata, data=data)  # type: ignore
        return r

    def upload_file(self, body,  # noqa: PLR0915; TODO: Refactor this method to reduce complexity
                    key: str | None = None,
                    metadata: Mapping | None = None,
                    file_metadata: Mapping | None = None,
                    headers: dict | None = None,
                    access_key: str | None = None,
                    secret_key: str | None = None,
                    queue_derive: bool = False,
                    verbose: bool = False,
                    verify: bool = False,
                    checksum: bool = False,
                    delete: bool = False,
                    retries: int | None = None,
                    retries_sleep: int | None = None,
                    debug: bool = False,
                    validate_identifier: bool = False,
                    request_kwargs: MutableMapping | None = None,
                    set_scanner: bool = True) -> Request | Response:
        """Upload a single file to an item. The item will be created
        if it does not exist.

        :type body: Filepath or file-like object.
        :param body: File or data to be uploaded.

        :param key: Remote filename.

        :param metadata: Metadata used to create a new item.

        :param file_metadata: File-level metadata to add to the files.xml entry for
                              the file being uploaded.

        :param headers: Add additional IA-S3 headers to request.

        :param queue_derive: Set to False to prevent an item from being derived
                             after upload.

        :param verify: Verify local MD5 checksum matches the MD5 checksum of the
                       file received by IAS3.

        :param checksum: Skip based on checksum.

        :param delete: Delete local file after the upload has been successfully
                       verified.

        :param retries: Number of times to retry the given request if S3 returns a
                        503 SlowDown error.

        :param retries_sleep: Amount of time to sleep between ``retries``.

        :param verbose: Print progress to stdout.

        :param debug: Set to True to print headers to stdout, and exit without
                      sending the upload request.

        :param validate_identifier: Set to True to validate the identifier before
                                    uploading the file.

        Usage::

            >>> import internetarchive
            >>> item = internetarchive.Item('identifier')
            >>> item.upload_file('/path/to/image.jpg',
            ...                  key='photos/image1.jpg')
            True
        """
        # Set defaults.
        headers = headers or {}
        metadata = metadata or {}
        file_metadata = file_metadata or {}
        access_key = access_key or self.session.access_key
        secret_key = secret_key or self.session.secret_key
        queue_derive = bool(queue_derive)
        verbose = bool(verbose)
        verify = bool(verify)
        delete = bool(delete)
        # Set checksum after delete.
        checksum = delete or checksum
        retries = retries or 0
        retries_sleep = retries_sleep or 30
        debug = bool(debug)
        validate_identifier = bool(validate_identifier)
        request_kwargs = request_kwargs or {}
        if 'timeout' not in request_kwargs:
            request_kwargs['timeout'] = 120
        md5_sum = None

        _headers = headers.copy()

        if not hasattr(body, 'read'):
            filename = body
            body = open(body, 'rb')
        else:
            filename = key or body.name

        size = get_file_size(body)

        # Support for uploading empty files.
        if size == 0:
            _headers['Content-Length'] = '0'

        if not _headers.get('x-archive-size-hint'):
            _headers['x-archive-size-hint'] = str(size)

        # Build IA-S3 URL.
        if validate_identifier:
            validate_s3_identifier(self.identifier or "")
        key = norm_filepath(filename).split('/')[-1] if key is None else key
        base_url = f'{self.session.protocol}//s3.us.archive.org/{self.identifier}'
        url = f'{base_url}/{quote(norm_filepath(key).lstrip("/").encode("utf-8"))}'

        # Skip based on checksum.
        if checksum:
            md5_sum = get_md5(body)
            ia_file = self.get_file(key)
            if (not self.tasks) and (ia_file) and (ia_file.md5 == md5_sum):
                log.info(f'{key} already exists: {url}')
                if verbose:
                    print(f' {key} already exists, skipping.', file=sys.stderr)
                if delete:
                    log.info(
                        f'{key} successfully uploaded to '
                        f'https://archive.org/download/{self.identifier}/{key} '
                        'and verified, deleting local copy')
                    body.close()
                    os.remove(filename)
                # Return an empty response object if checksums match.
                # TODO: Is there a better way to handle this?
                body.close()
                return Response()

        # Require the Content-MD5 header when delete is True.
        if verify or delete:
            if not md5_sum:
                md5_sum = get_md5(body)
            _headers['Content-MD5'] = md5_sum

        def _build_request():
            body.seek(0, os.SEEK_SET)
            if verbose:
                try:
                    # Hack to raise exception so we get some output for
                    # empty files.
                    if size == 0:
                        raise Exception

                    chunk_size = 1048576
                    expected_size = math.ceil(size / chunk_size)
                    chunks = chunk_generator(body, chunk_size)
                    progress_generator = tqdm(chunks,
                                              desc=f' uploading {key}',
                                              dynamic_ncols=True,
                                              total=expected_size,
                                              unit='MiB')
                    data = None
                    # pre_encode is needed because http doesn't know that it
                    # needs to encode a TextIO object when it's wrapped
                    # in the Iterator from tqdm.
                    # So, this FileAdapter provides pre-encoded output.
                    data = IterableToFileAdapter(
                        progress_generator,
                        size,
                        pre_encode=isinstance(body, io.TextIOBase)
                    )
                except Exception:
                    print(f' uploading {key}', file=sys.stderr)
                    data = body
            else:
                data = body

            _headers.update(self.session.headers)
            request = S3Request(method='PUT',
                                url=url,
                                headers=_headers,
                                data=data,
                                metadata=metadata,
                                file_metadata=file_metadata,
                                access_key=access_key,
                                secret_key=secret_key,
                                queue_derive=queue_derive,
                                set_scanner=set_scanner)
            return request

        if debug:
            prepared_request = self.session.prepare_request(_build_request())
            body.close()
            return prepared_request
        else:
            try:
                first_try = True
                while True:
                    error_msg = ('s3 is overloaded, sleeping for '
                                 f'{retries_sleep} seconds and retrying. '
                                 f'{retries} retries left.')
                    if retries > 0 and not first_try:
                        try:
                            overloaded = self.session.s3_is_overloaded(
                                access_key=access_key)
                        except Exception as e:
                            error_msg = ('error checking if s3 is overloaded via '
                                         's3.us.archive.org?check_limit=1, '
                                         f'exception raised: "{e}". '
                                         f'sleeping for {retries_sleep} seconds and '
                                         f'retrying. {retries} retries left.')
                            overloaded = True
                        if overloaded:
                            sleep(retries_sleep)
                            log.info(error_msg)
                            if verbose:
                                print(f' warning: {error_msg}', file=sys.stderr)
                            retries -= 1
                            continue
                    request = _build_request()
                    prepared_request = request.prepare()

                    # Chunked transfer-encoding is NOT supported by IA-S3.
                    # It should NEVER be set. Requests adds it in certain
                    # scenarios (e.g. if content-length is 0). Stop it.
                    if prepared_request.headers.get('transfer-encoding') == 'chunked':
                        del prepared_request.headers['transfer-encoding']

                    response = self.session.send(prepared_request,
                                                 stream=True,
                                                 **request_kwargs)
                    if (response.status_code == 503) and (retries > 0):
                        if b'appears to be spam' in response.content:
                            log.info('detected as spam, upload failed')
                            break
                        log.info(error_msg)
                        if verbose:
                            print(f' warning: {error_msg}', file=sys.stderr)
                        sleep(retries_sleep)
                        retries -= 1
                        first_try = False
                        continue
                    else:
                        if response.status_code == 503:
                            log.info('maximum retries exceeded, upload failed.')
                        break
                response.raise_for_status()
                log.info(f'uploaded {key} to {url}')
                if delete and response.status_code == 200:
                    log.info(
                        f'{key} successfully uploaded to '
                        f'https://archive.org/download/{self.identifier}/{key} '
                        'and verified, deleting local copy')
                    body.close()
                    os.remove(filename)
                response.close()
                return response
            except HTTPError as exc:
                try:
                    msg = get_s3_xml_text(exc.response.content)  # type: ignore
                except ExpatError:
                    # probably HTTP 500 error and response is invalid XML
                    msg = ('IA S3 returned invalid XML '  # type: ignore
                           f'(HTTP status code {exc.response.status_code}). '
                           'This is a server side error which is either temporary, '
                           'or requires the intervention of IA admins.')
                error_msg = f' error uploading {key} to {self.identifier}, {msg}'
                log.error(error_msg)
                if verbose:
                    print(f' error uploading {key}: {msg}', file=sys.stderr)
                # Raise HTTPError with error message.
                raise type(exc)(error_msg, response=exc.response, request=exc.request)
            finally:
                body.close()

    def upload(self, files,
               metadata: Mapping | None = None,
               headers: dict | None = None,
               access_key: str | None = None,
               secret_key: str | None = None,
               queue_derive=None,  # TODO: True if None??
               verbose: bool = False,
               verify: bool = False,
               checksum: bool = False,
               delete: bool = False,
               retries: int | None = None,
               retries_sleep: int | None = None,
               debug: bool = False,
               validate_identifier: bool = False,
               request_kwargs: dict | None = None,
               set_scanner: bool = True) -> list[Request | Response]:
        r"""Upload files to an item. The item will be created if it
        does not exist.

        :type files: str, file, list, tuple, dict
        :param files: The filepaths or file-like objects to upload.

        :param \*\*kwargs: Optional arguments that :func:`Item.upload_file()` takes.

        :returns: A list of :class:`requests.Response` objects.
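
        Note: when uploading multiple files, the derive task is only queued on
        the final request (see the ``_queue_derive`` handling below), so the
        item is derived once rather than once per file.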
        Usage::

            >>> import internetarchive
            >>> item = internetarchive.Item('identifier')
            >>> md = {'mediatype': 'image', 'creator': 'Jake Johnson'}
            >>> item.upload('/path/to/image.jpg', metadata=md, queue_derive=False)
            [<Response [200]>]

        Uploading multiple files::

            >>> r = item.upload(['file1.txt', 'file2.txt'])
            >>> r = item.upload([fileobj, fileobj2])
            >>> r = item.upload(('file1.txt', 'file2.txt'))

        Uploading file objects::

            >>> import io
            >>> f = io.BytesIO(b'some initial binary data: \x00\x01')
            >>> r = item.upload({'remote-name.txt': f})
            >>> f = io.BytesIO(b'some more binary data: \x00\x01')
            >>> f.name = 'remote-name.txt'
            >>> r = item.upload(f)

        *Note: file objects must either have a name attribute, or be uploaded in a
        dict where the key is the remote-name*

        Setting the remote filename with a dict::

            >>> r = item.upload({'remote-name.txt': '/path/to/local/file.txt'})
        """
        queue_derive = True if queue_derive is None else queue_derive
        remote_dir_name = None
        total_files = 0
        if isinstance(files, dict):
            if files.get('name'):
                files = [files]
                total_files = 1
            else:
                files = list(files.items())
        if not isinstance(files, (list, tuple)):
            files = [files]
        if all(isinstance(f, dict) and f.get('name') for f in files):
            total_files = len(files)

        responses = []
        file_index = 0
        headers = headers or {}
        if (queue_derive or not headers.get('x-archive-size-hint')) \
                and total_files == 0:
            total_files, total_size = recursive_file_count_and_size(files,
                                                                    item=self,
                                                                    checksum=checksum)
            if not headers.get('x-archive-size-hint'):
                headers['x-archive-size-hint'] = str(total_size)
        file_metadata = None
        for f in files:
            if isinstance(f, dict):
                if f.get('name'):
                    file_metadata = f.copy()
                    del file_metadata['name']
                    f = f['name']
            if ((isinstance(f, str) and is_dir(f))
                    or (isinstance(f, tuple) and is_dir(f[-1]))):
                if isinstance(f, tuple):
                    remote_dir_name = f[0].strip('/')
                    f = f[-1]
                for filepath, key in iter_directory(f):
                    file_index += 1
                    # Set derive header if queue_derive is True,
                    # and this is the last request being made.
                    if queue_derive is True and file_index >= total_files:
                        _queue_derive = True
                    else:
                        _queue_derive = False

                    if not f.endswith('/'):
                        if remote_dir_name:
                            key = f'{remote_dir_name}{f}/{key}'
                        else:
                            key = f'{f}/{key}'
                    elif remote_dir_name:
                        key = f'{remote_dir_name}/{key}'
                    key = norm_filepath(key)
                    resp = self.upload_file(filepath,
                                            key=key,
                                            metadata=metadata,
                                            file_metadata=file_metadata,
                                            headers=headers,
                                            access_key=access_key,
                                            secret_key=secret_key,
                                            queue_derive=_queue_derive,
                                            verbose=verbose,
                                            verify=verify,
                                            checksum=checksum,
                                            delete=delete,
                                            retries=retries,
                                            retries_sleep=retries_sleep,
                                            debug=debug,
                                            validate_identifier=validate_identifier,
                                            request_kwargs=request_kwargs,
                                            set_scanner=set_scanner)
                    responses.append(resp)
            else:
                file_index += 1
                # Set derive header if queue_derive is True,
                # and this is the last request being made.
                # if queue_derive is True and file_index >= len(files):
                if queue_derive is True and file_index >= total_files:
                    _queue_derive = True
                else:
                    _queue_derive = False

                if not isinstance(f, (list, tuple)):
                    key, body = (None, f)
                else:
                    key, body = f
                if key and not isinstance(key, str):
                    key = str(key)
                resp = self.upload_file(body,
                                        key=key,
                                        metadata=metadata,
                                        file_metadata=file_metadata,
                                        headers=headers,
                                        access_key=access_key,
                                        secret_key=secret_key,
                                        queue_derive=_queue_derive,
                                        verbose=verbose,
                                        verify=verify,
                                        checksum=checksum,
                                        delete=delete,
                                        retries=retries,
                                        retries_sleep=retries_sleep,
                                        debug=debug,
                                        validate_identifier=validate_identifier,
                                        request_kwargs=request_kwargs,
                                        set_scanner=set_scanner)
                responses.append(resp)
        return responses


class Collection(Item):
    """This class represents an archive.org collection."""

    def __init__(self, *args, **kwargs):
        self.searches = {}
        if isinstance(args[0], Item):
            orig = args[0]
            args = (orig.session, orig.identifier, orig.item_metadata)
        super().__init__(*args, **kwargs)
        if self.metadata.get('mediatype', 'collection') != 'collection':
            raise ValueError('mediatype is not "collection"!')

        deflt_srh = f'collection:{self.identifier}'
        self._make_search('contents',
                          self.metadata.get('search_collection', deflt_srh))
        self._make_search('subcollections',
                          f'{deflt_srh} AND mediatype:collection')

    def _do_search(self, name: str, query: str):
        rtn = self.searches.setdefault(
            name, self.session.search_items(query, fields=['identifier']))
        if not hasattr(self, f'{name}_count'):
            setattr(self, f'{name}_count', self.searches[name].num_found)
        return rtn.iter_as_items()

    def _make_search(self, name: str, query: str):
        setattr(self, name, lambda: self._do_search(name, query))
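

# ---------------------------------------------------------------------------
# Minimal usage sketch (illustration only, not part of the library API).
# Assumptions: 'nasa' is an arbitrary public identifier and '*.jpg' an
# arbitrary glob pattern; both calls below hit the live Archive.org API.
if __name__ == '__main__':
    from internetarchive import get_session

    session = get_session()
    item = Item(session, 'nasa')
    # List matching files without downloading anything.
    for f in item.get_files(glob_pattern='*.jpg'):
        print(f.name, f.size)
    # dry_run=True prints the download URL for each matching file.
    item.download(glob_pattern='*.jpg', dry_run=True)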