Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -529,6 +529,11 @@ public boolean isFinishedResponseHeaders() {
return finishedResponseHeaders;
}

void clearResponseHeadersForBlocking() {
responseHeaders.clear();
finishedResponseHeaders = false;
}

Map<String, List<String>> getResponseHeaders() {
return responseHeaders;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -657,7 +657,11 @@ private Flow<Void> onResponseHeaderDone(RequestContext ctx_) {
return NoopFlow.INSTANCE;
}
ctx.finishResponseHeaders();
return maybePublishResponseData(ctx);
Flow<Void> flow = maybePublishResponseData(ctx);
if (flow.getAction() instanceof Flow.Action.RequestBlockingAction) {
ctx.clearResponseHeadersForBlocking();
}
return flow;
}

private void onResponseHeader(RequestContext ctx_, String name, String value) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,37 @@ class AppSecRequestContextSpecification extends DDSpecification {
thrown(IllegalStateException)
}

void 'clearResponseHeadersForBlocking clears response headers and resets finished flag'() {
given:
ctx.addResponseHeader('content-type', 'text/html')
ctx.finishResponseHeaders()

expect:
!ctx.responseHeaders.isEmpty()
ctx.isFinishedResponseHeaders()

when:
ctx.clearResponseHeadersForBlocking()

then:
ctx.responseHeaders.isEmpty()
!ctx.isFinishedResponseHeaders()
}

void 'after clearResponseHeadersForBlocking new response headers can be added'() {
given:
ctx.addResponseHeader('content-type', 'text/html')
ctx.finishResponseHeaders()
ctx.clearResponseHeadersForBlocking()

when:
ctx.addResponseHeader('content-type', 'application/json')

then:
ctx.responseHeaders == ['content-type': ['application/json']]
notThrown(IllegalStateException)
}

void 'setting uri a second time is ignored, first value wins'() {
when:
ctx.rawURI = '/a'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import datadog.trace.api.appsec.MediaType
import datadog.trace.api.config.GeneralConfig
import datadog.trace.api.function.TriConsumer
import datadog.trace.api.function.TriFunction
import datadog.appsec.api.blocking.BlockingContentType
import datadog.trace.api.gateway.BlockResponseFunction
import datadog.trace.api.gateway.Flow
import datadog.trace.api.gateway.IGSpanInfo
Expand Down Expand Up @@ -225,6 +226,49 @@ class GatewayBridgeSpecification extends DDSpecification {
1 * traceSegment.setTagTop('actor.ip', '8.8.8.8')
}

void 'request_end writes response headers even when no appsec events'() {
AppSecRequestContext mockAppSecCtx = Mock(AppSecRequestContext)
mockAppSecCtx.requestHeaders >> [:]
mockAppSecCtx.responseHeaders >> ['content-type': ['text/plain']]
RequestContext mockCtx = Stub(RequestContext) {
getData(RequestContextSlot.APPSEC) >> mockAppSecCtx
getTraceSegment() >> traceSegment
}
IGSpanInfo spanInfo = Mock(AgentSpan)

when:
def flow = requestEndedCB.apply(mockCtx, spanInfo)

then:
1 * spanInfo.getTags() >> TagMap.fromMap([:])
1 * mockAppSecCtx.transferCollectedEvents() >> []
1 * mockAppSecCtx.close()
1 * traceSegment.setTagTop("_dd.appsec.enabled", 1)
1 * traceSegment.setTagTop("_dd.runtime_family", "jvm")
1 * traceSegment.setTagTop('http.response.headers.content-type', 'text/plain')
1 * wafMetricCollector.wafRequest(_, _, _, _, _, _, _)
flow.result == null
flow.action == Flow.Action.Noop.INSTANCE
}

void 'response_header_done clears response headers for blocking when WAF blocks'() {
given:
def blockingFlow = Stub(Flow) {
getAction() >> new Flow.Action.RequestBlockingAction(403, BlockingContentType.AUTO)
}
eventDispatcher.getDataSubscribers(_) >> nonEmptyDsInfo
eventDispatcher.publishDataEvent(nonEmptyDsInfo, ctx.data, _ as DataBundle, _ as GatewayContext) >> blockingFlow

when:
respHeaderCB.accept(ctx, 'content-type', 'text/html')
responseStartedCB.apply(ctx, 403)
respHeadersDoneCB.apply(ctx)

then:
ctx.data.responseHeaders.isEmpty()
!ctx.data.finishedResponseHeaders
}

void 'bridge can collect headers'() {
when:
reqHeaderCB.accept(ctx, 'header1', 'value 1.1')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,22 @@ abstract class AkkaHttpServerInstrumentationTest extends HttpServerTest<AkkaHttp
tags['response.header.content-type'] != null
tags['response.header.content-length'] == SUCCESS.body.length() as String
}

def 'blocking response sets http.response.headers.content-type span tag'() {
setup:
org.junit.jupiter.api.Assumptions.assumeTrue(testBlocking())

def request = request(SUCCESS, 'GET', null)
.addHeader(IG_BLOCK_HEADER, 'auto')
.build()
client.newCall(request).execute()
TEST_WRITER.waitForTraces(1)

expect:
def rootSpan = TEST_WRITER.get(0).find { it.parentId == 0 }
rootSpan != null
rootSpan.tags['http.response.headers.content-type'] != null
}
}

