Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions examples/rmqperftest/rmqperftest.m.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,13 @@ int main(int argc, char* argv[])
balcl::TypeInfo(&args.producer.messageFlag),
balcl::OccurrenceInfo(args.producer.messageFlag),
},
{
"use-compression",
"use-compression",
"Enable compression for messages sent by producers",
balcl::TypeInfo(&args.producer.useCompression),
balcl::OccurrenceInfo(balcl::OccurrenceInfo::e_OPTIONAL),
},
{
"log-level",
"log-level",
Expand All @@ -259,5 +266,8 @@ int main(int argc, char* argv[])

configLog(logLevel);

args.consumer.useCompression =
args.producer.useCompression; // use same compression setting for both
// producer and consumer
return rmqperftest::Runner::run(args);
}
4 changes: 4 additions & 0 deletions examples/rmqperftest/rmqperftest_args.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ struct ProducerArgs {
, messageSize(10)
, messageFlag("")
, numProducers(1)
, useCompression(false)
{
}

Expand All @@ -64,6 +65,7 @@ struct ProducerArgs {
int messageSize;
bsl::string messageFlag;
int numProducers;
bool useCompression;

// TODO bsl::string messageContentType;
// TODO double producerRandomStartDelay = 0.0;
Expand All @@ -77,6 +79,7 @@ struct ConsumerArgs {
, consumerRateLimit(0)
, numConsumers(1)
, consumerArgs("")
, useCompression(false)
{
}

Expand All @@ -86,6 +89,7 @@ struct ConsumerArgs {
int numConsumers;
bsl::string
consumerArgs; // expected comma separated "key=value, key=value, ..."
bool useCompression;

// TODO bool consumerSlowStart = false;
// TODO int consumerLatencyUSec = 0;
Expand Down
67 changes: 63 additions & 4 deletions examples/rmqperftest/rmqperftest_runner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@
#include <rmqperftest_consumerargs.h>
#include <rmqperftest_runner.h>

#include <rmqa_compressiontransformer.h>
#include <rmqa_connectionstring.h>
#include <rmqa_consumer.h>
#include <rmqa_producer.h>
#include <rmqa_rabbitcontext.h>
#include <rmqa_vhost.h>
#include <rmqt_exchange.h>
#include <rmqt_properties.h>
#include <rmqt_vhostinfo.h>

#include <bsl_algorithm.h>
Expand Down Expand Up @@ -112,6 +114,42 @@ void finalizeProducers(bsl::vector<ProducerRoutingKeyPair>& producers)
producers.clear();
}

// Generate a message payload of the specified size
// The payload should be compressible, but not completely trivial (like all
// 'a's)
bsl::shared_ptr<bsl::vector<uint8_t> > createMessagePayload(size_t size)
{
bsl::shared_ptr<bsl::vector<uint8_t> > vec =
bsl::make_shared<bsl::vector<uint8_t> >(size, 'a');
bsl::vector<uint8_t>* payload = vec.get();
size_t idx = 0;
for (int i = 0; i < 26; i++) {
for (int j = 0; j < 26; j++) {
for (int k = 0; k < 26; k++) {
for (int l = 0; l < 26; l++) {
payload->at(idx++) = 'a' + i;
if (idx >= size) {
return vec;
}
payload->at(idx++) = 'a' + i;
if (idx >= size) {
return vec;
}
payload->at(idx++) = 'a' + k;
if (idx >= size) {
return vec;
}
payload->at(idx++) = 'a' + l;
if (idx >= size) {
return vec;
}
}
}
}
}
return vec;
}

} // namespace

int Runner::run(const PerfTestArgs& args)
Expand Down Expand Up @@ -208,6 +246,15 @@ int Runner::run(const PerfTestArgs& args)
bsl::cerr << "Failed to create producer: " << i << "\n";
return 1;
}
if (args.producer.useCompression) {
rmqt::Result<rmqp::MessageTransformer> transformer =
rmqa::CompressionTransformer::create();
if (!transformer) {
bsl::cerr << "Failed to create compression transformer\n";
return 1;
}
producerResult.value()->addTransformer(transformer.value());
}

producers.push_back(bsl::make_pair(
producerResult.value(),
Expand Down Expand Up @@ -244,6 +291,15 @@ int Runner::run(const PerfTestArgs& args)
<< bsl::endl;
consumerConfig.setConsumerPriority(priority.value());
}
if (args.consumer.useCompression) {
rmqt::Result<rmqp::MessageTransformer> transformer =
rmqa::CompressionTransformer::create();
if (!transformer) {
bsl::cerr << "Failed to create compression transformer\n";
return 1;
}
consumerConfig.addTransformer(transformer.value());
}

bsl::shared_ptr<bsls::AtomicBool> consumerFinishedFlag =
bsl::make_shared<bsls::AtomicBool>(false);
Expand Down Expand Up @@ -271,6 +327,9 @@ int Runner::run(const PerfTestArgs& args)
}
}

