diff --git a/examples/rmqperftest/rmqperftest.m.cpp b/examples/rmqperftest/rmqperftest.m.cpp index 8a8e450d..83222dc2 100644 --- a/examples/rmqperftest/rmqperftest.m.cpp +++ b/examples/rmqperftest/rmqperftest.m.cpp @@ -241,6 +241,13 @@ int main(int argc, char* argv[]) balcl::TypeInfo(&args.producer.messageFlag), balcl::OccurrenceInfo(args.producer.messageFlag), }, + { + "use-compression", + "use-compression", + "Enable compression for messages sent by producers", + balcl::TypeInfo(&args.producer.useCompression), + balcl::OccurrenceInfo(balcl::OccurrenceInfo::e_OPTIONAL), + }, { "log-level", "log-level", @@ -259,5 +266,8 @@ int main(int argc, char* argv[]) configLog(logLevel); + args.consumer.useCompression = + args.producer.useCompression; // use same compression setting for both + // producer and consumer return rmqperftest::Runner::run(args); } diff --git a/examples/rmqperftest/rmqperftest_args.h b/examples/rmqperftest/rmqperftest_args.h index ef590ce9..130e1995 100644 --- a/examples/rmqperftest/rmqperftest_args.h +++ b/examples/rmqperftest/rmqperftest_args.h @@ -54,6 +54,7 @@ struct ProducerArgs { , messageSize(10) , messageFlag("") , numProducers(1) + , useCompression(false) { } @@ -64,6 +65,7 @@ struct ProducerArgs { int messageSize; bsl::string messageFlag; int numProducers; + bool useCompression; // TODO bsl::string messageContentType; // TODO double producerRandomStartDelay = 0.0; @@ -77,6 +79,7 @@ struct ConsumerArgs { , consumerRateLimit(0) , numConsumers(1) , consumerArgs("") + , useCompression(false) { } @@ -86,6 +89,7 @@ struct ConsumerArgs { int numConsumers; bsl::string consumerArgs; // expected comma separated "key=value, key=value, ..." + bool useCompression; // TODO bool consumerSlowStart = false; // TODO int consumerLatencyUSec = 0; diff --git a/examples/rmqperftest/rmqperftest_runner.cpp b/examples/rmqperftest/rmqperftest_runner.cpp index 951e92f0..1b0110bd 100644 --- a/examples/rmqperftest/rmqperftest_runner.cpp +++ b/examples/rmqperftest/rmqperftest_runner.cpp @@ -16,12 +16,14 @@ #include #include +#include #include #include #include #include #include #include +#include #include #include @@ -112,6 +114,42 @@ void finalizeProducers(bsl::vector& producers) producers.clear(); } +// Generate a message payload of the specified size +// The payload should be compressible, but not completely trivial (like all +// 'a's) +bsl::shared_ptr > createMessagePayload(size_t size) +{ + bsl::shared_ptr > vec = + bsl::make_shared >(size, 'a'); + bsl::vector* payload = vec.get(); + size_t idx = 0; + for (int i = 0; i < 26; i++) { + for (int j = 0; j < 26; j++) { + for (int k = 0; k < 26; k++) { + for (int l = 0; l < 26; l++) { + payload->at(idx++) = 'a' + i; + if (idx >= size) { + return vec; + } + payload->at(idx++) = 'a' + i; + if (idx >= size) { + return vec; + } + payload->at(idx++) = 'a' + k; + if (idx >= size) { + return vec; + } + payload->at(idx++) = 'a' + l; + if (idx >= size) { + return vec; + } + } + } + } + } + return vec; +} + } // namespace int Runner::run(const PerfTestArgs& args) @@ -208,6 +246,15 @@ int Runner::run(const PerfTestArgs& args) bsl::cerr << "Failed to create producer: " << i << "\n"; return 1; } + if (args.producer.useCompression) { + rmqt::Result transformer = + rmqa::CompressionTransformer::create(); + if (!transformer) { + bsl::cerr << "Failed to create compression transformer\n"; + return 1; + } + producerResult.value()->addTransformer(transformer.value()); + } producers.push_back(bsl::make_pair( producerResult.value(), @@ -244,6 +291,15 @@ int Runner::run(const PerfTestArgs& args) << bsl::endl; consumerConfig.setConsumerPriority(priority.value()); } + if (args.consumer.useCompression) { + rmqt::Result transformer = + rmqa::CompressionTransformer::create(); + if (!transformer) { + bsl::cerr << "Failed to create compression transformer\n"; + return 1; + } + consumerConfig.addTransformer(transformer.value()); + } bsl::shared_ptr consumerFinishedFlag = bsl::make_shared(false); @@ -271,6 +327,9 @@ int Runner::run(const PerfTestArgs& args) } } + bsl::shared_ptr > rawPayload = + createMessagePayload(args.producer.messageSize); + const bsls::TimeInterval startTime = bsls::SystemTime::nowMonotonicClock(); bsls::TimeInterval nextPublish = @@ -306,8 +365,7 @@ int Runner::run(const PerfTestArgs& args) bsls::TimeInterval(args.producer.publishingInterval, 0); if (producers.size() > 0) { - rmqt::Message msg(bsl::make_shared >( - args.producer.messageSize, 'a')); + rmqt::Message msg(rawPayload); if (args.producer.messageFlag == "persistent") { msg.updateDeliveryMode(rmqt::DeliveryMode::PERSISTENT); @@ -325,8 +383,9 @@ int Runner::run(const PerfTestArgs& args) producerMessagesSent += 1; - if ((int)producerMessagesSent >= - args.producer.producerMessageCount) { + if (args.producer.producerMessageCount >= 0 && + static_cast(producerMessagesSent) >= + args.producer.producerMessageCount) { finalizeProducers(producers); } diff --git a/src/rmq/rmqa/rmqa_compressiontransformerimpl.cpp b/src/rmq/rmqa/rmqa_compressiontransformerimpl.cpp index f8953e18..a0b837a8 100644 --- a/src/rmq/rmqa/rmqa_compressiontransformerimpl.cpp +++ b/src/rmq/rmqa/rmqa_compressiontransformerimpl.cpp @@ -29,6 +29,7 @@ #include #include #include +#include namespace BloombergLP { namespace rmqa { @@ -84,9 +85,7 @@ rmqt::Result CompressionTransformerImpl::transform( } // Update headers - if (!props.headers) { - props.headers = bsl::make_shared(); - } + BSLS_ASSERT(props.headers); props.headers->emplace("sdk.transform.compression.alg", rmqt::FieldValue(bsl::string("zstd"))); props.headers->emplace( @@ -126,9 +125,7 @@ rmqt::Result<> CompressionTransformerImpl::inverseTransform( bsl::shared_ptr >& data, rmqt::Properties& props) { - if (!props.headers) { - return rmqt::Result<>("Malformed message"); - } + BSLS_ASSERT(props.headers); int64_t originalSize = (*props.headers)["sdk.transform.compression.size"].the(); if (originalSize <= 0) { diff --git a/src/rmq/rmqa/rmqa_consumerimpl.cpp b/src/rmq/rmqa/rmqa_consumerimpl.cpp index 6ccb7e89..afe082d6 100644 --- a/src/rmq/rmqa/rmqa_consumerimpl.cpp +++ b/src/rmq/rmqa/rmqa_consumerimpl.cpp @@ -215,7 +215,8 @@ bool ConsumerImpl::unpackTransformations(rmqt::Message& dstMessage, } rmqt::Result<> r = (*it)->inverseTransform(rawData, properties); if (!r) { - BALL_LOG_ERROR << "Inverse transformation failed: " << r.error(); + BALL_LOG_ERROR << "Inverse transformation " << (*it)->name() + << " failed: " << r.error(); return false; } properties.headers->erase(headerName); // Remove transformation header diff --git a/src/rmq/rmqa/rmqa_producerimpl.cpp b/src/rmq/rmqa/rmqa_producerimpl.cpp index 878a888b..b3a1784f 100644 --- a/src/rmq/rmqa/rmqa_producerimpl.cpp +++ b/src/rmq/rmqa/rmqa_producerimpl.cpp @@ -19,6 +19,7 @@ #include #include #include +#include #include #include #include @@ -206,6 +207,9 @@ bool ProducerImpl::applyTransformations(rmqt::Message& dstMessage, srcMessage.payload(), srcMessage.payload() + srcMessage.payloadSize()); rmqt::Properties props = srcMessage.properties(); + if (!props.headers) { + props.headers = bsl::make_shared(); + } // Apply all transformations for (bsl::vector >::iterator it = @@ -214,7 +218,8 @@ bool ProducerImpl::applyTransformations(rmqt::Message& dstMessage, ++it) { rmqt::Result r = (*it)->transform(rawData, props); if (!r) { - BALL_LOG_ERROR << "Transformation failed"; + BALL_LOG_ERROR << "Transformation " << (*it)->name() + << " failed: " << r.error(); return false; } else if (*r.value()) { @@ -222,13 +227,13 @@ bool ProducerImpl::applyTransformations(rmqt::Message& dstMessage, ->emplace("sdk.transform." + (*it)->name(), rmqt::FieldValue(bsl::string("ok"))) .second) { - BALL_LOG_ERROR << "Reserved header 'sdk.transform." + BALL_LOG_ERROR << "Transformation header 'sdk.transform." << (*it)->name() << "' already exists"; return false; } } else { - BALL_LOG_DEBUG << "Transformation ignored"; + BALL_LOG_DEBUG << "Transformation " << (*it)->name() << " ignored"; } } diff --git a/src/tests/rmqa/rmqa_messagetransformer.t.cpp b/src/tests/rmqa/rmqa_messagetransformer.t.cpp index ee31847b..ccafadd1 100644 --- a/src/tests/rmqa/rmqa_messagetransformer.t.cpp +++ b/src/tests/rmqa/rmqa_messagetransformer.t.cpp @@ -15,6 +15,7 @@ #include #include +#include #include #include @@ -42,7 +43,10 @@ class TransformerTester { bsl::shared_ptr > data = bsl::make_shared >( message.payload(), message.payload() + message.payloadSize()); - rmqt::Properties props = message.properties(); + rmqt::Properties props = message.properties(); + if (!props.headers) { + props.headers = bsl::make_shared(); + } rmqt::Result result = transformer.transform(data, props); if (expectedResult == FAILURE) { EXPECT_FALSE(result); // Expect failure @@ -147,6 +151,7 @@ TEST(MessageBuilderTests, ChainedCompressionIsValid) bsl::shared_ptr > data = bsl::make_shared >(s.cbegin(), s.cend()); rmqt::Properties props = rmqt::Message(data).properties(); + props.headers = bsl::make_shared(); // Build message BasicMessageTransform t1; @@ -179,6 +184,7 @@ TEST(MessageBuilderTests, ChainedTransformIsValid) bsl::shared_ptr > data = bsl::make_shared >(s.cbegin(), s.cend()); rmqt::Properties props = rmqt::Message(data).properties(); + props.headers = bsl::make_shared(); // Build message BasicMessageTransform t1;