abstract class AkkaHttpServerInstrumentationSyncTest extends AkkaHttpServerInstrumentationTest {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import akka.stream.Materializer;
import datadog.context.Context;
import datadog.context.ContextScope;
import datadog.trace.api.gateway.Flow;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import datadog.trace.instrumentation.akkahttp.appsec.BlockingResponseHelper;
import scala.Function1;
Expand All @@ -35,10 +34,9 @@ public Future<HttpResponse> apply(final HttpRequest request) {
Future<HttpResponse> futureResponse;

// handle blocking in the beginning of the request
Flow.Action.RequestBlockingAction rba;
if ((rba = span.getRequestBlockingAction()) != null) {
if (span.getRequestBlockingAction() != null) {
request.discardEntityBytes(materializer);
HttpResponse response = BlockingResponseHelper.maybeCreateBlockingResponse(rba, request);
HttpResponse response = BlockingResponseHelper.maybeCreateBlockingResponse(span, request);
span.getRequestContext().getTraceSegment().effectivelyBlocked();
DatadogWrapperHelper.finishSpan(context, response);
return FastFuture$.MODULE$.<HttpResponse>successful().apply(response);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@

import akka.http.javadsl.model.HttpHeader;
import akka.http.javadsl.model.headers.RawHeader;
import akka.http.scaladsl.model.ContentType;
import akka.http.scaladsl.model.ContentTypes;
import akka.http.scaladsl.model.HttpEntity;
import akka.http.scaladsl.model.HttpEntity$;
import akka.http.scaladsl.model.HttpRequest;
import akka.http.scaladsl.model.HttpResponse;
Expand Down Expand Up @@ -32,6 +34,12 @@ public static HttpResponse handleFinishForWaf(final AgentSpan span, final HttpRe
HttpResponse altResponse = ((AkkaBlockResponseFunction) brf).maybeCreateAlternativeResponse();
if (altResponse != null) {
// we already blocked during the request
DECORATE.callIGCallbackResponseAndHeaders(
span,
altResponse,
altResponse.status().intValue(),
AkkaHttpServerHeaders.responseGetter());
writeBlockingResponseHeaderTags(span, altResponse);
return altResponse;
}
}
Expand All @@ -55,7 +63,13 @@ public static HttpResponse handleFinishForWaf(final AgentSpan span, final HttpRe
}

public static HttpResponse maybeCreateBlockingResponse(AgentSpan span, HttpRequest request) {
return maybeCreateBlockingResponse(span.getRequestBlockingAction(), request);
HttpResponse response = maybeCreateBlockingResponse(span.getRequestBlockingAction(), request);
if (response != null) {
DECORATE.callIGCallbackResponseAndHeaders(
span, response, response.status().intValue(), AkkaHttpServerHeaders.responseGetter());
writeBlockingResponseHeaderTags(span, response);
}
return response;
}

public static HttpResponse maybeCreateBlockingResponse(
Expand Down Expand Up @@ -100,4 +114,21 @@ public static HttpResponse maybeCreateBlockingResponse(
}
return HttpResponse.apply(code, headersList, entity, request.protocol());
}

private static void writeBlockingResponseHeaderTags(AgentSpan span, HttpResponse response) {
ResponseEntity entity = response.entity();
if (entity instanceof HttpEntity.Strict) {
HttpEntity.Strict strictEntity = (HttpEntity.Strict) entity;
ContentType contentType = strictEntity.contentType();
if (contentType != null) {
span.getRequestContext()
.getTraceSegment()
.setTagTop("http.response.headers.content-type", contentType.value());
}
span.getRequestContext()
.getTraceSegment()
.setTagTop(
"http.response.headers.content-length", Long.toString(strictEntity.contentLength()));
}
}
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
package datadog.trace.instrumentation.netty38.server;

import static datadog.trace.instrumentation.netty38.server.NettyHttpServerDecorator.DECORATE;
import static org.jboss.netty.handler.codec.http.HttpHeaders.setContentLength;

import datadog.appsec.api.blocking.BlockingContentType;
import datadog.trace.api.gateway.Flow;
import datadog.trace.api.internal.TraceSegment;
import datadog.trace.bootstrap.blocking.BlockingActionHelper;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import datadog.trace.instrumentation.netty38.ChannelTraceContext;
import java.util.Map;
import java.util.NoSuchElementException;
import org.jboss.netty.buffer.ChannelBuffer;
Expand Down Expand Up @@ -131,6 +134,22 @@ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Ex
response.setContent(buf);
}

MaybeBlockResponseHandler maybeBlock =
(MaybeBlockResponseHandler) ctx.getPipeline().get(MaybeBlockResponseHandler.class);
if (maybeBlock != null) {
ChannelTraceContext ctc = maybeBlock.getContextStore().get(ctx.getChannel());
if (ctc != null) {
AgentSpan span = ctc.getServerSpan();
if (span != null) {
DECORATE.callIGCallbackResponseAndHeaders(
span, response, httpCode, ResponseExtractAdapter.GETTER);
writeBlockingResponseHeaderTags(span, response);
}
ctc.setAnalyzedResponse(true);
ctc.setBlockedResponse(true);
}
}

this.hasBlockedAlready = true;
segment.effectivelyBlocked();

Expand All @@ -147,4 +166,20 @@ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Ex
});
Channels.write(ctxForDownstream, future, response);
}

