#
# The internetarchive module is a Python/CLI interface to Archive.org.
#
# Copyright (C) 2012-2024 Internet Archive
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
internetarchive.files
~~~~~~~~~~~~~~~~~~~~~
:copyright: (C) 2012-2024 by Internet Archive.
:license: AGPL 3, see LICENSE for more details.
"""
import logging
import os
import sys
from contextlib import nullcontext, suppress
from email.utils import parsedate_to_datetime
from time import sleep
from urllib.parse import quote

from requests.exceptions import (
    ConnectTimeout,
    HTTPError,
    ReadTimeout,
    RetryError,
)
from tqdm import tqdm

from internetarchive import auth, exceptions, iarequest, utils

log = logging.getLogger(__name__)


class BaseFile:
def __init__(self, item_metadata, name, file_metadata=None):
if file_metadata is None:
file_metadata = {}
name = name.strip('/')
if not file_metadata:
for f in item_metadata.get('files', []):
if f.get('name') == name:
file_metadata = f
break
self.identifier = item_metadata.get('metadata', {}).get('identifier')
self.name = name
self.size = None
self.source = None
self.format = None
self.md5 = None
self.sha1 = None
self.mtime = None
self.crc32 = None
self.exists = bool(file_metadata)
for key in file_metadata:
setattr(self, key, file_metadata[key])
# An additional, more orderly way to access file metadata,
# which avoids filtering the attributes.
self.metadata = file_metadata
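        # The metadata API returns numeric fields as strings; normalize them.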
self.mtime = float(self.mtime) if self.mtime else 0
self.size = int(self.size) if self.size else 0


class File(BaseFile):
"""This class represents a file in an archive.org item. You
can use this class to access the file metadata::
>>> import internetarchive
>>> item = internetarchive.Item('stairs')
>>> file = internetarchive.File(item, 'stairs.avi')
>>> print(f.format, f.size)
('Cinepack', '3786730')
Or to download a file::
>>> file.download()
>>> file.download('fabulous_movie_of_stairs.avi')
This class also uses IA's S3-like interface to delete a file
from an item. You need to supply your IAS3 credentials in
environment variables in order to delete::
>>> file.delete(access_key='Y6oUrAcCEs4sK8ey',
... secret_key='youRSECRETKEYzZzZ')
You can retrieve S3 keys here: `https://archive.org/account/s3.php
<https://archive.org/account/s3.php>`__
"""

    def __init__(self, item, name, file_metadata=None):
        """
        :type item: Item
        :param item: The item that the file is part of.

        :type name: str
        :param name: The filename of the file.

        :type file_metadata: dict
        :param file_metadata: (optional) a dict of metadata for the
            given file.
        """
super().__init__(item.item_metadata, name, file_metadata)
self.item = item
url_parts = {
'protocol': item.session.protocol,
'id': self.identifier,
'name': quote(name.encode('utf-8')),
'host': item.session.host,
}
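        # session.protocol includes the trailing colon (e.g. 'https:').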
self.url = '{protocol}//{host}/download/{id}/{name}'.format(**url_parts)
if self.item.session.access_key and self.item.session.secret_key:
self.auth = auth.S3Auth(self.item.session.access_key,
self.item.session.secret_key)
else:
self.auth = None

    def __repr__(self):
return (f'File(identifier={self.identifier!r}, '
f'filename={self.name!r}, '
f'size={self.size!r}, '
f'format={self.format!r})')

    def download(  # noqa: C901,PLR0911,PLR0912,PLR0915
self,
file_path=None,
verbose=None,
ignore_existing=None,
checksum=None,
checksum_archive=None,
destdir=None,
retries=None,
ignore_errors=None,
fileobj=None,
return_responses=None,
no_change_timestamp=None,
params=None,
chunk_size=None,
stdout=None,
ors=None,
timeout=None,
):
"""Download the file into the current working directory.
:type file_path: str
:param file_path: Download file to the given file_path.
:type verbose: bool
:param verbose: (optional) Turn on verbose output.
:type ignore_existing: bool
:param ignore_existing: Overwrite local files if they already
exist.
:type checksum: bool
:param checksum: (optional) Skip downloading file based on checksum.
:type checksum_archive: bool
:param checksum_archive: (optional) Skip downloading file based on checksum, and
skip checksum validation if it already succeeded
(will create and use _checksum_archive.txt).
:type destdir: str
:param destdir: (optional) The directory to download files to.
:type retries: int
:param retries: (optional) The number of times to retry on failed
requests.
:type ignore_errors: bool
:param ignore_errors: (optional) Don't fail if a single file fails to
download, continue to download other files.
:type fileobj: file-like object
:param fileobj: (optional) Write data to the given file-like object
(e.g. sys.stdout).
:type return_responses: bool
:param return_responses: (optional) Rather than downloading files to disk, return
a list of response objects.
:type no_change_timestamp: bool
:param no_change_timestamp: (optional) If True, leave the time stamp as the
current time instead of changing it to that given in
the original archive.
:type stdout: bool
:param stdout: (optional) Print contents of file to stdout instead of downloading
to file.
:type ors: bool
:param ors: (optional) Append a newline or $ORS to the end of file.
This is mainly intended to be used internally with `stdout`.
:type params: dict
:param params: (optional) URL parameters to send with
download request (e.g. `cnt=0`).
:rtype: bool
:returns: True if file was successfully downloaded.
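
        Usage example (a sketch; the identifier and filename are
        illustrative)::

            >>> from internetarchive import get_item
            >>> item = get_item('stairs')
            >>> f = item.get_file('stairs.avi')
            >>> f.download(destdir='downloads', verbose=True)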
"""
verbose = False if verbose is None else verbose
ignore_existing = False if ignore_existing is None else ignore_existing
checksum = False if checksum is None else checksum
checksum_archive = False if checksum_archive is None else checksum_archive
retries = retries or 2
ignore_errors = ignore_errors or False
return_responses = return_responses or False
no_change_timestamp = no_change_timestamp or False
params = params or None
timeout = 12 if not timeout else timeout
headers = {}
retries_sleep = 3 # TODO: exponential sleep
retrying = False # for retry loop
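        # Mount an HTTP adapter on the session so failed requests are
        # retried at the transport level.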
self.item.session.mount_http_adapter(max_retries=retries)
file_path = file_path or self.name
if destdir:
if return_responses is not True:
try:
os.mkdir(destdir)
except FileExistsError:
pass
if os.path.isfile(destdir):
raise OSError(f'{destdir} is not a directory!')
file_path = os.path.join(destdir, file_path)
parent_dir = os.path.dirname(file_path)
# Check if we should skip...
if not return_responses and os.path.exists(file_path.encode('utf-8')):
if checksum_archive:
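                # _checksum_archive.txt records one previously verified file
                # path per line; any path listed there is skipped without
                # re-hashing the local file.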
checksum_archive_filename = '_checksum_archive.txt'
if not os.path.exists(checksum_archive_filename):
with open(checksum_archive_filename, 'w', encoding='utf-8') as f:
pass
with open(checksum_archive_filename, encoding='utf-8') as f:
checksum_archive_data = f.read().splitlines()
if file_path in checksum_archive_data:
msg = (
f'skipping {file_path}, '
f'file already exists based on checksum_archive.'
)
log.info(msg)
if verbose:
print(f' {msg}', file=sys.stderr)
return
if ignore_existing:
msg = f'skipping {file_path}, file already exists.'
log.info(msg)
if verbose:
print(f' {msg}', file=sys.stderr)
return
elif checksum or checksum_archive:
with open(file_path, 'rb') as fp:
md5_sum = utils.get_md5(fp)
if md5_sum == self.md5:
msg = f'skipping {file_path}, file already exists based on checksum.'
log.info(msg)
if verbose:
print(f' {msg}', file=sys.stderr)
if checksum_archive:
# add file to checksum_archive to skip it next time
with open(checksum_archive_filename, 'a', encoding='utf-8') as f:
f.write(f'{file_path}\n')
return
# Retry loop
while True:
try:
if parent_dir != '' and return_responses is not True:
os.makedirs(parent_dir, exist_ok=True)
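                # Resume a partial download: if the local file's size differs
                # from the remote size (and checksum skipping is off), request
                # only the remaining bytes with an HTTP Range header.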
if not return_responses \
and not ignore_existing \
and self.name != f'{self.identifier}_files.xml' \
and os.path.exists(file_path.encode('utf-8')):
st = os.stat(file_path.encode('utf-8'))
if st.st_size != self.size and not (checksum or checksum_archive):
headers = {"Range": f"bytes={st.st_size}-"}
response = self.item.session.get(
self.url,
stream=True,
timeout=timeout,
auth=self.auth,
params=params,
headers=headers,
)
# Get timestamp from Last-Modified header
last_mod_header = response.headers.get('Last-Modified')
if last_mod_header:
dt = parsedate_to_datetime(last_mod_header)
last_mod_mtime = dt.timestamp()
else:
last_mod_mtime = self.mtime
response.raise_for_status()
# Check if we should skip based on last modified time...
if not fileobj and not return_responses and os.path.exists(file_path.encode('utf-8')):
st = os.stat(file_path.encode('utf-8'))
if st.st_mtime == last_mod_mtime:
if self.name == f'{self.identifier}_files.xml' or (st.st_size == self.size):
msg = (f'skipping {file_path}, file already exists based on '
'length and date.')
log.info(msg)
if verbose:
print(f' {msg}', file=sys.stderr)
return
elif return_responses:
return response
if verbose:
total = int(response.headers.get('content-length', 0)) or None
progress_bar = tqdm(desc=f' downloading {self.name}',
total=total,
unit='iB',
unit_scale=True,
unit_divisor=1024)
else:
progress_bar = nullcontext()
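                    # nullcontext() yields None, so `bar` is None below and
                    # progress updates are skipped.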
if not chunk_size:
chunk_size = 1048576
if stdout:
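                    # Reopen stdout in binary mode; closefd=False keeps the
                    # real stdout open after the wrapper is closed.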
fileobj = os.fdopen(sys.stdout.fileno(), 'wb', closefd=False)
if not fileobj or retrying:
if 'Range' in headers:
fileobj = open(file_path.encode('utf-8'), 'rb+')
else:
fileobj = open(file_path.encode('utf-8'), 'wb')
with fileobj, progress_bar as bar:
if 'Range' in headers:
fileobj.seek(st.st_size)
for chunk in response.iter_content(chunk_size=chunk_size):
if chunk:
size = fileobj.write(chunk)
if bar is not None:
bar.update(size)
if ors:
fileobj.write(os.environ.get("ORS", "\n").encode("utf-8"))
if 'Range' in headers:
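                    # A resumed (Range) download appended to an existing file,
                    # so verify the reassembled whole against the expected md5.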
with open(file_path, 'rb') as fh:
local_checksum = utils.get_md5(fh)
                    # An explicit check (rather than assert) so verification
                    # still runs under `python -O`.
                    if local_checksum != self.md5:
                        msg = (f'"{file_path}" corrupt, '
                               'checksums do not match. '
                               'Remote file may have been modified, '
                               'retry download.')
                        os.remove(file_path.encode('utf-8'))
                        raise exceptions.InvalidChecksumError(msg)
break
except (RetryError, HTTPError, ConnectTimeout, OSError, ReadTimeout,
exceptions.InvalidChecksumError) as exc:
if retries > 0:
retrying = True
retries -= 1
msg = ('download failed, sleeping for '
f'{retries_sleep} seconds and retrying. '
f'{retries} retries left.')
log.warning(msg)
sleep(retries_sleep)
continue
msg = f'error downloading file {file_path}, exception raised: {exc}'
log.error(msg)
                with suppress(OSError):
                    os.remove(file_path)
if verbose:
print(f' {msg}', file=sys.stderr)
if ignore_errors:
return False
else:
raise exc
        # Set the local file's mtime to the Last-Modified timestamp from the
        # original archive, unless asked not to.
        if not no_change_timestamp:
            with suppress(OSError):  # Probably a file-like object, e.g. sys.stdout.
                os.utime(file_path.encode('utf-8'), (0, last_mod_mtime))
msg = f'downloaded {self.identifier}/{self.name} to {file_path}'
log.info(msg)
return True

    def delete(self, cascade_delete=None, access_key=None, secret_key=None, verbose=None,
               debug=None, retries=None, headers=None):
"""Delete a file from the Archive. Note: Some files -- such as
<itemname>_meta.xml -- cannot be deleted.
:type cascade_delete: bool
:param cascade_delete: (optional) Delete all files associated with the specified
file, including upstream derivatives and the original.
:type access_key: str
:param access_key: (optional) IA-S3 access_key to use when making the given
request.
:type secret_key: str
:param secret_key: (optional) IA-S3 secret_key to use when making the given
request.
:type verbose: bool
:param verbose: (optional) Print actions to stdout.
:type debug: bool
:param debug: (optional) Set to True to print headers to stdout and exit
without sending the delete request.
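
        :type retries: int
        :param retries: (optional) Number of times to retry the request on a
            503 response.

        :type headers: dict
        :param headers: (optional) Additional headers to send with the delete
            request.

        Usage example (a sketch; the identifier and filename are
        illustrative)::

            >>> from internetarchive import get_item
            >>> item = get_item('my-item')
            >>> f = item.get_file('old-file.txt')
            >>> f.delete(cascade_delete=True, verbose=True)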
"""
cascade_delete = '0' if not cascade_delete else '1'
access_key = self.item.session.access_key if not access_key else access_key
secret_key = self.item.session.secret_key if not secret_key else secret_key
debug = debug or False
verbose = verbose or False
max_retries = retries or 2
headers = headers or {}
if 'x-archive-cascade-delete' not in headers:
headers['x-archive-cascade-delete'] = cascade_delete
url = f'{self.item.session.protocol}//s3.us.archive.org/{self.identifier}/{quote(self.name)}'
self.item.session.mount_http_adapter(max_retries=max_retries,
status_forcelist=[503],
host='s3.us.archive.org')
request = iarequest.S3Request(
method='DELETE',
url=url,
headers=headers,
access_key=access_key,
secret_key=secret_key
)
if debug:
return request
else:
if verbose:
msg = f' deleting: {self.name}'
                # cascade_delete is the string '0' or '1' here, and both are
                # truthy, so compare against '1' explicitly.
                if cascade_delete == '1':
                    msg += ' and all derivative files.'
print(msg, file=sys.stderr)
prepared_request = self.item.session.prepare_request(request)
try:
resp = self.item.session.send(prepared_request)
resp.raise_for_status()
except (RetryError, HTTPError, ConnectTimeout,
OSError, ReadTimeout) as exc:
error_msg = f'Error deleting {url}, {exc}'
log.error(error_msg)
raise
else:
return resp
finally:
# The retry adapter is mounted to the session object.
# Make sure to remove it after delete, so it isn't
# mounted if and when the session object is used for an
# upload. This is important because we use custom retry
# handling for IA-S3 uploads.
url_prefix = f'{self.item.session.protocol}//s3.us.archive.org'
del self.item.session.adapters[url_prefix]


class OnTheFlyFile(File):

    def __init__(self, item, name):
        """
        :type item: Item
        :param item: The item that the file is part of.

        :type name: str
        :param name: The filename of the file.
        """
        # File.__init__ expects the Item object itself (it uses
        # item.session); passing item.item_metadata would raise an
        # AttributeError.
        super().__init__(item, name)