Source code for py2store.examples.dropbox_w_urllib

"""Dropbox read access with mapping interface -- using only builtins"""
import os
import zipfile
import tempfile
import urllib.request
from py2store.base import KvReader


class DropboxFolderCopyReader(KvReader):
    """Makes a full local copy of the folder (by default, to a local temp folder)
    and gives access to it.

    On construction the shared folder at ``url`` is downloaded as a zip archive
    into ``path``, extracted directly into ``path``, and the archive is deleted.
    Keys are the archive member names of the extracted files (directory entries
    are not keys); values are the text contents of those files.

    :param url: URL of the shared folder; handed as is to ``download_from_dropbox``.
    :param path: Local directory to download and extract into. It is created
        if it does not exist.
    """

    def __init__(self, url, path=tempfile.gettempdir()):
        self.url = url
        self.path = path
        os.makedirs(self.path, exist_ok=True)
        self._zip_filepath = os.path.join(self.path, 'shared_folder.zip')
        self._files = []
        self._get_folder()

    def __getitem__(self, rel_path):
        """Return the text content of the extracted file stored under ``rel_path``.

        :raises KeyError: if ``rel_path`` is not a file of the downloaded folder,
            or if its local copy no longer exists.
        """
        # Only serve keys that came from the archive: without this check any
        # readable path (absolute paths, unrelated files sitting in the target
        # directory) would be returned although ``rel_path in self`` is False.
        if rel_path not in self:
            raise KeyError(f"Key doesn't exist: {rel_path}")
        real_path = os.path.join(self.path, rel_path)
        try:
            with open(real_path, 'r') as f:
                return f.read()
        except FileNotFoundError as e:
            raise KeyError(f"Key doesn't exist: {rel_path}") from e

    def __iter__(self):
        """Yield the file keys in sorted order."""
        # Archive names ending with '/' (including a bare '/') are directory
        # entries; they cannot be read by __getitem__, so they are not keys.
        yield from sorted(
            name for name in self._files if not name.endswith('/')
        )

    def __contains__(self, rel_path):
        """Tell whether ``rel_path`` is one of the file keys yielded by iteration."""
        # Membership is tested first so that non-string keys simply give False.
        return rel_path in self._files and not rel_path.endswith('/')

    def _get_folder(self):
        """Download the folder's zip archive, extract it, and delete the archive."""
        try:
            download_from_dropbox(self.url, self._zip_filepath)
            self._unzip()
        finally:
            # Do not leave the archive behind when the download or the
            # extraction fails part-way through.
            if os.path.exists(self._zip_filepath):
                os.remove(self._zip_filepath)

    def _unzip(self):
        """Extract the archive into ``self.path`` and record its member names."""
        with zipfile.ZipFile(self._zip_filepath, 'r') as zip_ref:
            zip_ref.extractall(self.path)
            self._files = zip_ref.namelist()
class DropboxFileCopyReader(KvReader):
    """Downloads a single file to a local copy and gives line-based access to it.

    The file at ``url`` is downloaded to ``path`` on construction and kept open
    in text mode for the lifetime of the instance.

    Note that, as implemented here, item access is by line index, iteration
    yields the lines themselves (not the indices), and ``in`` is a substring
    test against the whole text of the file.

    :param url: URL of the file; handed as is to ``download_from_dropbox``.
    :param path: Local path to save the file to. When falsy, the last segment
        of the URL's path (query string stripped) is used, which is relative
        to the current working directory.
    """

    def __init__(self, url, path=None):
        # Defined before anything that can fail, so that __del__ always finds
        # the attribute even when the download or the open raises.
        self.file = None
        self.url = url
        self.path = path or self._get_filename_from_url()
        download_from_dropbox(self.url, self.path)
        self.file = open(self.path, 'r')

    def __getitem__(self, index):
        """Return the line(s) at ``index`` (an int or a slice), newline included."""
        # readlines() rewinds the file itself.
        return self.readlines()[index]

    def __iter__(self):
        """Yield the lines of the file, starting from the beginning."""
        self.file.seek(0)
        yield from self.file

    def __len__(self):
        """Return the number of lines in the file."""
        return len(self.readlines())

    def __contains__(self, k):
        """Tell whether ``k`` is a substring of the file's text."""
        return k in self.read()

    def __del__(self):
        # getattr guards against instances whose __init__ never ran
        # far enough to set ``file``.
        file = getattr(self, 'file', None)
        if file:
            file.close()
            self.file = None

    def read(self):
        """Return the whole text of the file."""
        self.file.seek(0)
        return self.file.read()

    def readlines(self):
        """Return all the lines of the file as a list."""
        self.file.seek(0)
        return self.file.readlines()

    def _get_filename_from_url(self):
        """Return the last path segment of ``self.url``, without query parameters."""
        # 'https:...txt?dl=0&smth=else' -> 'https:...txt'
        url_w_no_params = self.url.split('?', 1)[0]
        # 'https://www.dropbox.com/.../my_file.txt' -> 'my_file.txt'
        last_part_of_urls_path = url_w_no_params.rsplit('/', 1)[-1]
        return last_part_of_urls_path
