Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 35 additions & 1 deletion src/zarr/abc/store.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,16 @@

from zarr.core.buffer import Buffer, BufferPrototype

__all__ = ["ByteGetter", "ByteSetter", "Store", "set_or_delete"]
# Public API of this module, listed alphabetically (class names first,
# then the lowercase helper).
__all__ = [
    "ByteGetter",
    "ByteSetter",
    "Store",
    "SupportsDeleteSync",
    "SupportsGetSync",
    "SupportsSetSync",
    "SupportsSyncStore",
    "set_or_delete",
]


@dataclass(frozen=True, slots=True)
Expand Down Expand Up @@ -700,6 +709,31 @@ async def delete(self) -> None: ...
async def set_if_not_exists(self, default: Buffer) -> None: ...


@runtime_checkable
class SupportsGetSync(Protocol):
    """Structural protocol for stores that can read a key synchronously.

    NOTE: ``runtime_checkable`` isinstance checks only confirm that a
    ``get_sync`` attribute exists; they do not validate its signature.
    """

    def get_sync(
        self,
        key: str,
        *,
        prototype: BufferPrototype | None = None,
        byte_range: ByteRequest | None = None,
    ) -> Buffer | None:
        """Return the value stored at ``key``, or ``None`` if absent.

        ``prototype`` selects the buffer implementation for the result;
        ``byte_range`` requests a sub-slice of the stored value.
        """
        ...


@runtime_checkable
class SupportsSetSync(Protocol):
    """Structural protocol for stores that can write a key synchronously."""

    def set_sync(self, key: str, value: Buffer) -> None:
        """Store ``value`` under ``key``."""
        ...


@runtime_checkable
class SupportsDeleteSync(Protocol):
    """Structural protocol for stores that can delete a key synchronously."""

    def delete_sync(self, key: str) -> None:
        """Remove the value stored at ``key``.

        NOTE(review): behavior for a missing key is implementation-defined
        here — existing implementations treat it as a no-op; confirm before
        relying on it.
        """
        ...


@runtime_checkable
class SupportsSyncStore(SupportsGetSync, SupportsSetSync, SupportsDeleteSync, Protocol):
    """Structural protocol for stores supporting synchronous get, set, and delete."""

    ...