bsl::shared_ptr<bsl::vector<uint8_t> > rawPayload =
createMessagePayload(args.producer.messageSize);

const bsls::TimeInterval startTime = bsls::SystemTime::nowMonotonicClock();

bsls::TimeInterval nextPublish =
Expand Down Expand Up @@ -306,8 +365,7 @@ int Runner::run(const PerfTestArgs& args)
bsls::TimeInterval(args.producer.publishingInterval, 0);

if (producers.size() > 0) {
rmqt::Message msg(bsl::make_shared<bsl::vector<uint8_t> >(
args.producer.messageSize, 'a'));
rmqt::Message msg(rawPayload);

if (args.producer.messageFlag == "persistent") {
msg.updateDeliveryMode(rmqt::DeliveryMode::PERSISTENT);
Expand All @@ -325,8 +383,9 @@ int Runner::run(const PerfTestArgs& args)

producerMessagesSent += 1;

if ((int)producerMessagesSent >=
args.producer.producerMessageCount) {
if (args.producer.producerMessageCount >= 0 &&
static_cast<int>(producerMessagesSent) >=
args.producer.producerMessageCount) {

finalizeProducers(producers);
}
Expand Down
9 changes: 3 additions & 6 deletions src/rmq/rmqa/rmqa_compressiontransformerimpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include <bsl_memory.h>
#include <bsl_string.h>
#include <bsl_vector.h>
#include <bsls_assert.h>

namespace BloombergLP {
namespace rmqa {
Expand Down Expand Up @@ -84,9 +85,7 @@ rmqt::Result<bool> CompressionTransformerImpl::transform(
}

// Update headers
if (!props.headers) {
props.headers = bsl::make_shared<rmqt::FieldTable>();
}
BSLS_ASSERT(props.headers);
props.headers->emplace("sdk.transform.compression.alg",
rmqt::FieldValue(bsl::string("zstd")));
props.headers->emplace(
Expand Down Expand Up @@ -126,9 +125,7 @@ rmqt::Result<> CompressionTransformerImpl::inverseTransform(
bsl::shared_ptr<bsl::vector<uint8_t> >& data,
rmqt::Properties& props)
{
if (!props.headers) {
return rmqt::Result<>("Malformed message");
}
BSLS_ASSERT(props.headers);
int64_t originalSize =
(*props.headers)["sdk.transform.compression.size"].the<int64_t>();
if (originalSize <= 0) {
Expand Down
3 changes: 2 additions & 1 deletion src/rmq/rmqa/rmqa_consumerimpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,8 @@ bool ConsumerImpl::unpackTransformations(rmqt::Message& dstMessage,
}
rmqt::Result<> r = (*it)->inverseTransform(rawData, properties);
if (!r) {
BALL_LOG_ERROR << "Inverse transformation failed: " << r.error();
BALL_LOG_ERROR << "Inverse transformation " << (*it)->name()
<< " failed: " << r.error();
return false;
}
properties.headers->erase(headerName); // Remove transformation header
Expand Down
11 changes: 8 additions & 3 deletions src/rmq/rmqa/rmqa_producerimpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <rmqio_eventloop.h>
#include <rmqp_messagetransformer.h>
#include <rmqt_confirmresponse.h>
#include <rmqt_fieldvalue.h>
#include <rmqt_future.h>
#include <rmqt_message.h>
#include <rmqt_topologyupdate.h>
Expand Down Expand Up @@ -206,6 +207,9 @@ bool ProducerImpl::applyTransformations(rmqt::Message& dstMessage,
srcMessage.payload(),
srcMessage.payload() + srcMessage.payloadSize());
rmqt::Properties props = srcMessage.properties();
if (!props.headers) {
props.headers = bsl::make_shared<rmqt::FieldTable>();
}

// Apply all transformations
for (bsl::vector<bsl::shared_ptr<rmqp::MessageTransformer> >::iterator it =
Expand All @@ -214,21 +218,22 @@ bool ProducerImpl::applyTransformations(rmqt::Message& dstMessage,
++it) {
rmqt::Result<bool> r = (*it)->transform(rawData, props);
if (!r) {
BALL_LOG_ERROR << "Transformation failed";
BALL_LOG_ERROR << "Transformation " << (*it)->name()
<< " failed: " << r.error();
return false;
}
else if (*r.value()) {
if (!props.headers
->emplace("sdk.transform." + (*it)->name(),
rmqt::FieldValue(bsl::string("ok")))
.second) {
BALL_LOG_ERROR << "Reserved header 'sdk.transform."
BALL_LOG_ERROR << "Transformation header 'sdk.transform."
<< (*it)->name() << "' already exists";
return false;
}
}
else {
BALL_LOG_DEBUG << "Transformation ignored";
BALL_LOG_DEBUG << "Transformation " << (*it)->name() << " ignored";
}
}

Expand Down
8 changes: 7 additions & 1 deletion src/tests/rmqa/rmqa_messagetransformer.t.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

#include <rmqa_compressiontransformer.h>
#include <rmqp_messagetransformer.h>
#include <rmqt_fieldvalue.h>
#include <rmqt_message.h>
#include <rmqt_result.h>

Expand Down Expand Up @@ -42,7 +43,10 @@ class TransformerTester {
bsl::shared_ptr<bsl::vector<uint8_t> > data =
bsl::make_shared<bsl::vector<uint8_t> >(
message.payload(), message.payload() + message.payloadSize());
rmqt::Properties props = message.properties();
rmqt::Properties props = message.properties();
if (!props.headers) {
props.headers = bsl::make_shared<rmqt::FieldTable>();
}
rmqt::Result<bool> result = transformer.transform(data, props);
if (expectedResult == FAILURE) {
EXPECT_FALSE(result); // Expect failure
Expand Down Expand Up @@ -147,6 +151,7 @@ TEST(MessageBuilderTests, ChainedCompressionIsValid)
bsl::shared_ptr<bsl::vector<uint8_t> > data =
bsl::make_shared<bsl::vector<uint8_t> >(s.cbegin(), s.cend());
rmqt::Properties props = rmqt::Message(data).properties();
props.headers = bsl::make_shared<rmqt::FieldTable>();

// Build message
BasicMessageTransform t1;
Expand Down Expand Up @@ -179,6 +184,7 @@ TEST(MessageBuilderTests, ChainedTransformIsValid)
bsl::shared_ptr<bsl::vector<uint8_t> > data =
bsl::make_shared<bsl::vector<uint8_t> >(s.cbegin(), s.cend());
rmqt::Properties props = rmqt::Message(data).properties();
props.headers = bsl::make_shared<rmqt::FieldTable>();

// Build message
BasicMessageTransform t1;
Expand Down
Loading