# User agent sent with every download request.
# NOTE(review): presumably a wget-like agent is what makes the server return
# the raw content rather than an HTML page -- confirm.
DFLT_USER_AGENT = 'Wget/1.16 (linux-gnu)'


def download_from_dropbox(
    url, file, chk_size=1024, user_agent=DFLT_USER_AGENT
):
    """Download the content at ``url`` into ``file``, chunk by chunk.

    :param url: The URL to fetch.
    :param file: Where to write the bytes: either a filesystem path (``str``
        or ``os.PathLike``), which is opened in ``'wb'`` mode and closed when
        done, or an already open binary writable object, which is written to
        and left open.
    :param chk_size: Number of bytes requested per read.
    :param user_agent: Value of the ``user-agent`` request header.
    """

    def iter_content_and_copy_to(target):
        req = urllib.request.Request(url)
        req.add_header('user-agent', user_agent)
        with urllib.request.urlopen(req) as response:
            # An empty read marks the end of the response.
            for chk in iter(lambda: response.read(chk_size), b''):
                target.write(chk)

    if isinstance(file, (str, os.PathLike)):
        with open(file, 'wb') as _target_file:
            iter_content_and_copy_to(_target_file)
    else:
        iter_content_and_copy_to(file)


def bytes_from_dropbox(url, chk_size=1024, user_agent=DFLT_USER_AGENT):
    """Download the content at ``url`` and return it as ``bytes``.

    :param url: The URL to fetch.
    :param chk_size: Number of bytes requested per read.
    :param user_agent: Value of the ``user-agent`` request header.
    :return: The full response body.
    """
    from io import BytesIO

    with BytesIO() as buffer:
        download_from_dropbox(
            url, buffer, chk_size=chk_size, user_agent=user_agent
        )
        return buffer.getvalue()


# Legacy variant of the two functions above, based on the third-party
# ``requests`` package (kept commented out):
#
# DFLT_USER_AGENT = 'Wget/1.16 (linux-gnu)'
# def download_from_dropbox(url, file, as_zip=False, chunk_size=1024, user_agent=DFLT_USER_AGENT):
#
#     response = requests.get(
#         url,
#         params={'dl': int(as_zip)},
#         headers={'user-agent': user_agent},
#         stream=True,
#     )
#
#     def iter_content_and_copy_to(file):
#         for chunk in response.iter_content(chunk_size=chunk_size):
#             if chunk:
#                 file.write(chunk)
#
#     if not isinstance(file, str):
#         iter_content_and_copy_to(file)
#     else:
#         with open(file, 'wb') as _target_file:
#             iter_content_and_copy_to(_target_file)
#
#
# def bytes_from_dropbox(url, chunk_size=1024, user_agent=DFLT_USER_AGENT):
#     from io import BytesIO
#     with BytesIO() as file:
#         download_from_dropbox(url, file, chunk_size=chunk_size, user_agent=user_agent)
#         file.seek(0)
#         return file.read()