"""Download a large zipfile over HTTP, resuming at the current offset."""

from _io import BufferedWriter
from argparse import ArgumentParser
from datetime import datetime, timedelta, timezone
from pathlib import Path
from shutil import copyfileobj
from time import sleep
from zipfile import ZipFile
import logging

import requests


def _is_client_error(error: Exception) -> bool:
    """Return True when *error* is an HTTP 4xx failure (not worth retrying)."""
    response = getattr(error, "response", None)
    return (
        isinstance(error, requests.HTTPError)
        and response is not None
        and 400 <= response.status_code < 500
    )


def file_download(
    *,
    url: str,
    file_open: BufferedWriter,
    max_retries: int = 10,
    timeout: float = 60.0,
) -> None:
    """Download *url* into *file_open*, resuming from its current position.

    The remote size is read from the ``content-length`` header of a HEAD
    request. Ranged GET requests starting at ``file_open.tell()`` are then
    issued until the file position reaches that size.

    Args:
        url: Address of the file to fetch.
        file_open: Binary file object, positioned at the end of any data
            that has already been downloaded.
        max_retries: Number of consecutive attempts that add no new bytes
            that are tolerated before giving up.
        timeout: Timeout in seconds passed to each ``requests`` call.

    Raises:
        KeyError: The HEAD response carries no ``content-length`` header.
        requests.HTTPError: The HEAD request failed, or a GET request was
            answered with a 4xx status.
        RuntimeError: ``max_retries`` consecutive attempts made no progress.
    """
    # requests.head() does not follow redirects unless asked to, and the
    # length must describe the final resource rather than a redirect reply.
    response = requests.head(url, allow_redirects=True, timeout=timeout)
    response.raise_for_status()
    content_length = int(response.headers["content-length"])

    stalled_attempts = 0
    while True:
        file_end = file_open.tell()
        if file_end >= content_length:
            return

        # Range end positions are inclusive, so leave the end open instead
        # of naming ``content_length`` (one past the last byte).
        headers = {"range": f"bytes={file_end}-"}
        logging.info(headers)

        failure = None
        try:
            with requests.get(
                url, headers=headers, stream=True, timeout=timeout
            ) as response:
                status = response.status_code
                if status not in (200, 206):
                    response.raise_for_status()
                    raise requests.HTTPError(
                        f"unexpected status {status}", response=response
                    )
                if status == 200 and file_end:
                    # The server ignored the Range header and is sending the
                    # whole file: drop the partial data and start over.
                    file_open.seek(0)
                    file_open.truncate()
                copyfileobj(response.raw, file_open)
        except Exception as error:
            # Broad on purpose: reading ``response.raw`` surfaces low-level
            # transport errors that are not requests exceptions.
            if _is_client_error(error):
                raise
            logging.error(error)
            failure = error

        if file_open.tell() != file_end:
            stalled_attempts = 0
        else:
            stalled_attempts += 1
            if stalled_attempts >= max_retries:
                raise RuntimeError(
                    f'download made no progress after {max_retries} attempts "{url}"'
                ) from failure

        if failure is not None or stalled_attempts:
            sleep(1)


def zipfile_download() -> None:
    """Command-line entry point: download a zipfile and check that it opens.

    Parses ``--url`` and ``--path`` from the command line, appends the
    download to ``--path`` (continuing a partial file if one exists), then
    opens the result as a zip archive.

    Raises:
        Exception: The downloaded archive contains no entries.
    """
    description = """
    Download large zipfile from URL, retry at offset if interrupted, check zip.
    """
    parser = ArgumentParser(description=description)
    parser.add_argument("-v", "--verbose", action="count")
    parser.add_argument("-url", "--url", required=True, help="URL to zipfile")
    parser.add_argument("-o", "--path", required=True, help="path to write zipfile")
    args = parser.parse_args()

    level = logging.DEBUG if args.verbose else logging.INFO
    logging.basicConfig(level=level)
    logging.info(args)

    today = datetime.now(timezone.utc).strftime("%Y%m%d")
    logging.info(today)

    path = Path(args.path)
    if path.exists():
        logging.info(f'output path exists, resuming: "{path}"')

    # Append mode keeps an existing partial download and positions the
    # handle at its end; "wb" would truncate it and make resuming impossible.
    with path.open("ab") as file_open:
        file_download(url=args.url, file_open=file_open)

    with ZipFile(path) as zip_file:
        file_list = zip_file.namelist()
    if not file_list:
        raise Exception(f'zipfile_empty "{args.url}"')


if __name__ == "__main__":
    zipfile_download()