async def set_or_delete(byte_setter: ByteSetter, value: Buffer | None) -> None:
"""Set or delete a value in a byte setter

Expand Down
2 changes: 0 additions & 2 deletions src/zarr/codecs/blosc.py
Original file line number Diff line number Diff line change
Expand Up @@ -318,8 +318,6 @@ def _encode_sync(
chunk_bytes: Buffer,
chunk_spec: ArraySpec,
) -> Buffer | None:
# Since blosc only support host memory, we convert the input and output of the encoding
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seems like this shouldn't be deleted

# between numpy array and buffer
return chunk_spec.prototype.buffer.from_bytes(
self._blosc_codec.encode(chunk_bytes.as_numpy_array())
)
Expand Down
105 changes: 103 additions & 2 deletions src/zarr/core/codec_pipeline.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
from __future__ import annotations

from dataclasses import dataclass
from dataclasses import dataclass, field
from itertools import islice, pairwise
from typing import TYPE_CHECKING, Any, TypeVar
from typing import TYPE_CHECKING, Any, TypeVar, cast
from warnings import warn

from zarr.abc.codec import (
Expand All @@ -13,6 +13,7 @@
BytesBytesCodec,
Codec,
CodecPipeline,
SupportsSyncCodec,
)
from zarr.core.common import concurrent_map
from zarr.core.config import config
Expand Down Expand Up @@ -68,6 +69,106 @@ def fill_value_or_default(chunk_spec: ArraySpec) -> Any:
return fill_value


@dataclass(frozen=True, slots=True)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, but this is straight-up unreviewable. I can't tell if there's a one-character typo that breaks things here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good catch, this got included by accident -- it's from a separate PR that needs to happen after this one.

class CodecChain:
    """Codec chain with pre-resolved metadata specs.

    Constructed from an iterable of codecs and a chunk ArraySpec.
    Resolves each codec against the spec so that encode/decode can
    run without re-resolving.
    """

    # User-supplied inputs.
    codecs: tuple[Codec, ...]
    chunk_spec: ArraySpec

    # Derived state populated in __post_init__ via object.__setattr__
    # (the dataclass is frozen), excluded from init/repr/comparison.
    # array->array codecs and the pre-resolved input spec for each.
    _aa_codecs: tuple[ArrayArrayCodec, ...] = field(init=False, repr=False, compare=False)
    _aa_specs: tuple[ArraySpec, ...] = field(init=False, repr=False, compare=False)
    # The single array->bytes codec and its input spec.
    _ab_codec: ArrayBytesCodec = field(init=False, repr=False, compare=False)
    _ab_spec: ArraySpec = field(init=False, repr=False, compare=False)
    # bytes->bytes codecs; they all share one resolved spec.
    _bb_codecs: tuple[BytesBytesCodec, ...] = field(init=False, repr=False, compare=False)
    _bb_spec: ArraySpec = field(init=False, repr=False, compare=False)
    # True when every codec implements SupportsSyncCodec.
    _all_sync: bool = field(init=False, repr=False, compare=False)

def __post_init__(self) -> None:
    """Partition the codecs into stages and pre-resolve each input spec.

    The dataclass is frozen, so every derived field is assigned with
    ``object.__setattr__``.
    """
    # Split into array->array (aa), array->bytes (ab), bytes->bytes (bb).
    aa, ab, bb = codecs_from_list(list(self.codecs))

    # aa_specs[i] is the spec fed INTO aa[i]; the spec then evolves
    # through each codec's resolve_metadata.
    aa_specs: list[ArraySpec] = []
    spec = self.chunk_spec
    for aa_codec in aa:
        aa_specs.append(spec)
        spec = aa_codec.resolve_metadata(spec)

    object.__setattr__(self, "_aa_codecs", aa)
    object.__setattr__(self, "_aa_specs", tuple(aa_specs))
    object.__setattr__(self, "_ab_codec", ab)
    # At this point ``spec`` is the input spec of the array->bytes codec.
    object.__setattr__(self, "_ab_spec", spec)

    spec = ab.resolve_metadata(spec)
    object.__setattr__(self, "_bb_codecs", bb)
    # All bytes->bytes codecs share the spec resolved past the ab codec.
    object.__setattr__(self, "_bb_spec", spec)

    # True only if every codec in the chain supports the synchronous API.
    object.__setattr__(
        self,
        "_all_sync",
        all(isinstance(c, SupportsSyncCodec) for c in self.codecs),
    )

@property
def all_sync(self) -> bool:
    """Whether every codec in this chain supports synchronous (de)coding.

    ``decode_chunk``/``encode_chunk`` are only valid to call when True.
    """
    return self._all_sync

def decode_chunk(
    self,
    chunk_bytes: Buffer,
) -> NDBuffer:
    """Decode a single chunk through the full codec chain, synchronously.

    Pure compute -- no IO. Only callable when all codecs support sync.

    Runs the encode pipeline in reverse: bytes->bytes codecs (reversed),
    then the array->bytes codec, then array->array codecs (reversed).
    NOTE(review): the ``cast`` calls assume the caller has checked
    ``all_sync`` first — there is no runtime guard here; confirm callers
    do check.
    """
    bb_out: Any = chunk_bytes
    # All bytes->bytes codecs share the single resolved bb spec.
    for bb_codec in reversed(self._bb_codecs):
        bb_out = cast("SupportsSyncCodec", bb_codec)._decode_sync(bb_out, self._bb_spec)

    ab_out: Any = cast("SupportsSyncCodec", self._ab_codec)._decode_sync(bb_out, self._ab_spec)

    # Each aa codec decodes against the spec that was its encode-time input.
    for aa_codec, spec in zip(reversed(self._aa_codecs), reversed(self._aa_specs), strict=True):
        ab_out = cast("SupportsSyncCodec", aa_codec)._decode_sync(ab_out, spec)

    return ab_out  # type: ignore[no-any-return]

def encode_chunk(
    self,
    chunk_array: NDBuffer,
) -> Buffer | None:
    """Encode a single chunk through the full codec chain, synchronously.

    Pure compute -- no IO. Only callable when all codecs support sync.
    A codec may yield ``None`` to signal that nothing should be written;
    the pipeline short-circuits and propagates that ``None``.
    """
    current: Any = chunk_array

    # Stage 1: array->array codecs, each with its pre-resolved input spec.
    for codec, codec_spec in zip(self._aa_codecs, self._aa_specs, strict=True):
        if current is None:
            return None
        current = cast("SupportsSyncCodec", codec)._encode_sync(current, codec_spec)

    # Stage 2: the single array->bytes codec.
    if current is None:
        return None
    current = cast("SupportsSyncCodec", self._ab_codec)._encode_sync(current, self._ab_spec)

    # Stage 3: bytes->bytes codecs, all sharing one resolved spec.
    for codec in self._bb_codecs:
        if current is None:
            return None
        current = cast("SupportsSyncCodec", codec)._encode_sync(current, self._bb_spec)

    return current  # type: ignore[no-any-return]

def compute_encoded_size(self, byte_length: int, array_spec: ArraySpec) -> int:
for codec in self.codecs:
byte_length = codec.compute_encoded_size(byte_length, array_spec)
array_spec = codec.resolve_metadata(array_spec)
return byte_length


@dataclass(frozen=True)
class BatchedCodecPipeline(CodecPipeline):
"""Default codec pipeline.
Expand Down
39 changes: 38 additions & 1 deletion src/zarr/storage/_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,13 @@
from pathlib import Path
from typing import TYPE_CHECKING, Any, Literal, Self, TypeAlias