private static void writeBlockingResponseHeaderTags(
AgentSpan span, DefaultHttpResponse response) {
String contentType = response.headers().get("Content-type");
if (contentType != null) {
span.getRequestContext()
.getTraceSegment()
.setTagTop("http.response.headers.content-type", contentType);
}
String contentLength = response.headers().get("Content-Length");
if (contentLength != null) {
span.getRequestContext()
.getTraceSegment()
.setTagTop("http.response.headers.content-length", contentLength);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ public MaybeBlockResponseHandler(final ContextStore<Channel, ChannelTraceContext
this.contextStore = contextStore;
}

public ContextStore<Channel, ChannelTraceContext> getContextStore() {
return contextStore;
}

@Override
public void writeRequested(ChannelHandlerContext ctx, MessageEvent msg) throws Exception {
final ChannelTraceContext channelTraceContext =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ import org.jboss.netty.logging.InternalLogLevel
import org.jboss.netty.logging.InternalLoggerFactory
import org.jboss.netty.logging.Slf4JLoggerFactory
import org.jboss.netty.util.CharsetUtil
import org.junit.jupiter.api.Assumptions
import spock.lang.Ignore

abstract class Netty38ServerTest extends HttpServerTest<ServerBootstrap> {
Expand Down Expand Up @@ -316,6 +317,22 @@ abstract class Netty38ServerTest extends HttpServerTest<ServerBootstrap> {
boolean testBadUrl() {
false
}

def 'blocking response sets http.response.headers.content-type span tag'() {
setup:
Assumptions.assumeTrue(testBlocking())

def request = request(SUCCESS, 'GET', null)
.addHeader(IG_BLOCK_HEADER, 'auto')
.build()
client.newCall(request).execute()
TEST_WRITER.waitForTraces(1)

expect:
def rootSpan = TEST_WRITER.get(0).find { it.parentId == 0 }
rootSpan != null
rootSpan.tags['http.response.headers.content-type'] != null
}
}

class Netty38ServerV0Test extends Netty38ServerTest implements TestingNettyHttpNamingConventions.ServerV0 {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,18 @@
package datadog.trace.instrumentation.netty40.server;

import static datadog.trace.bootstrap.instrumentation.api.Java8BytecodeBridge.spanFromContext;
import static datadog.trace.instrumentation.netty40.AttributeKeys.ANALYZED_RESPONSE_KEY;
import static datadog.trace.instrumentation.netty40.AttributeKeys.BLOCKED_RESPONSE_KEY;
import static datadog.trace.instrumentation.netty40.AttributeKeys.CONTEXT_ATTRIBUTE_KEY;
import static datadog.trace.instrumentation.netty40.server.NettyHttpServerDecorator.DECORATE;
import static io.netty.handler.codec.http.HttpHeaders.setContentLength;

import datadog.appsec.api.blocking.BlockingContentType;
import datadog.context.Context;
import datadog.trace.api.gateway.Flow;
import datadog.trace.api.internal.TraceSegment;
import datadog.trace.bootstrap.blocking.BlockingActionHelper;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
Expand Down Expand Up @@ -112,6 +119,17 @@ public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
}

this.hasBlockedAlready = true;

Context storedContext = ctx.channel().attr(CONTEXT_ATTRIBUTE_KEY).get();
AgentSpan span = spanFromContext(storedContext);
if (span != null) {
DECORATE.callIGCallbackResponseAndHeaders(
span, response, httpCode, ResponseExtractAdapter.GETTER);
writeBlockingResponseHeaderTags(span, response.headers());
}
ctx.channel().attr(ANALYZED_RESPONSE_KEY).set(Boolean.TRUE);
ctx.channel().attr(BLOCKED_RESPONSE_KEY).set(Boolean.TRUE);

ReferenceCountUtil.release(msg);

// write starts in the handler before the one associated with ctx
Expand Down Expand Up @@ -142,6 +160,21 @@ public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
});
}

private static void writeBlockingResponseHeaderTags(AgentSpan span, HttpHeaders headers) {
String contentType = headers.get("Content-type");
if (contentType != null) {
span.getRequestContext()
.getTraceSegment()
.setTagTop("http.response.headers.content-type", contentType);
}
String contentLength = headers.get("Content-Length");
if (contentLength != null) {
span.getRequestContext()
.getTraceSegment()
.setTagTop("http.response.headers.content-length", contentLength);
}
}

@ChannelHandler.Sharable
public static class IgnoreAllWritesHandler extends ChannelOutboundHandlerAdapter {
public static final IgnoreAllWritesHandler INSTANCE = new IgnoreAllWritesHandler();
Expand Down
Loading