Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 38 additions & 9 deletions be/src/cloud/cloud_meta_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1196,10 +1196,12 @@ Status CloudMetaMgr::_read_tablet_delete_bitmap_v2(CloudTablet* tablet, int64_t
}
return Status::OK();
};
auto get_delete_bitmap_from_file = [&](const std::string& rowset_id) {
auto get_delete_bitmap_from_file = [&](const std::string& rowset_id,
const DeleteBitmapStoragePB& storage) {
if (config::enable_mow_verbose_log) {
LOG(INFO) << "get delete bitmap for tablet_id=" << tablet->tablet_id()
<< ", rowset_id=" << rowset_id << " from file";
<< ", rowset_id=" << rowset_id << " from file"
<< ", is_packed=" << storage.has_packed_slice_location();
}
if (rowset_to_resource.find(rowset_id) == rowset_to_resource.end()) {
return Status::InternalError("vault id not found for tablet_id={}, rowset_id={}",
Expand All @@ -1212,11 +1214,23 @@ Status CloudMetaMgr::_read_tablet_delete_bitmap_v2(CloudTablet* tablet, int64_t
return Status::InternalError("vault id not found, maybe not sync, vault id {}",
resource_id);
}
DeleteBitmapFileReader reader(tablet->tablet_id(), rowset_id, storage_resource);
RETURN_IF_ERROR(reader.init());

// Use packed file reader if packed_slice_location is present
std::unique_ptr<DeleteBitmapFileReader> reader;
if (storage.has_packed_slice_location() &&
!storage.packed_slice_location().packed_file_path().empty()) {
reader = std::make_unique<DeleteBitmapFileReader>(tablet->tablet_id(), rowset_id,
storage_resource,
storage.packed_slice_location());
} else {
reader = std::make_unique<DeleteBitmapFileReader>(tablet->tablet_id(), rowset_id,
storage_resource);
}

RETURN_IF_ERROR(reader->init());
DeleteBitmapPB dbm;
RETURN_IF_ERROR(reader.read(dbm));
RETURN_IF_ERROR(reader.close());
RETURN_IF_ERROR(reader->read(dbm));
RETURN_IF_ERROR(reader->close());
return merge_delete_bitmap(rowset_id, dbm);
};
CloudStorageEngine& engine = ExecEnv::GetInstance()->storage_engine().to_cloud();
Expand All @@ -1230,8 +1244,9 @@ Status CloudMetaMgr::_read_tablet_delete_bitmap_v2(CloudTablet* tablet, int64_t
DeleteBitmapPB dbm = delete_bitmap_storages[i].delete_bitmap();
RETURN_IF_ERROR(merge_delete_bitmap(rowset_id, dbm));
} else {
auto submit_st = token->submit_func([&]() {
auto status = get_delete_bitmap_from_file(rowset_id);
const auto& storage = delete_bitmap_storages[i];
auto submit_st = token->submit_func([&, rowset_id, storage]() {
auto status = get_delete_bitmap_from_file(rowset_id, storage);
if (!status.ok()) {
LOG(WARNING) << "failed to get delete bitmap for tablet_id="
<< tablet->tablet_id() << ", rowset_id=" << rowset_id
Expand Down Expand Up @@ -1691,12 +1706,26 @@ Status CloudMetaMgr::update_delete_bitmap(const CloudTablet& tablet, int64_t loc
DeleteBitmapStoragePB delete_bitmap_storage;
if (config::delete_bitmap_store_v2_max_bytes_in_fdb >= 0 &&
delete_bitmap_pb.ByteSizeLong() > config::delete_bitmap_store_v2_max_bytes_in_fdb) {
DeleteBitmapFileWriter file_writer(tablet.tablet_id(), rowset_id, storage_resource);
// Enable packed file only for load (txn_id > 0)
bool enable_packed = config::enable_packed_file && txn_id > 0;
DeleteBitmapFileWriter file_writer(tablet.tablet_id(), rowset_id, storage_resource,
enable_packed, txn_id);
RETURN_IF_ERROR(file_writer.init());
RETURN_IF_ERROR(file_writer.write(delete_bitmap_pb));
RETURN_IF_ERROR(file_writer.close());
delete_bitmap_pb.Clear();
delete_bitmap_storage.set_store_in_fdb(false);

// Store packed slice location if file was written to packed file
if (file_writer.is_packed()) {
io::PackedSliceLocation loc;
RETURN_IF_ERROR(file_writer.get_packed_slice_location(&loc));
auto* packed_loc = delete_bitmap_storage.mutable_packed_slice_location();
packed_loc->set_packed_file_path(loc.packed_file_path);
packed_loc->set_offset(loc.offset);
packed_loc->set_size(loc.size);
packed_loc->set_packed_file_size(loc.packed_file_size);
}
} else {
delete_bitmap_storage.set_store_in_fdb(true);
*(delete_bitmap_storage.mutable_delete_bitmap()) = std::move(delete_bitmap_pb);
Expand Down
40 changes: 37 additions & 3 deletions be/src/cloud/delete_bitmap_file_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include "cloud/delete_bitmap_file_writer.h"
#include "common/status.h"
#include "io/fs/file_reader.h"
#include "io/fs/packed_file_reader.h"
#include "util/coding.h"

namespace doris {
Expand All @@ -29,6 +30,20 @@ DeleteBitmapFileReader::DeleteBitmapFileReader(int64_t tablet_id, const std::str
std::optional<StorageResource>& storage_resource)
: _tablet_id(tablet_id), _rowset_id(rowset_id), _storage_resource(storage_resource) {}

DeleteBitmapFileReader::DeleteBitmapFileReader(int64_t tablet_id, const std::string& rowset_id,
std::optional<StorageResource>& storage_resource,
const PackedSliceLocationPB& packed_location)
: _tablet_id(tablet_id),
_rowset_id(rowset_id),
_storage_resource(storage_resource),
_is_packed(true),
_packed_offset(packed_location.offset()),
_packed_size(packed_location.size()),
_packed_file_path(packed_location.packed_file_path()),
_packed_file_size(packed_location.has_packed_file_size()
? packed_location.packed_file_size()
: -1) {}

DeleteBitmapFileReader::~DeleteBitmapFileReader() = default;

Status DeleteBitmapFileReader::init() {
Expand All @@ -45,9 +60,28 @@ Status DeleteBitmapFileReader::init() {
if (!_storage_resource) {
return Status::InternalError("invalid storage resource for tablet_id={}", _tablet_id);
}
_path = _storage_resource->remote_delete_bitmap_path(_tablet_id, _rowset_id);
io::FileReaderOptions opts;
return _storage_resource->fs->open_file(_path, &_file_reader, &opts);

if (_is_packed) {
// Read from packed file
io::FileReaderSPtr inner_reader;
io::FileReaderOptions opts;
if (_packed_file_size > 0) {
opts.file_size = _packed_file_size;
}
opts.cache_type = io::FileCachePolicy::NO_CACHE;
RETURN_IF_ERROR(_storage_resource->fs->open_file(io::Path(_packed_file_path), &inner_reader,
&opts));

_path = _storage_resource->remote_delete_bitmap_path(_tablet_id, _rowset_id);
_file_reader = std::make_shared<io::PackedFileReader>(
std::move(inner_reader), io::Path(_path), _packed_offset, _packed_size);
} else {
// Read from standalone file
_path = _storage_resource->remote_delete_bitmap_path(_tablet_id, _rowset_id);
io::FileReaderOptions opts;
RETURN_IF_ERROR(_storage_resource->fs->open_file(_path, &_file_reader, &opts));
}
return Status::OK();
}

Status DeleteBitmapFileReader::close() {
Expand Down
13 changes: 13 additions & 0 deletions be/src/cloud/delete_bitmap_file_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#include "cloud/cloud_storage_engine.h"
#include "common/status.h"
#include "gen_cpp/olap_file.pb.h"
#include "io/fs/file_reader_writer_fwd.h"

namespace doris {
Expand All @@ -27,8 +28,13 @@ class DeleteBitmapPB;

class DeleteBitmapFileReader {
public:
// Constructor for standalone files
explicit DeleteBitmapFileReader(int64_t tablet_id, const std::string& rowset_id,
std::optional<StorageResource>& storage_resource);
// Constructor for packed file reading
explicit DeleteBitmapFileReader(int64_t tablet_id, const std::string& rowset_id,
std::optional<StorageResource>& storage_resource,
const PackedSliceLocationPB& packed_location);
~DeleteBitmapFileReader();

Status init();
Expand All @@ -41,6 +47,13 @@ class DeleteBitmapFileReader {
std::optional<StorageResource> _storage_resource;
std::string _path;
io::FileReaderSPtr _file_reader;

// Packed file support
bool _is_packed = false;
int64_t _packed_offset = 0;
int64_t _packed_size = 0;
std::string _packed_file_path;
int64_t _packed_file_size = -1;
};

} // namespace doris
64 changes: 61 additions & 3 deletions be/src/cloud/delete_bitmap_file_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@

#include <crc32c/crc32c.h>

#include "cloud/config.h"
#include "io/fs/file_writer.h"
#include "io/fs/packed_file_writer.h"

namespace doris {
#include "common/compile_check_begin.h"
Expand All @@ -28,6 +30,15 @@ DeleteBitmapFileWriter::DeleteBitmapFileWriter(int64_t tablet_id, const std::str
std::optional<StorageResource>& storage_resource)
: _tablet_id(tablet_id), _rowset_id(rowset_id), _storage_resource(storage_resource) {}

DeleteBitmapFileWriter::DeleteBitmapFileWriter(int64_t tablet_id, const std::string& rowset_id,
std::optional<StorageResource>& storage_resource,
bool enable_packed_file, int64_t txn_id)
: _tablet_id(tablet_id),
_rowset_id(rowset_id),
_storage_resource(storage_resource),
_enable_packed_file(enable_packed_file),
_txn_id(txn_id) {}

DeleteBitmapFileWriter::~DeleteBitmapFileWriter() {}

Status DeleteBitmapFileWriter::init() {
Expand All @@ -48,8 +59,30 @@ Status DeleteBitmapFileWriter::init() {
}
_path = _storage_resource->remote_delete_bitmap_path(_tablet_id, _rowset_id);
io::FileWriterOptions opts;
// opts.write_file_cache = true;
return _storage_resource->fs->create_file(_path, &_file_writer, &opts);

if (_enable_packed_file) {
// Create underlying file writer
io::FileWriterPtr inner_writer;
// Disable write_file_cache for inner writer when using PackedFileWriter.
// Small files will be cached separately by PackedFileManager using the
// small file path as cache key.
opts.write_file_cache = false;
RETURN_IF_ERROR(_storage_resource->fs->create_file(_path, &inner_writer, &opts));

// Wrap with PackedFileWriter
io::PackedAppendContext append_info;
append_info.resource_id = _storage_resource->fs->id();
append_info.tablet_id = _tablet_id;
append_info.rowset_id = _rowset_id;
append_info.txn_id = _txn_id;
append_info.write_file_cache = false;

_file_writer = std::make_unique<io::PackedFileWriter>(std::move(inner_writer),
io::Path(_path), append_info);
} else {
RETURN_IF_ERROR(_storage_resource->fs->create_file(_path, &_file_writer, &opts));
}
return Status::OK();
}

Status DeleteBitmapFileWriter::close() {
Expand All @@ -60,8 +93,33 @@ Status DeleteBitmapFileWriter::close() {
auto st = _file_writer->close();
if (!st.ok()) {
LOG(WARNING) << "failed to close delete bitmap file=" << _path << ", st=" << st.to_string();
return st;
}
return st;

// Check if file was written to packed file
if (_enable_packed_file) {
auto* packed_writer = static_cast<io::PackedFileWriter*>(_file_writer.get());
io::PackedSliceLocation loc;
st = packed_writer->get_packed_slice_location(&loc);
if (!st.ok()) {
LOG(WARNING) << "failed to get packed slice location for delete bitmap file=" << _path
<< ", st=" << st.to_string();
return st;
}
if (!loc.packed_file_path.empty()) {
_is_packed = true;
_packed_location = loc;
}
}
return Status::OK();
}

Status DeleteBitmapFileWriter::get_packed_slice_location(io::PackedSliceLocation* location) const {
if (!_is_packed) {
return Status::InternalError("delete bitmap file is not packed");
}
*location = _packed_location;
return Status::OK();
}

Status DeleteBitmapFileWriter::write(const DeleteBitmapPB& delete_bitmap) {
Expand Down
17 changes: 16 additions & 1 deletion be/src/cloud/delete_bitmap_file_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include "cloud/cloud_storage_engine.h"
#include "common/status.h"
#include "io/fs/file_reader_writer_fwd.h"
#include "io/fs/packed_file_manager.h"

namespace doris {

Expand All @@ -29,12 +30,20 @@ class DeleteBitmapFileWriter {
public:
explicit DeleteBitmapFileWriter(int64_t tablet_id, const std::string& rowset_id,
std::optional<StorageResource>& storage_resource);
// Constructor with packed file support
explicit DeleteBitmapFileWriter(int64_t tablet_id, const std::string& rowset_id,
std::optional<StorageResource>& storage_resource,
bool enable_packed_file, int64_t txn_id);
~DeleteBitmapFileWriter();

Status init();
Status write(const DeleteBitmapPB& delete_bitmap);
Status close();

// Get packed slice location after close
Status get_packed_slice_location(io::PackedSliceLocation* location) const;
bool is_packed() const { return _is_packed; }

public:
static constexpr const char* DELETE_BITMAP_MAGIC = "DBM1";
static const uint32_t MAGIC_SIZE = 4;
Expand All @@ -47,6 +56,12 @@ class DeleteBitmapFileWriter {
std::optional<StorageResource> _storage_resource;
std::string _path;
io::FileWriterPtr _file_writer;

// Packed file support
bool _enable_packed_file = false;
int64_t _txn_id = 0;
bool _is_packed = false;
io::PackedSliceLocation _packed_location;
};

} // namespace doris
} // namespace doris
4 changes: 3 additions & 1 deletion be/src/io/fs/packed_file_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,9 @@ Status PackedFileManager::append_small_file(const std::string& path, const Slice
// Async write data to file cache using small file path as cache key.
// This ensures cache key matches the cleanup key in Rowset::clear_cache(),
// allowing proper cache cleanup when stale rowsets are removed.
write_small_file_to_cache_async(path, data, info.tablet_id, info.expiration_time);
if (info.write_file_cache) {
write_small_file_to_cache_async(path, data, info.tablet_id, info.expiration_time);
}

// Update index
PackedSliceLocation location;
Expand Down
1 change: 1 addition & 0 deletions be/src/io/fs/packed_file_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ struct PackedAppendContext {
std::string rowset_id;
int64_t txn_id = 0;
uint64_t expiration_time = 0; // TTL expiration time in seconds since epoch, 0 means no TTL
bool write_file_cache = true; // Whether to write data to file cache
};

// Global object that manages packing small files into larger files for S3 optimization
Expand Down
Loading
Loading