Source code for uv.fs.reader

#!/usr/bin/python
#
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Reader implementation that relies on the PyFilesystem2 abstraction for
filesystems. See https://github.com/PyFilesystem/pyfilesystem2 for more
information on the library.

"""

import json
from typing import Iterable, List, Union

import fs as pyfs
import uv.fs.util as u
import uv.types as t
from casfs.base import CASFS, Key
from fs.base import FS
from fs.zipfs import ZipFS
from uv.reader.base import AbstractReader, IterableReader


class FSReader(AbstractReader, IterableReader):
    """AbstractReader implementation backed by an instance of pyfilesystem2's
    FS abstraction.

    Args:
      fs: Either an fs URI string, or an actual fs.base.FS object.

    """

    def __init__(self, fs: Union[FS, str]):
        # u.load_fs receives either form of ``fs``; presumably it resolves a
        # URI string to an FS instance and passes FS objects through.
        self._fs = u.load_fs(fs)

    def keys(self) -> Iterable[t.MetricKey]:
        """Yields the key of every file in the filesystem that plausibly
        contains metrics in jsonl format.

        The key is the file's basename with its extension removed, so the
        directory a file was found in is not part of the key.

        """
        for path in self._fs.walk.files(filter=['*.jsonl']):
            key, _ = pyfs.path.splitext(pyfs.path.basename(path))
            yield key

    def read(self, k: t.MetricKey) -> List[t.Metric]:
        """Returns every metric stored under the key ``k``.

        Args:
          k: The metric key to look up.

        Returns:
          One decoded JSON value per non-blank line of the backing jsonl file,
          in file order, or an empty list if the file does not exist.

        Raises:
          json.JSONDecodeError: if a non-blank line is not valid JSON.

        """
        # Only the filesystem access can signal a missing resource, so parsing
        # is kept outside of the try block.
        try:
            abs_path = u.jsonl_path(k)
            with self._fs.open(abs_path, mode='rb') as handle:
                raw = handle.read()
        except pyfs.errors.ResourceNotFound:
            return []

        # Split while still in bytes so that only \n, \r and \r\n act as
        # record separators. Blank and whitespace-only lines are skipped
        # instead of being handed to json.loads, which would reject them.
        return [
            json.loads(line.decode("utf-8"))
            for line in raw.splitlines()
            if line.strip()
        ]

    def close(self) -> None:
        """Closes the underlying filesystem."""
        self._fs.close()
class CASReader(FSReader):
    """FSReader over a zip archive stored in a CASFS instance.

    The archive is opened from the content-addressable store and wrapped in a
    ZipFS. Closing the reader closes the archive's file handle too.

    Args:
      cas: The CASFS instance holding the archive.
      k: Key of the archive inside ``cas``.

    """

    def __init__(self, cas: CASFS, k: Key):
        self._handle = cas.open(k)
        try:
            super().__init__(ZipFS(self._handle))
        except Exception:
            # Nobody would own the handle if construction failed part-way, so
            # release it here before propagating the error.
            self._handle.close()
            raise

    def close(self) -> None:
        """Closes the zip filesystem and then the underlying file handle."""
        try:
            super().close()
        finally:
            # The handle is closed even when closing the filesystem raises.
            self._handle.close()