Source code for uv.fs.util

#!/usr/bin/python
#
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Utilities shared by functions that interact with pyfilesystem2."""

import io
from typing import Any, Union

import fs as pyfs
import uv.types as t
import uv.util as u
from casfs import CASFS
from fs.base import FS
from fs.copy import copy_fs
from fs.zipfs import ZipFS


class HandleCache:
    """Opens file handles on a filesystem and caches them by absolute path.

    A path is opened at most once while its handle stays open; later calls to
    `open` for the same path return the cached handle. Note that the cache is
    keyed on the path only, so the mode of the first `open` call for a path
    wins until that handle is closed or the cache is cleared.

    `clear` closes every cached handle and empties the cache. `close` does the
    same and then closes the supplied filesystem.
    """

    def __init__(self, fs: "FS"):
        # Maps absolute path -> open handle.
        self._m = {}
        self._fs = fs

    def open(self, path: str, mode: str):
        """Returns a handle for `path`, opening it with `mode` if needed.

        A cached handle that has since been closed (for example because a
        caller used it in a `with` block) is discarded and the path is
        re-opened, rather than handing back an unusable handle.
        """
        abs_path = pyfs.path.abspath(path)
        handle = self._m.get(abs_path)
        if handle is None or getattr(handle, "closed", False):
            handle = self._fs.open(abs_path, mode=mode)
            self._m[abs_path] = handle
        return handle

    def clear(self) -> None:
        """Closes every cached handle and empties the cache.

        Every handle gets a close attempt even if an earlier one fails; the
        first error encountered is re-raised once all handles were tried.
        """
        handles = list(self._m.values())
        # Empty the cache up front so it never retains closed handles, even
        # when one of the close() calls below raises.
        self._m.clear()

        first_error = None
        for handle in handles:
            try:
                handle.close()
            except Exception as err:  # keep going; re-raised below
                if first_error is None:
                    first_error = err

        if first_error is not None:
            raise first_error

    def close(self) -> None:
        """Closes all cached handles, then closes the underlying filesystem.

        The filesystem is closed even if closing one of the handles fails.
        """
        try:
            self.clear()
        finally:
            self._fs.close()
def load_fs(root: Union[FS, str]) -> FS:
    """Resolves `root` to a filesystem instance.

    A string is handed to `pyfs.open_fs` with `create=True`; an existing `FS`
    instance is returned unchanged.

    Raises:
      ValueError: if `root` is neither a string nor an `FS` instance.
    """
    if isinstance(root, str):
        opened = pyfs.open_fs(root, create=True)
        return opened

    if not isinstance(root, FS):
        raise ValueError("Not a filesystem or path!")

    return root
def jsonl_path(k: t.MetricKey) -> str:
    """Builds the absolute path of the jsonl file for the metric key `k`.

    The key is formatted as a string, given a `.jsonl` suffix and passed
    through `pyfs.path.abspath`.
    """
    filename = f"{k}.jsonl"
    return pyfs.path.abspath(filename)
def to_bytes(item: Union[str, bytes]) -> bytes:
    """Coerces `item` to bytes.

    A bytes instance is returned as-is (the same object); anything else is
    converted with `bytes(item, "utf8")`, which encodes a str as UTF-8.
    """
    if isinstance(item, bytes):
        return item
    return bytes(item, "utf8")
def jsonl_bytes(v: Any) -> bytes:
    """Serializes `v` as one newline-terminated line of json, as bytes.

    The json text comes from `u.json_str`; a trailing newline is appended and
    the result is converted with `to_bytes`.
    """
    line = u.json_str(v) + "\n"
    return to_bytes(line)
def get_cas(cas_input):
    """Builds a CASFS on top of `cas_input`, creating missing directories.

    `cas_input` is opened via `pyfs.open_fs` with `create=True` and the
    resulting filesystem is wrapped in `CASFS`.

    TODO delete this once we get that idea merged back into CASFS.
    """
    return CASFS(pyfs.open_fs(cas_input, create=True))
def persist_to_cas_via_memory(cfs, source_fs):
    """Zips `source_fs` into an in-memory buffer and stores it via `cfs.put`.

    Returns whatever `cfs.put` returns for the buffer.

    Example call:

      persist_to_cas_via_memory(
          CASFS("/Users/samritchie/casfs"),
          "/Users/samritchie/tester"  # metrics directory
      )
    """
    buffer = io.BytesIO()
    with buffer:
        # The ZipFS has to be closed before its contents are handed over to
        # the content addressable store, so the inner block ends before the
        # call to put().
        with ZipFS(buffer, write=True) as archive:
            copy_fs(source_fs, archive)

        return cfs.put(buffer)