from zarr.abc.store import ByteRequest, Store
from zarr.abc.store import (
ByteRequest,
Store,
SupportsDeleteSync,
SupportsGetSync,
SupportsSetSync,
)
from zarr.core.buffer import Buffer, default_buffer_prototype
from zarr.core.common import (
ANY_ACCESS_MODE,
Expand Down Expand Up @@ -228,6 +234,37 @@ async def is_empty(self) -> bool:
"""
return await self.store.is_empty(self.path)

# -------------------------------------------------------------------
# Synchronous IO delegation
# -------------------------------------------------------------------

def get_sync(
    self,
    *,
    prototype: BufferPrototype | None = None,
    byte_range: ByteRequest | None = None,
) -> Buffer | None:
    """Synchronous read — delegates to ``self.store.get_sync(self.path, ...)``.

    Raises
    ------
    TypeError
        If the underlying store does not implement ``SupportsGetSync``.
    """
    if not isinstance(self.store, SupportsGetSync):
        raise TypeError(f"Store {type(self.store).__name__} does not support synchronous get.")
    resolved_prototype = default_buffer_prototype() if prototype is None else prototype
    return self.store.get_sync(self.path, prototype=resolved_prototype, byte_range=byte_range)

def set_sync(self, value: Buffer) -> None:
    """Synchronous write — delegates to ``self.store.set_sync(self.path, value)``.

    Raises
    ------
    TypeError
        If the underlying store does not implement ``SupportsSetSync``.
    """
    if isinstance(self.store, SupportsSetSync):
        self.store.set_sync(self.path, value)
        return
    raise TypeError(f"Store {type(self.store).__name__} does not support synchronous set.")

def delete_sync(self) -> None:
    """Synchronous delete — delegates to ``self.store.delete_sync(self.path)``.

    Raises
    ------
    TypeError
        If the underlying store does not implement ``SupportsDeleteSync``.
    """
    if isinstance(self.store, SupportsDeleteSync):
        self.store.delete_sync(self.path)
        return
    raise TypeError(
        f"Store {type(self.store).__name__} does not support synchronous delete."
    )

def __truediv__(self, other: str) -> StorePath:
    """Combine this store path with another path.

    Returns a new instance of the same class, rooted at the same store,
    with ``other`` joined onto this path via ``_dereference_path``.
    """
    return self.__class__(self.store, _dereference_path(self.path, other))
Expand Down
50 changes: 50 additions & 0 deletions src/zarr/storage/_local.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,56 @@ def __repr__(self) -> str:
def __eq__(self, other: object) -> bool:
    """Stores are equal when they are the same concrete type with the same root."""
    if not isinstance(other, type(self)):
        return False
    return self.root == other.root

# -------------------------------------------------------------------
# Synchronous store methods
# -------------------------------------------------------------------

def _ensure_open_sync(self) -> None:
    """Synchronously mark the store as open, preparing the root directory.

    Store instances are not necessarily opened on construction, so every
    synchronous entry point calls this first, mirroring the guard that the
    async get/set paths perform via ``_open``. Writable stores create the
    root (including parents) if needed; the root must exist afterwards.
    """
    if self._is_open:
        return
    if not self.read_only:
        self.root.mkdir(parents=True, exist_ok=True)
    if not self.root.exists():
        raise FileNotFoundError(f"{self.root} does not exist")
    self._is_open = True

def get_sync(
    self,
    key: str,
    *,
    prototype: BufferPrototype | None = None,
    byte_range: ByteRequest | None = None,
) -> Buffer | None:
    """Synchronously read ``key`` from disk, or return ``None`` if absent.

    ``byte_range`` selects a sub-slice of the file; missing files and
    path/directory mismatches are treated as an absent key.
    """
    buffer_prototype = default_buffer_prototype() if prototype is None else prototype
    self._ensure_open_sync()
    assert isinstance(key, str)
    target = self.root / key
    try:
        return _get(target, buffer_prototype, byte_range)
    except (FileNotFoundError, IsADirectoryError, NotADirectoryError):
        # An unreadable or missing path is an absent key, not an error.
        return None

def set_sync(self, key: str, value: Buffer) -> None:
    """Synchronously write ``value`` to the file at ``key`` under the root.

    Raises
    ------
    TypeError
        If ``value`` is not a :class:`Buffer`.
    """
    self._ensure_open_sync()
    self._check_writable()
    assert isinstance(key, str)
    if not isinstance(value, Buffer):
        # Fix: the message previously said "LocalStore.set()", which is the
        # async method's name; name the sync entry point the user called.
        raise TypeError(
            f"LocalStore.set_sync(): `value` must be a Buffer instance. "
            f"Got an instance of {type(value)} instead."
        )
    path = self.root / key
    _put(path, value)

def delete_sync(self, key: str) -> None:
    """Synchronously delete ``key``; directories are removed recursively.

    A missing key is a no-op (``missing_ok``), so delete is idempotent.
    """
    self._ensure_open_sync()
    self._check_writable()
    target = self.root / key
    if not target.is_dir():
        # unlink tolerates a missing file, keeping delete idempotent.
        target.unlink(missing_ok=True)
        return
    shutil.rmtree(target)

async def get(
self,
key: str,
Expand Down
44 changes: 43 additions & 1 deletion src/zarr/storage/_memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,49 @@ def __eq__(self, other: object) -> bool:
and self.read_only == other.read_only
)

# -------------------------------------------------------------------
# Synchronous store methods
# -------------------------------------------------------------------

def get_sync(
    self,
    key: str,
    *,
    prototype: BufferPrototype | None = None,
    byte_range: ByteRequest | None = None,
) -> Buffer | None:
    """Synchronously read ``key`` from the in-memory dict, or ``None`` if absent.

    ``byte_range`` selects a slice of the stored buffer; the result is
    copied into a buffer built from ``prototype``.
    """
    buffer_prototype = default_buffer_prototype() if prototype is None else prototype
    # Memory stores have no real open step; just flip the flag.
    if not self._is_open:
        self._is_open = True
    assert isinstance(key, str)
    try:
        stored = self._store_dict[key]
    except KeyError:
        return None
    start, stop = _normalize_byte_range_index(stored, byte_range)
    return buffer_prototype.buffer.from_buffer(stored[start:stop])

def set_sync(self, key: str, value: Buffer) -> None:
    """Synchronously store ``value`` under ``key`` in the in-memory dict.

    Raises
    ------
    TypeError
        If ``value`` is not a :class:`Buffer`.
    """
    self._check_writable()
    # Memory stores have no real open step; just flip the flag.
    if not self._is_open:
        self._is_open = True
    assert isinstance(key, str)
    if not isinstance(value, Buffer):
        # Fix: the message previously said "MemoryStore.set()", which is the
        # async method's name; name the sync entry point the user called.
        raise TypeError(
            f"MemoryStore.set_sync(): `value` must be a Buffer instance. Got an instance of {type(value)} instead."
        )
    self._store_dict[key] = value

def delete_sync(self, key: str) -> None:
    """Synchronously remove ``key``; a missing key is logged, not an error."""
    self._check_writable()
    # Memory stores have no real open step; just flip the flag.
    if not self._is_open:
        self._is_open = True
    if key in self._store_dict:
        del self._store_dict[key]
    else:
        logger.debug("Key %s does not exist.", key)

async def get(
self,
key: str,
Expand Down Expand Up @@ -122,7 +165,6 @@ async def set(self, key: str, value: Buffer, byte_range: tuple[int, int] | None
raise TypeError(
f"MemoryStore.set(): `value` must be a Buffer instance. Got an instance of {type(value)} instead."
)

if byte_range is not None:
buf = self._store_dict[key]
buf[byte_range[0] : byte_range[1]] = value
Expand Down
Loading