-
-
Notifications
You must be signed in to change notification settings - Fork 387
perf/store sync #3725
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
perf/store sync #3725
Changes from all commits
2b64daa
cd4efb0
41b7a6a
5a2a884
4e262b1
34b30a1
8cb35d9
9710a74
c0c1b24
b36a2d4
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,8 +1,8 @@ | ||
| from __future__ import annotations | ||
|
|
||
| from dataclasses import dataclass | ||
| from dataclasses import dataclass, field | ||
| from itertools import islice, pairwise | ||
| from typing import TYPE_CHECKING, Any, TypeVar | ||
| from typing import TYPE_CHECKING, Any, TypeVar, cast | ||
| from warnings import warn | ||
|
|
||
| from zarr.abc.codec import ( | ||
|
|
@@ -13,6 +13,7 @@ | |
| BytesBytesCodec, | ||
| Codec, | ||
| CodecPipeline, | ||
| SupportsSyncCodec, | ||
| ) | ||
| from zarr.core.common import concurrent_map | ||
| from zarr.core.config import config | ||
|
|
@@ -68,6 +69,106 @@ def fill_value_or_default(chunk_spec: ArraySpec) -> Any: | |
| return fill_value | ||
|
|
||
|
|
||
| @dataclass(frozen=True, slots=True) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sorry, but this is straight up unreviewable. I can't tell if there's a one-character typo that breaks things here.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. good catch, this got included by accident -- it's from a separate PR that needs to happen after this one. |
||
| class CodecChain: | ||
| """Codec chain with pre-resolved metadata specs. | ||
|
|
||
| Constructed from an iterable of codecs and a chunk ArraySpec. | ||
| Resolves each codec against the spec so that encode/decode can | ||
| run without re-resolving. | ||
| """ | ||
|
|
||
| codecs: tuple[Codec, ...] | ||
| chunk_spec: ArraySpec | ||
|
|
||
| _aa_codecs: tuple[ArrayArrayCodec, ...] = field(init=False, repr=False, compare=False) | ||
| _aa_specs: tuple[ArraySpec, ...] = field(init=False, repr=False, compare=False) | ||
| _ab_codec: ArrayBytesCodec = field(init=False, repr=False, compare=False) | ||
| _ab_spec: ArraySpec = field(init=False, repr=False, compare=False) | ||
| _bb_codecs: tuple[BytesBytesCodec, ...] = field(init=False, repr=False, compare=False) | ||
| _bb_spec: ArraySpec = field(init=False, repr=False, compare=False) | ||
| _all_sync: bool = field(init=False, repr=False, compare=False) | ||
|
|
||
| def __post_init__(self) -> None: | ||
| aa, ab, bb = codecs_from_list(list(self.codecs)) | ||
|
|
||
| aa_specs: list[ArraySpec] = [] | ||
| spec = self.chunk_spec | ||
| for aa_codec in aa: | ||
| aa_specs.append(spec) | ||
| spec = aa_codec.resolve_metadata(spec) | ||
|
|
||
| object.__setattr__(self, "_aa_codecs", aa) | ||
| object.__setattr__(self, "_aa_specs", tuple(aa_specs)) | ||
| object.__setattr__(self, "_ab_codec", ab) | ||
| object.__setattr__(self, "_ab_spec", spec) | ||
|
|
||
| spec = ab.resolve_metadata(spec) | ||
| object.__setattr__(self, "_bb_codecs", bb) | ||
| object.__setattr__(self, "_bb_spec", spec) | ||
|
|
||
| object.__setattr__( | ||
| self, | ||
| "_all_sync", | ||
| all(isinstance(c, SupportsSyncCodec) for c in self.codecs), | ||
| ) | ||
|
|
||
| @property | ||
| def all_sync(self) -> bool: | ||
| return self._all_sync | ||
|
|
||
| def decode_chunk( | ||
| self, | ||
| chunk_bytes: Buffer, | ||
| ) -> NDBuffer: | ||
| """Decode a single chunk through the full codec chain, synchronously. | ||
|
|
||
| Pure compute -- no IO. Only callable when all codecs support sync. | ||
| """ | ||
| bb_out: Any = chunk_bytes | ||
| for bb_codec in reversed(self._bb_codecs): | ||
| bb_out = cast("SupportsSyncCodec", bb_codec)._decode_sync(bb_out, self._bb_spec) | ||
|
|
||
| ab_out: Any = cast("SupportsSyncCodec", self._ab_codec)._decode_sync(bb_out, self._ab_spec) | ||
|
|
||
| for aa_codec, spec in zip(reversed(self._aa_codecs), reversed(self._aa_specs), strict=True): | ||
| ab_out = cast("SupportsSyncCodec", aa_codec)._decode_sync(ab_out, spec) | ||
|
|
||
| return ab_out # type: ignore[no-any-return] | ||
|
|
||
| def encode_chunk( | ||
| self, | ||
| chunk_array: NDBuffer, | ||
| ) -> Buffer | None: | ||
| """Encode a single chunk through the full codec chain, synchronously. | ||
|
|
||
| Pure compute -- no IO. Only callable when all codecs support sync. | ||
| """ | ||
| aa_out: Any = chunk_array | ||
|
|
||
| for aa_codec, spec in zip(self._aa_codecs, self._aa_specs, strict=True): | ||
| if aa_out is None: | ||
| return None | ||
| aa_out = cast("SupportsSyncCodec", aa_codec)._encode_sync(aa_out, spec) | ||
|
|
||
| if aa_out is None: | ||
| return None | ||
| bb_out: Any = cast("SupportsSyncCodec", self._ab_codec)._encode_sync(aa_out, self._ab_spec) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 😵💫
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, this is ugly, but to set expectations: I don't think we are going to win a beauty contest with any of the stuff we have to bolt on to our codec, store, and chunk encoding routines. We are starting from a performance-unfriendly design and trying to fix that design from the outside. In any case, this class will appear in a later PR -- I'm removing it from this one. |
||
|
|
||
| for bb_codec in self._bb_codecs: | ||
| if bb_out is None: | ||
| return None | ||
| bb_out = cast("SupportsSyncCodec", bb_codec)._encode_sync(bb_out, self._bb_spec) | ||
|
|
||
| return bb_out # type: ignore[no-any-return] | ||
|
|
||
| def compute_encoded_size(self, byte_length: int, array_spec: ArraySpec) -> int: | ||
| for codec in self.codecs: | ||
| byte_length = codec.compute_encoded_size(byte_length, array_spec) | ||
| array_spec = codec.resolve_metadata(array_spec) | ||
| return byte_length | ||
|
|
||
|
|
||
| @dataclass(frozen=True) | ||
| class BatchedCodecPipeline(CodecPipeline): | ||
| """Default codec pipeline. | ||
|
|
||
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -187,6 +187,56 @@ def __repr__(self) -> str: | |||||
| def __eq__(self, other: object) -> bool: | ||||||
| return isinstance(other, type(self)) and self.root == other.root | ||||||
|
|
||||||
| # ------------------------------------------------------------------- | ||||||
| # Synchronous store methods | ||||||
| # ------------------------------------------------------------------- | ||||||
|
|
||||||
| def _ensure_open_sync(self) -> None: | ||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. is this necessary?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, it is necessary: in our current implementation (see zarr-python/src/zarr/storage/_local.py, lines 199–200 at commit b6d3ae2), the store's open step creates the root directory, so the sync path needs an equivalent check before reading or writing.
|
||||||
| if not self._is_open: | ||||||
| if not self.read_only: | ||||||
| self.root.mkdir(parents=True, exist_ok=True) | ||||||
| if not self.root.exists(): | ||||||
| raise FileNotFoundError(f"{self.root} does not exist") | ||||||
| self._is_open = True | ||||||
|
|
||||||
| def get_sync( | ||||||
| self, | ||||||
| key: str, | ||||||
| *, | ||||||
| prototype: BufferPrototype | None = None, | ||||||
| byte_range: ByteRequest | None = None, | ||||||
| ) -> Buffer | None: | ||||||
| if prototype is None: | ||||||
| prototype = default_buffer_prototype() | ||||||
| self._ensure_open_sync() | ||||||
| assert isinstance(key, str) | ||||||
| path = self.root / key | ||||||
| try: | ||||||
| return _get(path, prototype, byte_range) | ||||||
| except (FileNotFoundError, IsADirectoryError, NotADirectoryError): | ||||||
| return None | ||||||
|
|
||||||
| def set_sync(self, key: str, value: Buffer) -> None: | ||||||
| self._ensure_open_sync() | ||||||
| self._check_writable() | ||||||
| assert isinstance(key, str) | ||||||
| if not isinstance(value, Buffer): | ||||||
| raise TypeError( | ||||||
| f"LocalStore.set(): `value` must be a Buffer instance. " | ||||||
| f"Got an instance of {type(value)} instead." | ||||||
| ) | ||||||
| path = self.root / key | ||||||
| _put(path, value) | ||||||
|
|
||||||
| def delete_sync(self, key: str) -> None: | ||||||
| self._ensure_open_sync() | ||||||
| self._check_writable() | ||||||
| path = self.root / key | ||||||
| if path.is_dir(): | ||||||
| shutil.rmtree(path) | ||||||
| else: | ||||||
| path.unlink(missing_ok=True) | ||||||
|
|
||||||
| async def get( | ||||||
| self, | ||||||
| key: str, | ||||||
|
|
||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
seems like this shouldn't be deleted