From 42b15b2eaf3a8cf95132284d8e1ddd3e0d86d981 Mon Sep 17 00:00:00 2001 From: Ricardo Zanini Date: Tue, 3 Mar 2026 15:40:18 -0500 Subject: [PATCH] Fix #1189 - Add correlate to the DSL and refactor listen/emit tasks Signed-off-by: Ricardo Zanini --- .../fluent/func/FuncEventFilterBuilder.java | 7 +- .../func/FuncEventPropertiesBuilder.java | 7 + ...ava => FuncEventPropertiesConfigurer.java} | 2 +- .../fluent/func/dsl/BaseFuncListenSpec.java | 5 +- .../fluent/func/dsl/FuncDSL.java | 203 ++++++++++-------- .../fluent/func/dsl/FuncEmitSpec.java | 57 ++++- .../fluent/func/dsl/FuncEventFilterSpec.java | 61 ++---- .../func/dsl/internal/CommonFuncOps.java | 9 +- .../fluent/func/FuncDSLTest.java | 38 ++-- .../fluent/func/FuncDSLUniqueIdTest.java | 9 +- ...er.java => EventPropertiesConfigurer.java} | 2 +- .../spec/dsl/AbstractEventFilterSpec.java | 59 +++++ .../fluent/spec/dsl/BaseListenSpec.java | 84 +++----- .../fluent/spec/dsl/DSL.java | 69 +++++- .../fluent/spec/dsl/EmitSpec.java | 6 +- .../fluent/spec/dsl/EventFilterSpec.java | 72 +------ .../fluent/spec/dsl/EventPropertiesSpec.java | 84 ++++++++ .../fluent/spec/dsl/EventSpec.java | 32 --- ...Spec.java => ExprEventPropertiesSpec.java} | 16 +- .../fluent/spec/dsl/ListenSpec.java | 10 +- .../fluent/spec/WorkflowBuilderTest.java | 40 ++-- .../fluent/spec/dsl/DSLTest.java | 3 +- .../fluent/spec/dsl/TryCatchDslTest.java | 7 +- 23 files changed, 495 insertions(+), 387 deletions(-) rename experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/configurers/{FuncEventConfigurer.java => FuncEventPropertiesConfigurer.java} (89%) rename fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/configurers/{EventConfigurer.java => EventPropertiesConfigurer.java} (90%) create mode 100644 fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/dsl/AbstractEventFilterSpec.java create mode 100644 fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/dsl/EventPropertiesSpec.java delete mode 100644 fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/dsl/EventSpec.java rename fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/dsl/{ExprEventFilterSpec.java => ExprEventPropertiesSpec.java} (75%) diff --git a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncEventFilterBuilder.java b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncEventFilterBuilder.java index cac572acd..ff6d9bab9 100644 --- a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncEventFilterBuilder.java +++ b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncEventFilterBuilder.java @@ -18,8 +18,7 @@ import io.serverlessworkflow.fluent.spec.AbstractEventFilterBuilder; public class FuncEventFilterBuilder - extends AbstractEventFilterBuilder< - FuncEventFilterBuilder, FuncPredicateEventPropertiesBuilder> { + extends AbstractEventFilterBuilder { @Override protected FuncEventFilterBuilder self() { @@ -27,7 +26,7 @@ protected FuncEventFilterBuilder self() { } @Override - protected FuncPredicateEventPropertiesBuilder newEventPropertiesBuilder() { - return new FuncPredicateEventPropertiesBuilder(); + protected FuncEventPropertiesBuilder newEventPropertiesBuilder() { + return new FuncEventPropertiesBuilder(); } } diff --git a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncEventPropertiesBuilder.java b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncEventPropertiesBuilder.java index 27c691ce5..8a7eac1b5 100644 --- a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncEventPropertiesBuilder.java +++ b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncEventPropertiesBuilder.java @@ -17,8 +17,10 @@ import io.cloudevents.CloudEventData; import io.serverlessworkflow.api.types.func.EventDataFunction; +import io.serverlessworkflow.api.types.func.EventDataPredicate; import io.serverlessworkflow.fluent.spec.AbstractEventPropertiesBuilder; import java.util.function.Function; +import java.util.function.Predicate; public class FuncEventPropertiesBuilder extends AbstractEventPropertiesBuilder { @@ -37,4 +39,9 @@ public FuncEventPropertiesBuilder data(Function function, this.eventProperties.setData(new EventDataFunction().withFunction(function, clazz)); return this; } + + public FuncEventPropertiesBuilder data(Predicate predicate) { + this.eventProperties.setData(new EventDataPredicate().withPredicate(predicate)); + return this; + } } diff --git a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/configurers/FuncEventConfigurer.java b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/configurers/FuncEventPropertiesConfigurer.java similarity index 89% rename from experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/configurers/FuncEventConfigurer.java rename to experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/configurers/FuncEventPropertiesConfigurer.java index aac390748..67eaa48af 100644 --- a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/configurers/FuncEventConfigurer.java +++ b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/configurers/FuncEventPropertiesConfigurer.java @@ -19,4 +19,4 @@ import java.util.function.Consumer; @FunctionalInterface -public interface FuncEventConfigurer extends Consumer {} +public interface FuncEventPropertiesConfigurer extends Consumer {} diff --git a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/BaseFuncListenSpec.java b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/BaseFuncListenSpec.java index 6c7f2d110..5715ddfe0 100644 --- a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/BaseFuncListenSpec.java +++ b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/BaseFuncListenSpec.java @@ -17,20 +17,17 @@ import io.serverlessworkflow.fluent.func.FuncEventFilterBuilder; import io.serverlessworkflow.fluent.func.FuncListenToBuilder; -import io.serverlessworkflow.fluent.func.configurers.FuncPredicateEventConfigurer; import io.serverlessworkflow.fluent.spec.dsl.BaseListenSpec; import java.util.Objects; import java.util.function.Consumer; import java.util.function.Predicate; public abstract class BaseFuncListenSpec - extends BaseListenSpec< - SELF, LB, FuncListenToBuilder, FuncEventFilterBuilder, FuncPredicateEventConfigurer> { + extends BaseListenSpec { protected BaseFuncListenSpec(ToInvoker toInvoker) { super( toInvoker, - FuncEventFilterBuilder::with, // allApplier (tb, filters) -> tb.all(castFilters(filters)), // anyApplier diff --git a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/FuncDSL.java b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/FuncDSL.java index ff2db37e0..030fb6805 100644 --- a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/FuncDSL.java +++ b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/FuncDSL.java @@ -25,7 +25,6 @@ import io.serverlessworkflow.fluent.func.FuncTaskItemListBuilder; import io.serverlessworkflow.fluent.func.configurers.FuncCallHttpConfigurer; import io.serverlessworkflow.fluent.func.configurers.FuncCallOpenAPIConfigurer; -import io.serverlessworkflow.fluent.func.configurers.FuncPredicateEventConfigurer; import io.serverlessworkflow.fluent.func.configurers.FuncTaskConfigurer; import io.serverlessworkflow.fluent.func.configurers.SwitchCaseConfigurer; import io.serverlessworkflow.fluent.func.dsl.internal.CommonFuncOps; @@ -59,13 +58,13 @@ * *
{@code
  * Workflow wf = FuncWorkflowBuilder.workflow("example")
- *   .tasks(
- *     FuncDSL.function(String::trim, String.class),
- *     FuncDSL.emitJson("org.acme.started", MyPayload.class),
- *     FuncDSL.listen(FuncDSL.toAny("type.one", "type.two"))
- *       .outputAs(map -> map.get("value")),
- *     FuncDSL.switchWhenOrElse((Integer v) -> v > 0, "positive", FlowDirectiveEnum.END, Integer.class)
- *   ).build();
+ * .tasks(
+ * FuncDSL.function(String::trim, String.class),
+ * FuncDSL.emit(FuncDSL.produced("org.acme.started").jsonData(MyPayload.class)),
+ * FuncDSL.listen(FuncDSL.toAny("type.one", "type.two"))
+ * .outputAs(map -> map.get("value")),
+ * FuncDSL.switchWhenOrElse((Integer v) -> v > 0, "positive", FlowDirectiveEnum.END, Integer.class)
+ * ).build();
  * }
*/ public final class FuncDSL { @@ -172,7 +171,7 @@ public static FuncListenSpec to() { * @return a {@link FuncListenSpec} set to {@code one(type)} */ public static FuncListenSpec toOne(String type) { - return new FuncListenSpec().one(e -> e.type(type)); + return new FuncListenSpec().one(consumed(type)); } /** @@ -182,9 +181,9 @@ public static FuncListenSpec toOne(String type) { * @return a {@link FuncListenSpec} set to {@code all(types...)} */ public static FuncListenSpec toAll(String... types) { - FuncPredicateEventConfigurer[] events = new FuncPredicateEventConfigurer[types.length]; + FuncEventFilterSpec[] events = new FuncEventFilterSpec[types.length]; for (int i = 0; i < types.length; i++) { - events[i] = event(types[i]); + events[i] = consumed(types[i]); } return new FuncListenSpec().all(events); } @@ -196,9 +195,9 @@ public static FuncListenSpec toAll(String... types) { * @return a {@link FuncListenSpec} set to {@code any(types...)} */ public static FuncListenSpec toAny(String... types) { - FuncPredicateEventConfigurer[] events = new FuncPredicateEventConfigurer[types.length]; + FuncEventFilterSpec[] events = new FuncEventFilterSpec[types.length]; for (int i = 0; i < types.length; i++) { - events[i] = event(types[i]); + events[i] = consumed(types[i]); } return new FuncListenSpec().any(events); } @@ -212,13 +211,14 @@ public static FuncListenSpec toAny(String... types) { * @param input type to the function * @return a consumer to configure {@link FuncEmitTaskBuilder} */ - public static Consumer event( + public static Consumer produced( String type, Function function) { - return OPS.event(type, function); + return OPS.emit(type, function); } /** - * Same as {@link #event(String, Function)} but with an explicit input class to guide conversion. + * Same as {@link #produced(String, Function)} but with an explicit input class to guide + * conversion. * * @param type CloudEvent type * @param function function that maps workflow input to {@link CloudEventData} @@ -226,9 +226,9 @@ public static Consumer event( * @param input type * @return a consumer to configure {@link FuncEmitTaskBuilder} */ - public static Consumer event( + public static Consumer produced( String type, Function function, Class clazz) { - return OPS.event(type, function, clazz); + return OPS.emit(type, function, clazz); } /** @@ -240,7 +240,7 @@ public static Consumer event( * @param input type * @return a consumer to configure {@link FuncEmitTaskBuilder} */ - public static Consumer eventJson(String type, Class clazz) { + public static Consumer producedJson(String type, Class clazz) { return b -> new FuncEmitSpec().type(type).jsonData(clazz).accept(b); } @@ -253,7 +253,7 @@ public static Consumer eventJson(String type, Class * @param input type * @return a consumer to configure {@link FuncEmitTaskBuilder} */ - public static Consumer eventBytes( + public static Consumer producedBytes( String type, Function serializer, Class clazz) { return b -> new FuncEmitSpec().type(type).bytesData(serializer, clazz).accept(b); } @@ -265,18 +265,32 @@ public static Consumer eventBytes( * @param type CloudEvent type * @return a consumer to configure {@link FuncEmitTaskBuilder} */ - public static Consumer eventBytesUtf8(String type) { + public static Consumer producedBytesUtf8(String type) { return b -> new FuncEmitSpec().type(type).bytesDataUtf8().accept(b); } /** - * Create a predicate event configurer for {@code listen} specs. + * Starts building an event emission specification with a predefined type. * - * @param type CloudEvent type - * @return predicate event configurer for use in {@link FuncListenSpec} + * @param type CloudEvent type to be emitted + * @return a new {@link FuncEmitSpec} instance pre-configured with the event type + */ + public static FuncEmitSpec produced(String type) { + return new FuncEmitSpec().type(type); + } + + /** + * Starts building a function-centric event filter specification for a specific CloudEvent type. * + * + *

This creates an empty {@link FuncEventFilterSpec} which acts as a fluent builder for + * matching incoming CloudEvents. It is typically passed to a {@code listen} strategy like {@link + * #toOne(String)} or {@code to().any(...)}. + * + * @param type the {@code type} attribute of the CloudEvent to listen for + * @return a new {@link FuncEventFilterSpec} instance pre-configured with the event type */ - public static FuncPredicateEventConfigurer event(String type) { - return OPS.event(type); + public static FuncEventFilterSpec consumed(String type) { + return new FuncEventFilterSpec().type(type); } /** @@ -553,6 +567,19 @@ public static Consumer tasks(FuncTaskConfigurer... step return list -> snapshot.forEach(s -> s.accept(list)); } + /** + * Starts building a function-centric event emission specification. + * + *

This creates an empty {@link FuncEmitSpec} which acts as a fluent builder for the properties + * (e.g., type, source, data) of the CloudEvent to be emitted. It is typically passed to the + * {@link #emit(Consumer)} step. + * + * @return a new {@link FuncEmitSpec} instance for fluent configuration + */ + public static FuncEmitSpec produced() { + return new FuncEmitSpec(); + } + /** * Create an {@code emit} step from a low-level {@link FuncEmitTaskBuilder} configurer. Prefer * higher-level helpers like {@link #emitJson(String, Class)} where possible. @@ -584,7 +611,7 @@ public static EmitStep emit(String name, Consumer cfg) { * @return an {@link EmitStep} */ public static EmitStep emit(String type, Function fn) { - return new EmitStep(null, event(type, fn)); + return new EmitStep(null, produced(type, fn)); } /** @@ -597,7 +624,7 @@ public static EmitStep emit(String type, Function fn) { * @return a named {@link EmitStep} */ public static EmitStep emit(String name, String type, Function fn) { - return new EmitStep(name, event(type, fn)); + return new EmitStep(name, produced(type, fn)); } /** @@ -612,7 +639,7 @@ public static EmitStep emit(String name, String type, Function EmitStep emit( String name, String type, Function serializer, Class clazz) { - return new EmitStep(name, eventBytes(type, serializer, clazz)); + return new EmitStep(name, producedBytes(type, serializer, clazz)); } /** @@ -625,7 +652,7 @@ public static EmitStep emit( * @return an {@link EmitStep} */ public static EmitStep emit(String type, Function serializer, Class clazz) { - return new EmitStep(null, eventBytes(type, serializer, clazz)); + return new EmitStep(null, producedBytes(type, serializer, clazz)); } /** @@ -637,7 +664,7 @@ public static EmitStep emit(String type, Function serializer, Cla * @return an {@link EmitStep} */ public static EmitStep emitJson(String type, Class clazz) { - return new EmitStep(null, eventJson(type, clazz)); + return new EmitStep(null, producedJson(type, clazz)); } /** @@ -650,7 +677,7 @@ public static EmitStep emitJson(String type, Class clazz) { * @return a named {@link EmitStep} */ public static EmitStep emitJson(String name, String type, Class clazz) { - return new EmitStep(name, eventJson(type, clazz)); + return new EmitStep(name, producedJson(type, clazz)); } /** @@ -739,7 +766,7 @@ public static FuncTaskConfigurer switchWhen( * JQ-based condition: if the JQ expression evaluates truthy → jump to {@code thenTask}. * *

-   *   switchWhen(".approved == true", "approveOrder")
+   * switchWhen(".approved == true", "approveOrder")
    * 
* *

The JQ expression is evaluated against the task input at runtime. When the predicate is @@ -792,7 +819,7 @@ public static FuncTaskConfigurer switchWhenOrElse( * follow the {@link FlowDirectiveEnum} given in {@code otherwise}. * *

-   *   switchWhenOrElse(".approved == true", "sendEmail", FlowDirectiveEnum.END)
+   * switchWhenOrElse(".approved == true", "sendEmail", FlowDirectiveEnum.END)
    * 
* *

The JQ expression is evaluated against the task input at runtime. @@ -818,7 +845,7 @@ public static FuncTaskConfigurer switchWhenOrElse( * to {@code otherwiseTask}. * *

-   *   switchWhenOrElse(".score >= 80", "pass", "fail")
+   * switchWhenOrElse(".score >= 80", "pass", "fail")
    * 
* *

The JQ expression is evaluated against the task input at runtime. @@ -937,11 +964,11 @@ public static FuncTaskConfigurer call(String name, FuncCallHttpConfigurer config * *

{@code
    * tasks(
-   *   FuncDSL.call(
-   *     FuncDSL.http()
-   *       .GET()
-   *       .endpoint("http://service/api")
-   *   )
+   * FuncDSL.call(
+   * FuncDSL.http()
+   * .GET()
+   * .endpoint("http://service/api")
+   * )
    * );
    * }
* @@ -957,11 +984,11 @@ public static FuncTaskConfigurer call(FuncCallHttpStep spec) { * *
{@code
    * tasks(
-   *   FuncDSL.call("fetchUsers",
-   *     FuncDSL.http()
-   *       .GET()
-   *       .endpoint("http://service/users")
-   *   )
+   * FuncDSL.call("fetchUsers",
+   * FuncDSL.http()
+   * .GET()
+   * .endpoint("http://service/users")
+   * )
    * );
    * }
* @@ -981,14 +1008,14 @@ public static FuncTaskConfigurer call(String name, FuncCallHttpStep spec) { * *
{@code
    * FuncWorkflowBuilder.workflow("openapi-call")
-   *   .tasks(
-   *     FuncDSL.call(
-   *       FuncDSL.openapi()
-   *         .document("https://petstore.swagger.io/v2/swagger.json", DSL.auth("openapi-auth"))
-   *         .operation("getPetById")
-   *     )
-   *   )
-   *   .build();
+   * .tasks(
+   * FuncDSL.call(
+   * FuncDSL.openapi()
+   * .document("https://petstore.swagger.io/v2/swagger.json", DSL.auth("openapi-auth"))
+   * .operation("getPetById")
+   * )
+   * )
+   * .build();
    * }
* * @param spec fluent OpenAPI spec built via {@link #openapi()} @@ -1005,16 +1032,16 @@ public static FuncTaskConfigurer call(FuncCallOpenAPIStep spec) { * *
{@code
    * FuncWorkflowBuilder.workflow("openapi-call-named")
-   *   .tasks(
-   *     FuncDSL.call(
-   *       "fetchPet",
-   *       FuncDSL.openapi()
-   *         .document("https://petstore.swagger.io/v2/swagger.json", DSL.auth("openapi-auth"))
-   *         .operation("getPetById")
-   *         .parameter("id", 123)
-   *     )
-   *   )
-   *   .build();
+   * .tasks(
+   * FuncDSL.call(
+   * "fetchPet",
+   * FuncDSL.openapi()
+   * .document("https://petstore.swagger.io/v2/swagger.json", DSL.auth("openapi-auth"))
+   * .operation("getPetById")
+   * .parameter("id", 123)
+   * )
+   * )
+   * .build();
    * }
* * @param name task name, or {@code null} for an anonymous task @@ -1060,10 +1087,10 @@ public static FuncTaskConfigurer call(String name, FuncCallOpenAPIConfigurer con * *
{@code
    * FuncDSL.call(
-   *   FuncDSL.openapi()
-   *     .document("https://petstore.swagger.io/v2/swagger.json", DSL.auth("openapi-auth"))
-   *     .operation("getPetById")
-   *     .parameter("id", 123)
+   * FuncDSL.openapi()
+   * .document("https://petstore.swagger.io/v2/swagger.json", DSL.auth("openapi-auth"))
+   * .operation("getPetById")
+   * .parameter("id", 123)
    * );
    * }
* @@ -1095,10 +1122,10 @@ public static FuncCallOpenAPIStep openapi(String name) { * *
{@code
    * FuncDSL.call(
-   *   FuncDSL.http()
-   *     .GET()
-   *     .endpoint("http://service/api")
-   *     .acceptJSON()
+   * FuncDSL.http()
+   * .GET()
+   * .endpoint("http://service/api")
+   * .acceptJSON()
    * );
    * }
* @@ -1123,8 +1150,8 @@ public static FuncCallHttpStep http(String name) { * *
{@code
    * FuncDSL.call(
-   *   FuncDSL.http("http://service/api", auth -> auth.use("my-auth"))
-   *     .GET()
+   * FuncDSL.http("http://service/api", auth -> auth.use("my-auth"))
+   * .GET()
    * );
    * }
* @@ -1152,9 +1179,9 @@ public static FuncCallHttpStep http(URI url, AuthenticationConfigurer auth) { * *
{@code
    * tasks(
-   *   FuncDSL.call(
-   *     FuncDSL.get("http://service/health")
-   *   )
+   * FuncDSL.call(
+   * FuncDSL.get("http://service/health")
+   * )
    * );
    * }
* @@ -1171,9 +1198,9 @@ public static FuncCallHttpStep get(String endpoint) { * *
{@code
    * tasks(
-   *   FuncDSL.call(
-   *     FuncDSL.get("checkHealth", "http://service/health")
-   *   )
+   * FuncDSL.call(
+   * FuncDSL.get("checkHealth", "http://service/health")
+   * )
    * );
    * }
* @@ -1191,9 +1218,9 @@ public static FuncCallHttpStep get(String name, String endpoint) { * *
{@code
    * tasks(
-   *   FuncDSL.call(
-   *     FuncDSL.get("http://service/api/users", auth -> auth.use("user-service-auth"))
-   *   )
+   * FuncDSL.call(
+   * FuncDSL.get("http://service/api/users", auth -> auth.use("user-service-auth"))
+   * )
    * );
    * }
* @@ -1266,12 +1293,12 @@ public static FuncCallHttpStep get(String name, URI endpoint, AuthenticationConf * *
{@code
    * tasks(
-   *   FuncDSL.call(
-   *     FuncDSL.post(
-   *       Map.of("name", "Ricardo"),
-   *       "http://service/api/users"
-   *     )
-   *   )
+   * FuncDSL.call(
+   * FuncDSL.post(
+   * Map.of("name", "Ricardo"),
+   * "http://service/api/users"
+   * )
+   * )
    * );
    * }
* diff --git a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/FuncEmitSpec.java b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/FuncEmitSpec.java index 115419ada..f9ad1caca 100644 --- a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/FuncEmitSpec.java +++ b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/FuncEmitSpec.java @@ -15,18 +15,65 @@ */ package io.serverlessworkflow.fluent.func.dsl; +import io.cloudevents.CloudEventData; +import io.cloudevents.core.data.BytesCloudEventData; +import io.cloudevents.core.data.PojoCloudEventData; +import io.serverlessworkflow.api.types.func.EventDataFunction; import io.serverlessworkflow.fluent.func.FuncEmitTaskBuilder; +import io.serverlessworkflow.fluent.func.FuncEventPropertiesBuilder; import io.serverlessworkflow.fluent.func.configurers.FuncEmitConfigurer; +import io.serverlessworkflow.fluent.spec.dsl.EventPropertiesSpec; +import io.serverlessworkflow.impl.jackson.JsonUtils; +import java.nio.charset.StandardCharsets; +import java.util.function.Function; -public class FuncEmitSpec extends FuncEventFilterSpec implements FuncEmitConfigurer { +public final class FuncEmitSpec + extends EventPropertiesSpec + implements FuncEmitConfigurer { @Override - public void accept(FuncEmitTaskBuilder funcEmitTaskBuilder) { - funcEmitTaskBuilder.event(e -> getSteps().forEach(step -> step.accept(e))); + protected FuncEmitSpec self() { + return this; + } + + /** Sets the event data and the contentType to `application/json` */ + public FuncEmitSpec jsonData(Function function) { + Class clazz = ReflectionUtils.inferInputType(function); + addPropertyStep(e -> e.data(new EventDataFunction().withFunction(function, clazz))); + return JSON(); + } + + /** Sets the event data and the contentType to `application/octet-stream` */ + public FuncEmitSpec bytesData(Function serializer, Class clazz) { + addPropertyStep( + e -> e.data(payload -> BytesCloudEventData.wrap(serializer.apply(payload)), clazz)); + return OCTET_STREAM(); + } + + public FuncEmitSpec bytesDataUtf8() { + return bytesData((String s) -> s.getBytes(StandardCharsets.UTF_8), String.class); + } + + /** Sets the event data and the contentType to `application/json` */ + public FuncEmitSpec jsonData(Function function, Class clazz) { + addPropertyStep(e -> e.data(new EventDataFunction().withFunction(function, clazz))); + return JSON(); + } + + /** JSON with default mapper (PojoCloudEventData + application/json). */ + public FuncEmitSpec jsonData(Class clazz) { + addPropertyStep( + e -> + e.data( + payload -> + PojoCloudEventData.wrap( + payload, p -> JsonUtils.mapper().writeValueAsString(p).getBytes()), + clazz)); + return JSON(); } @Override - protected FuncEmitSpec self() { - return this; + public void accept(FuncEmitTaskBuilder funcEmitTaskBuilder) { + funcEmitTaskBuilder.event(e -> getPropertySteps().forEach(step -> step.accept(e))); } } diff --git a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/FuncEventFilterSpec.java b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/FuncEventFilterSpec.java index 1e254467f..ce8ec47eb 100644 --- a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/FuncEventFilterSpec.java +++ b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/FuncEventFilterSpec.java @@ -16,55 +16,26 @@ package io.serverlessworkflow.fluent.func.dsl; import io.cloudevents.CloudEventData; -import io.cloudevents.core.data.BytesCloudEventData; -import io.cloudevents.core.data.PojoCloudEventData; -import io.serverlessworkflow.api.types.func.EventDataFunction; +import io.serverlessworkflow.fluent.func.FuncEventFilterBuilder; import io.serverlessworkflow.fluent.func.FuncEventPropertiesBuilder; -import io.serverlessworkflow.fluent.spec.dsl.EventFilterSpec; -import io.serverlessworkflow.impl.jackson.JsonUtils; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.function.Function; +import io.serverlessworkflow.fluent.spec.dsl.AbstractEventFilterSpec; +import java.util.function.Predicate; -public abstract class FuncEventFilterSpec - extends EventFilterSpec { +public final class FuncEventFilterSpec + extends AbstractEventFilterSpec< + FuncEventFilterSpec, FuncEventPropertiesBuilder, FuncEventFilterBuilder> { - FuncEventFilterSpec() { - super(new ArrayList<>()); + @Override + protected FuncEventFilterSpec self() { + return this; } - /** Sets the event data and the contentType to `application/json` */ - public SELF jsonData(Function function) { - Class clazz = ReflectionUtils.inferInputType(function); - addStep(e -> e.data(new EventDataFunction().withFunction(function, clazz))); - return JSON(); - } - - /** Sets the event data and the contentType to `application/octet-stream` */ - public SELF bytesData(Function serializer, Class clazz) { - addStep(e -> e.data(payload -> BytesCloudEventData.wrap(serializer.apply(payload)), clazz)); - return OCTET_STREAM(); - } - - public SELF bytesDataUtf8() { - return bytesData((String s) -> s.getBytes(StandardCharsets.UTF_8), String.class); - } - - /** Sets the event data and the contentType to `application/json` */ - public SELF jsonData(Function function, Class clazz) { - addStep(e -> e.data(new EventDataFunction().withFunction(function, clazz))); - return JSON(); - } - - /** JSON with default mapper (PojoCloudEventData + application/json). */ - public SELF jsonData(Class clazz) { - addStep( - e -> - e.data( - payload -> - PojoCloudEventData.wrap( - payload, p -> JsonUtils.mapper().writeValueAsString(p).getBytes()), - clazz)); - return JSON(); + /** + * Configures the filter to match incoming event data based on a Predicate. This is the Listen + * counterpart to Emit's jsonData(Function). + */ + public FuncEventFilterSpec dataMatches(Predicate predicate) { + addPropertyStep(e -> e.data(predicate)); + return this; } } diff --git a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/internal/CommonFuncOps.java b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/internal/CommonFuncOps.java index fe0c05dd7..fb141689a 100644 --- a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/internal/CommonFuncOps.java +++ b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/internal/CommonFuncOps.java @@ -20,7 +20,6 @@ import io.serverlessworkflow.fluent.func.FuncCallTaskBuilder; import io.serverlessworkflow.fluent.func.FuncEmitTaskBuilder; import io.serverlessworkflow.fluent.func.FuncSwitchTaskBuilder; -import io.serverlessworkflow.fluent.func.configurers.FuncPredicateEventConfigurer; import io.serverlessworkflow.fluent.func.configurers.SwitchCaseConfigurer; import io.serverlessworkflow.fluent.func.dsl.ReflectionUtils; import io.serverlessworkflow.fluent.func.dsl.SwitchCaseSpec; @@ -63,17 +62,13 @@ default SwitchCaseConfigurer caseDefault(FlowDirectiveEnum directive) { return s -> s.then(directive); } - default Consumer event( + default Consumer emit( String type, Function function) { return event -> event.event(e -> e.type(type).data(function)); } - default Consumer event( + default Consumer emit( String type, Function function, Class clazz) { return event -> event.event(e -> e.type(type).data(function, clazz)); } - - default FuncPredicateEventConfigurer event(String type) { - return e -> e.type(type); - } } diff --git a/experimental/fluent/func/src/test/java/io/serverlessworkflow/fluent/func/FuncDSLTest.java b/experimental/fluent/func/src/test/java/io/serverlessworkflow/fluent/func/FuncDSLTest.java index 3dbe95094..3a3e4a4b1 100644 --- a/experimental/fluent/func/src/test/java/io/serverlessworkflow/fluent/func/FuncDSLTest.java +++ b/experimental/fluent/func/src/test/java/io/serverlessworkflow/fluent/func/FuncDSLTest.java @@ -17,20 +17,19 @@ import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.call; import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.emit; -import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.event; import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.function; import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.get; import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.http; import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.listen; +import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.produced; +import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.switchWhenOrElse; import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.toOne; -import static io.serverlessworkflow.fluent.spec.dsl.DSL.auth; import static io.serverlessworkflow.fluent.spec.dsl.DSL.use; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; -import io.cloudevents.core.data.BytesCloudEventData; import io.serverlessworkflow.api.types.CallHTTP; import io.serverlessworkflow.api.types.Export; import io.serverlessworkflow.api.types.FlowDirectiveEnum; @@ -40,10 +39,7 @@ import io.serverlessworkflow.api.types.func.CallJava; import io.serverlessworkflow.api.types.func.JavaFilterFunction; import io.serverlessworkflow.fluent.func.dsl.FuncDSL; -import io.serverlessworkflow.fluent.func.dsl.FuncEmitSpec; -import io.serverlessworkflow.fluent.func.dsl.FuncListenSpec; import java.net.URI; -import java.nio.charset.StandardCharsets; import java.util.List; import java.util.Map; import org.junit.jupiter.api.DisplayName; @@ -92,19 +88,15 @@ void function_step_when_compiles_and_builds() { @Test void emit_step_exportAs_javaFilter_sets_export() { - // Build an emit spec using your DSL (type + data function) - FuncEmitSpec spec = - new FuncEmitSpec() - .type("org.acme.signal") - .bytesData((String s) -> s.getBytes(StandardCharsets.UTF_8), String.class); - // JavaFilterFunction is (T, WorkflowContextData, TaskContextData) -> R JavaFilterFunction> jf = (val, wfCtx, taskCtx) -> Map.of("wrapped", val, "wfId", wfCtx.instanceData().id()); Workflow wf = FuncWorkflowBuilder.workflow("step-emit-export") - .tasks(emit("emitWrapped", spec).exportAs(jf)) // chaining on Step + .tasks( + emit("emitWrapped", produced("org.acme.signal").bytesDataUtf8()) + .exportAs(jf)) // chaining on Step .build(); List items = wf.getDo(); @@ -123,11 +115,11 @@ void emit_step_exportAs_javaFilter_sets_export() { @Test @DisplayName("listen(spec).exportAs(Function) sets Export on ListenTask holder") void listen_step_exportAs_function_sets_export() { - FuncListenSpec spec = toOne("org.acme.review.done"); // using your existing DSL helper - Workflow wf = FuncWorkflowBuilder.workflow("step-listen-export") - .tasks(listen("waitHumanReview", spec).exportAs((Object e) -> Map.of("seen", true))) + .tasks( + listen("waitHumanReview", toOne("org.acme.review.done")) + .exportAs((Object e) -> Map.of("seen", true))) .build(); List items = wf.getDo(); @@ -143,13 +135,11 @@ void listen_step_exportAs_function_sets_export() { } @Test - @DisplayName("emit(event(type, fn)).when(...) -> still an EmitTask and builds") + @DisplayName("emit(consumed(type, fn)).when(...) -> still an EmitTask and builds") void emit_step_when_compiles_and_builds() { Workflow wf = FuncWorkflowBuilder.workflow("step-emit-when") - .tasks( - emit(event("org.acme.sig", (String s) -> BytesCloudEventData.wrap(s.getBytes()))) - .when((Object ctx) -> true)) + .tasks(emit(produced("org.acme.sig").bytesDataUtf8()).when((Object ctx) -> true)) .build(); List items = wf.getDo(); @@ -164,9 +154,7 @@ void mixed_chaining_order_and_exports() { FuncWorkflowBuilder.workflow("step-mixed") .tasks( function(String::strip, String.class).exportAs((String s) -> Map.of("s", s)), - emit(event( - "org.acme.kickoff", (String s) -> BytesCloudEventData.wrap(s.getBytes()))) - .when((Object ignore) -> true), + emit(produced("org.acme.kickoff").bytesDataUtf8()).when((Object ignore) -> true), listen(toOne("org.acme.done")).exportAs((Object e) -> Map.of("ok", true))) .build(); @@ -190,7 +178,7 @@ void mixed_chaining_order_and_exports() { void switchWhenOrElse_jq_to_taskName() { Workflow wf = FuncWorkflowBuilder.workflow("jqSwitch") - .tasks(FuncDSL.switchWhenOrElse(".approved", "send", "draft")) + .tasks(switchWhenOrElse(".approved", "send", "draft")) .build(); Task switchTask = wf.getDo().get(0).getTask(); assertNotNull(switchTask.getSwitchTask()); @@ -203,7 +191,7 @@ void switchWhenOrElse_jq_to_taskName() { void switchWhenOrElse_jq_to_directive() { Workflow wf = FuncWorkflowBuilder.workflow("jqSwitchDir") - .tasks(FuncDSL.switchWhenOrElse(".score >= 80", "pass", FlowDirectiveEnum.END)) + .tasks(switchWhenOrElse(".score >= 80", "pass", FlowDirectiveEnum.END)) .build(); Task switchTask = wf.getDo().get(0).getTask(); var items = switchTask.getSwitchTask().getSwitch(); diff --git a/experimental/fluent/func/src/test/java/io/serverlessworkflow/fluent/func/FuncDSLUniqueIdTest.java b/experimental/fluent/func/src/test/java/io/serverlessworkflow/fluent/func/FuncDSLUniqueIdTest.java index fe7a9b07d..f54eb25d2 100644 --- a/experimental/fluent/func/src/test/java/io/serverlessworkflow/fluent/func/FuncDSLUniqueIdTest.java +++ b/experimental/fluent/func/src/test/java/io/serverlessworkflow/fluent/func/FuncDSLUniqueIdTest.java @@ -17,8 +17,11 @@ import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.agent; import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.withUniqueId; -import static org.junit.jupiter.api.Assertions.*; -import static org.mockito.Mockito.*; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.fail; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import io.serverlessworkflow.api.types.Task; import io.serverlessworkflow.api.types.TaskItem; @@ -104,7 +107,7 @@ void withUniqueId_uses_json_pointer_for_unique_id() throws Exception { @Test @DisplayName("agent(fn, in) composes uniqueId = instanceId-jsonPointer and passes it") - void agent_uses_json_pointer_for_unique_id() throws Exception { + void agent_uses_json_pointer_for_unique_id() { AtomicReference receivedUniqueId = new AtomicReference<>(); AtomicReference receivedPayload = new AtomicReference<>(); diff --git a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/configurers/EventConfigurer.java b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/configurers/EventPropertiesConfigurer.java similarity index 90% rename from fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/configurers/EventConfigurer.java rename to fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/configurers/EventPropertiesConfigurer.java index 3cb365f99..4e24c0ea6 100644 --- a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/configurers/EventConfigurer.java +++ b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/configurers/EventPropertiesConfigurer.java @@ -19,4 +19,4 @@ import java.util.function.Consumer; @FunctionalInterface -public interface EventConfigurer extends Consumer {} +public interface EventPropertiesConfigurer extends Consumer {} diff --git a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/dsl/AbstractEventFilterSpec.java b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/dsl/AbstractEventFilterSpec.java new file mode 100644 index 000000000..67a191e25 --- /dev/null +++ b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/dsl/AbstractEventFilterSpec.java @@ -0,0 +1,59 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.fluent.spec.dsl; + +import io.serverlessworkflow.fluent.spec.AbstractEventFilterBuilder; +import io.serverlessworkflow.fluent.spec.AbstractEventPropertiesBuilder; +import io.serverlessworkflow.fluent.spec.ListenTaskBuilder; +import java.util.ArrayList; +import java.util.List; +import java.util.function.Consumer; + +public abstract class AbstractEventFilterSpec< + SELF, + EVENT_PROPS extends AbstractEventPropertiesBuilder, + EVENT_FILTER extends AbstractEventFilterBuilder> + extends ExprEventPropertiesSpec implements Consumer { + + private final List> filterSteps = new ArrayList<>(); + + protected AbstractEventFilterSpec() {} + + protected abstract SELF self(); + + protected void addFilterStep(Consumer step) { + filterSteps.add(step); + } + + protected List> getFilterSteps() { + return filterSteps; + } + + public SELF correlate(String key, Consumer c) { + filterSteps.add(f -> f.correlate(key, c)); + return self(); + } + + @Override + public void accept(EVENT_FILTER filterBuilder) { + filterBuilder.with( + pb -> { + getPropertySteps().forEach(step -> step.accept(pb)); + }); + + filterSteps.forEach(step -> step.accept(filterBuilder)); + } +} diff --git a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/dsl/BaseListenSpec.java b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/dsl/BaseListenSpec.java index fbf45545a..d32efe396 100644 --- a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/dsl/BaseListenSpec.java +++ b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/dsl/BaseListenSpec.java @@ -21,52 +21,43 @@ /** * Generic base for Listen specs. * - *

Type params: SELF - fluent self type (the concrete spec) LB - ListenTaskBuilder type (e.g., - * ListenTaskBuilder, AgentListenTaskBuilder, FuncListenTaskBuilder) TB - ListenToBuilder type - * (e.g., ListenToBuilder, FuncListenToBuilder) FB - EventFilterBuilder type (e.g., - * EventFilterBuilder, FuncEventFilterBuilder) EC - Event configurer type (e.g., EventConfigurer, - * FuncPredicateEventConfigurer) + *

Type params: SELF - fluent self type (the concrete spec) LISTEN_TASK - {@link + * io.serverlessworkflow.fluent.spec.ListenTaskBuilder} type LISTEN_TO - {@link + * io.serverlessworkflow.fluent.spec.ListenToBuilder} type EVENT_FILTER - {@link + * io.serverlessworkflow.fluent.spec.EventFilterBuilder} type */ -public abstract class BaseListenSpec { +public abstract class BaseListenSpec { @FunctionalInterface - public interface ToInvoker { - void to(LB listenTaskBuilder, Consumer toStep); + public interface ToInvoker { + void to(LISTEN_TASK listenTaskBuilder, Consumer toStep); } @FunctionalInterface - public interface WithApplier { - void with(FB filterBuilder, EC eventConfigurer); + public interface FiltersApplier { + void apply(LISTEN_TO toBuilder, @SuppressWarnings("rawtypes") Consumer[] filters); } @FunctionalInterface - public interface FiltersApplier { - void apply(TB toBuilder, @SuppressWarnings("rawtypes") Consumer[] filters); + public interface OneFilterApplier { + void apply(LISTEN_TO toBuilder, Consumer filter); } - @FunctionalInterface - public interface OneFilterApplier { - void apply(TB toBuilder, Consumer filter); - } - - private final ToInvoker toInvoker; - private final WithApplier withApplier; - private final FiltersApplier allApplier; - private final FiltersApplier anyApplier; - private final OneFilterApplier oneApplier; + private final ToInvoker toInvoker; + private final FiltersApplier allApplier; + private final FiltersApplier anyApplier; + private final OneFilterApplier oneApplier; - private Consumer strategyStep; - private Consumer untilStep; + private Consumer strategyStep; + private Consumer untilStep; protected BaseListenSpec( - ToInvoker toInvoker, - WithApplier withApplier, - FiltersApplier allApplier, - FiltersApplier anyApplier, - OneFilterApplier oneApplier) { + ToInvoker toInvoker, + FiltersApplier allApplier, + FiltersApplier anyApplier, + OneFilterApplier oneApplier) { this.toInvoker = Objects.requireNonNull(toInvoker, "toInvoker"); - this.withApplier = Objects.requireNonNull(withApplier, "withApplier"); this.allApplier = Objects.requireNonNull(allApplier, "allApplier"); this.anyApplier = Objects.requireNonNull(anyApplier, "anyApplier"); this.oneApplier = Objects.requireNonNull(oneApplier, "oneApplier"); @@ -74,42 +65,31 @@ protected BaseListenSpec( protected abstract SELF self(); - protected final void setUntilStep(Consumer untilStep) { + protected final void setUntilStep(Consumer untilStep) { this.untilStep = untilStep; } - /** Convert EC[] -> Consumer[] that call `filterBuilder.with(event)` */ - @SuppressWarnings("unchecked") - protected final Consumer[] asFilters(EC... events) { - Objects.requireNonNull(events, "events"); - Consumer[] filters = new Consumer[events.length]; - for (int i = 0; i < events.length; i++) { - EC ev = Objects.requireNonNull(events[i], "events[" + i + "]"); - filters[i] = fb -> withApplier.with(fb, ev); - } - return filters; - } - @SafeVarargs - public final SELF all(EC... events) { - strategyStep = t -> allApplier.apply(t, asFilters(events)); + public final SELF all(Consumer... filters) { + Objects.requireNonNull(filters, "filters"); + strategyStep = t -> allApplier.apply(t, filters); return self(); } @SafeVarargs - public final SELF any(EC... events) { - strategyStep = t -> anyApplier.apply(t, asFilters(events)); + public final SELF any(Consumer... filters) { + Objects.requireNonNull(filters, "filters"); + strategyStep = t -> anyApplier.apply(t, filters); return self(); } - public final SELF one(EC event) { - Objects.requireNonNull(event, "event"); - strategyStep = t -> oneApplier.apply(t, fb -> withApplier.with(fb, event)); + public final SELF one(Consumer filter) { + Objects.requireNonNull(filter, "filter"); + strategyStep = t -> oneApplier.apply(t, filter); return self(); } - /** Concrete 'accept' should delegate here with its concrete LB. */ - protected final void acceptInto(LB listenTaskBuilder) { + protected final void acceptInto(LISTEN_TASK listenTaskBuilder) { Objects.requireNonNull(strategyStep, "listening strategy must be set (all/any/one)"); toInvoker.to( listenTaskBuilder, diff --git a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/dsl/DSL.java b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/dsl/DSL.java index 52a6f07e6..4deb8293e 100644 --- a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/dsl/DSL.java +++ b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/dsl/DSL.java @@ -60,8 +60,7 @@ private DSL() {} // ---- Convenient shortcuts ----// /** - * Create a new HTTP call specification to be used with {@link #call(CallHttpConfigurer)} or - * {@link #call(io.serverlessworkflow.fluent.func.dsl.FuncCallHttpSpec)}. + * Create a new HTTP call specification to be used with {@link #call(CallHttpConfigurer)} * *

Typical usage: * @@ -203,13 +202,13 @@ public static ListenSpec to() { /** * Start building an event emission specification. * - *

Use methods on {@link EventSpec} to define event type and payload, and pass it to {@link - * #emit(Consumer)}. + *

Use methods on {@link EventFilterSpec} to define event type and payload, and pass it to + * {@link #emit(Consumer)}. * - * @return a new {@link EventSpec} + * @return a new {@link EventFilterSpec} */ - public static EventSpec event() { - return new EventSpec(); + public static EventFilterSpec event() { + return new EventFilterSpec(); } /** @@ -644,15 +643,65 @@ public static TasksConfigurer set(String expr) { } /** - * Create a {@link TasksConfigurer} that adds an {@code emit} task. + * Adds an {@code emit} task to the workflow's task sequence using a custom configurer. * * - * @param configurer consumer configuring {@link EmitTaskBuilder} - * @return a {@link TasksConfigurer} that adds an EmitTask + *

This method is typically used in conjunction with {@link #produced()} to fluently define the + * properties of the event being emitted. * + * + *

Example usage: + * + *

{@code
+   * emit(produced().type("my.custom.event").source("my/source"))
+   * }
+ * + * @param configurer a consumer, such as an {@link EmitSpec}, that configures the {@link + * EmitTaskBuilder} + * @return a {@link TasksConfigurer} to continue building the task list */ public static TasksConfigurer emit(Consumer configurer) { return list -> list.emit(configurer); } + /** + * @see #emit(Consumer) + */ + public static TasksConfigurer emit(String name, Consumer configurer) { + return list -> list.emit(name, configurer); + } + + /** + * A convenient shortcut to add an {@code emit} task that only requires a CloudEvent type. * + * + *

Use this method when you only need to specify the {@code type} attribute of the emitted + * event and do not need to configure additional properties like data or source. + * + * @param cloudEventType the {@code type} attribute of the CloudEvent to be emitted + * @return a {@link TasksConfigurer} to continue building the task list + */ + public static TasksConfigurer emit(String cloudEventType) { + return list -> list.emit(new EmitSpec().type(cloudEventType)); + } + + /** + * Starts building an event emission specification. * + * + *

This creates a new {@link EmitSpec} which acts as a fluent builder for the properties (e.g., + * type, source, data) of the CloudEvent to be emitted. The resulting spec can then be passed + * directly to {@link #emit(Consumer)}. + * + * @return a new {@link EmitSpec} instance for fluent configuration + */ + public static EmitSpec produced() { + return new EmitSpec(); + } + + /** + * @see #produced() + */ + public static EmitSpec produced(String cloudEventType) { + return new EmitSpec().type(cloudEventType); + } + /** * Create a {@link TasksConfigurer} that adds a {@code listen} task. * diff --git a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/dsl/EmitSpec.java b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/dsl/EmitSpec.java index f07dd77c6..d7d24d0d6 100644 --- a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/dsl/EmitSpec.java +++ b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/dsl/EmitSpec.java @@ -16,9 +16,11 @@ package io.serverlessworkflow.fluent.spec.dsl; import io.serverlessworkflow.fluent.spec.EmitTaskBuilder; +import io.serverlessworkflow.fluent.spec.EventPropertiesBuilder; import io.serverlessworkflow.fluent.spec.configurers.EmitConfigurer; -public final class EmitSpec extends ExprEventFilterSpec implements EmitConfigurer { +public final class EmitSpec extends ExprEventPropertiesSpec + implements EmitConfigurer { @Override protected EmitSpec self() { @@ -27,6 +29,6 @@ protected EmitSpec self() { @Override public void accept(EmitTaskBuilder emitTaskBuilder) { - emitTaskBuilder.event(e -> getSteps().forEach(step -> step.accept(e))); + emitTaskBuilder.event(e -> getPropertySteps().forEach(step -> step.accept(e))); } } diff --git a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/dsl/EventFilterSpec.java b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/dsl/EventFilterSpec.java index 873883694..2d61f830f 100644 --- a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/dsl/EventFilterSpec.java +++ b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/dsl/EventFilterSpec.java @@ -15,72 +15,14 @@ */ package io.serverlessworkflow.fluent.spec.dsl; -import io.serverlessworkflow.fluent.spec.AbstractEventPropertiesBuilder; -import java.net.URI; -import java.time.Instant; -import java.util.Date; -import java.util.List; -import java.util.UUID; -import java.util.function.Consumer; +import io.serverlessworkflow.fluent.spec.EventFilterBuilder; +import io.serverlessworkflow.fluent.spec.EventPropertiesBuilder; -public abstract class EventFilterSpec> { +public final class EventFilterSpec + extends AbstractEventFilterSpec { - private final List> steps; - - protected EventFilterSpec(List> steps) { - this.steps = steps; - } - - protected abstract SELF self(); - - protected void addStep(Consumer step) { - steps.add(step); - } - - protected List> getSteps() { - return steps; - } - - public SELF type(String eventType) { - steps.add(e -> e.type(eventType)); - return self(); - } - - /** Sets the CloudEvent id to a random UUID */ - public SELF randomId() { - steps.add(e -> e.id(UUID.randomUUID().toString())); - return self(); - } - - /** Sets the CloudEvent time to the current system time */ - public SELF now() { - steps.add(e -> e.time(Date.from(Instant.now()))); - return self(); - } - - public SELF contentType(String ct) { - steps.add(e -> e.dataContentType(ct)); - return self(); - } - - /** Sets the CloudEvent dataContentType to `application/json` */ - public SELF JSON() { - steps.add(e -> e.dataContentType("application/json")); - return self(); - } - - public SELF OCTET_STREAM() { - steps.add(e -> e.dataContentType("application/octet-stream")); - return self(); - } - - public SELF source(String source) { - steps.add(e -> e.source(source)); - return self(); - } - - public SELF source(URI source) { - steps.add(e -> e.source(source)); - return self(); + @Override + protected EventFilterSpec self() { + return this; } } diff --git a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/dsl/EventPropertiesSpec.java b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/dsl/EventPropertiesSpec.java new file mode 100644 index 000000000..9ad0fd4da --- /dev/null +++ b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/dsl/EventPropertiesSpec.java @@ -0,0 +1,84 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.fluent.spec.dsl; + +import io.serverlessworkflow.fluent.spec.AbstractEventPropertiesBuilder; +import java.net.URI; +import java.time.Instant; +import java.util.ArrayList; +import java.util.Date; +import java.util.List; +import java.util.UUID; +import java.util.function.Consumer; + +public abstract class EventPropertiesSpec< + SELF, EVENT_PROPS extends AbstractEventPropertiesBuilder> { + + private final List> propertySteps = new ArrayList<>(); + + protected abstract SELF self(); + + protected void addPropertyStep(Consumer step) { + propertySteps.add(step); + } + + protected List> getPropertySteps() { + return propertySteps; + } + + public SELF type(String eventType) { + propertySteps.add(e -> e.type(eventType)); + return self(); + } + + /** Sets the CloudEvent id to a random UUID */ + public SELF randomId() { + propertySteps.add(e -> e.id(UUID.randomUUID().toString())); + return self(); + } + + /** Sets the CloudEvent time to the current system time */ + public SELF now() { + propertySteps.add(e -> e.time(Date.from(Instant.now()))); + return self(); + } + + public SELF contentType(String ct) { + propertySteps.add(e -> e.dataContentType(ct)); + return self(); + } + + /** Sets the CloudEvent dataContentType to `application/json` */ + public SELF JSON() { + propertySteps.add(e -> e.dataContentType("application/json")); + return self(); + } + + public SELF OCTET_STREAM() { + propertySteps.add(e -> e.dataContentType("application/octet-stream")); + return self(); + } + + public SELF source(String source) { + propertySteps.add(e -> e.source(source)); + return self(); + } + + public SELF source(URI source) { + propertySteps.add(e -> e.source(source)); + return self(); + } +} diff --git a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/dsl/EventSpec.java b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/dsl/EventSpec.java deleted file mode 100644 index 3e93c9d2a..000000000 --- a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/dsl/EventSpec.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Copyright 2020-Present The Serverless Workflow Specification Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.serverlessworkflow.fluent.spec.dsl; - -import io.serverlessworkflow.fluent.spec.EventPropertiesBuilder; -import io.serverlessworkflow.fluent.spec.configurers.EventConfigurer; - -public final class EventSpec extends ExprEventFilterSpec implements EventConfigurer { - - @Override - protected EventSpec self() { - return this; - } - - @Override - public void accept(EventPropertiesBuilder eventPropertiesBuilder) { - getSteps().forEach(step -> step.accept(eventPropertiesBuilder)); - } -} diff --git a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/dsl/ExprEventFilterSpec.java b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/dsl/ExprEventPropertiesSpec.java similarity index 75% rename from fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/dsl/ExprEventFilterSpec.java rename to fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/dsl/ExprEventPropertiesSpec.java index 4d4850385..dec1a9aed 100644 --- a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/dsl/ExprEventFilterSpec.java +++ b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/dsl/ExprEventPropertiesSpec.java @@ -15,26 +15,22 @@ */ package io.serverlessworkflow.fluent.spec.dsl; -import io.serverlessworkflow.fluent.spec.EventPropertiesBuilder; -import java.util.ArrayList; +import io.serverlessworkflow.fluent.spec.AbstractEventPropertiesBuilder; import java.util.Map; -public abstract class ExprEventFilterSpec - extends EventFilterSpec { - - ExprEventFilterSpec() { - super(new ArrayList<>()); - } +public abstract class ExprEventPropertiesSpec< + SELF, EVENT_PROPS extends AbstractEventPropertiesBuilder> + extends EventPropertiesSpec { /** Sets the event data and the contentType to `application/json` */ public SELF jsonData(String expr) { - addStep(e -> e.data(expr)); + addPropertyStep(e -> e.data(expr)); return JSON(); } /** Sets the event data and the contentType to `application/json` */ public SELF jsonData(Map data) { - addStep(e -> e.data(data)); + addPropertyStep(e -> e.data(data)); return JSON(); } } diff --git a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/dsl/ListenSpec.java b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/dsl/ListenSpec.java index 56d7a38fb..b26702b2f 100644 --- a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/dsl/ListenSpec.java +++ b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/dsl/ListenSpec.java @@ -16,26 +16,22 @@ package io.serverlessworkflow.fluent.spec.dsl; import io.serverlessworkflow.fluent.spec.AbstractEventConsumptionStrategyBuilder; -import io.serverlessworkflow.fluent.spec.AbstractEventFilterBuilder; import io.serverlessworkflow.fluent.spec.AbstractListenTaskBuilder; import io.serverlessworkflow.fluent.spec.EventFilterBuilder; import io.serverlessworkflow.fluent.spec.ListenTaskBuilder; import io.serverlessworkflow.fluent.spec.ListenToBuilder; -import io.serverlessworkflow.fluent.spec.configurers.EventConfigurer; +import io.serverlessworkflow.fluent.spec.configurers.ListenConfigurer; import java.util.Objects; import java.util.function.Consumer; public final class ListenSpec - extends BaseListenSpec< - ListenSpec, ListenTaskBuilder, ListenToBuilder, EventFilterBuilder, EventConfigurer> - implements io.serverlessworkflow.fluent.spec.configurers.ListenConfigurer { + extends BaseListenSpec + implements ListenConfigurer { public ListenSpec() { super( // toInvoker AbstractListenTaskBuilder::to, - // withApplier - AbstractEventFilterBuilder::with, // allApplier (tb, filters) -> tb.all(castFilters(filters)), // anyApplier diff --git a/fluent/spec/src/test/java/io/serverlessworkflow/fluent/spec/WorkflowBuilderTest.java b/fluent/spec/src/test/java/io/serverlessworkflow/fluent/spec/WorkflowBuilderTest.java index d62fd22af..d479ebcce 100644 --- a/fluent/spec/src/test/java/io/serverlessworkflow/fluent/spec/WorkflowBuilderTest.java +++ b/fluent/spec/src/test/java/io/serverlessworkflow/fluent/spec/WorkflowBuilderTest.java @@ -17,8 +17,9 @@ import static io.serverlessworkflow.fluent.spec.dsl.DSL.basic; import static io.serverlessworkflow.fluent.spec.dsl.DSL.cases; -import static io.serverlessworkflow.fluent.spec.dsl.DSL.event; +import static io.serverlessworkflow.fluent.spec.dsl.DSL.emit; import static io.serverlessworkflow.fluent.spec.dsl.DSL.http; +import static io.serverlessworkflow.fluent.spec.dsl.DSL.produced; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertNotNull; @@ -28,8 +29,11 @@ import io.serverlessworkflow.api.types.CallHTTP; import io.serverlessworkflow.api.types.CatchErrors; import io.serverlessworkflow.api.types.Document; +import io.serverlessworkflow.api.types.EmitEventDefinition; +import io.serverlessworkflow.api.types.EmitTask; import io.serverlessworkflow.api.types.ErrorFilter; import io.serverlessworkflow.api.types.EventFilter; +import io.serverlessworkflow.api.types.EventProperties; import io.serverlessworkflow.api.types.FlowDirectiveEnum; import io.serverlessworkflow.api.types.HTTPArguments; import io.serverlessworkflow.api.types.HTTPHeaders; @@ -209,23 +213,17 @@ void testDoTaskEmitEvent() { Workflow wf = WorkflowBuilder.workflow("flowEmit") .tasks( - d -> - d.emit( - "emitEvent", - e -> - e.event( - event() - .type("com.petstore.order.placed.v1") - .source(URI.create("https://petstore.com")) - .jsonData( - Map.of( - "client", - Map.of( - "firstName", "Cruella", "lastName", "de Vil"), - "items", - List.of( - Map.of( - "breed", "dalmatian", "quantity", 101))))))) + emit( + "emitEvent", + produced() + .type("com.petstore.order.placed.v1") + .source(URI.create("https://petstore.com")) + .jsonData( + Map.of( + "client", + Map.of("firstName", "Cruella", "lastName", "de Vil"), + "items", + List.of(Map.of("breed", "dalmatian", "quantity", 101)))))) .build(); List items = wf.getDo(); @@ -234,12 +232,12 @@ void testDoTaskEmitEvent() { TaskItem item = items.get(0); assertEquals("emitEvent", item.getName(), "TaskItem name should match"); - io.serverlessworkflow.api.types.EmitTask et = item.getTask().getEmitTask(); + EmitTask et = item.getTask().getEmitTask(); assertNotNull(et, "EmitTask should be present"); - io.serverlessworkflow.api.types.EmitEventDefinition ed = et.getEmit().getEvent(); + EmitEventDefinition ed = et.getEmit().getEvent(); assertNotNull(ed, "EmitEventDefinition should be present"); - io.serverlessworkflow.api.types.EventProperties props = ed.getWith(); + EventProperties props = ed.getWith(); assertEquals( "https://petstore.com", props.getSource().getUriTemplate().getLiteralUri().toString(), diff --git a/fluent/spec/src/test/java/io/serverlessworkflow/fluent/spec/dsl/DSLTest.java b/fluent/spec/src/test/java/io/serverlessworkflow/fluent/spec/dsl/DSLTest.java index 80b9d1e1b..41304b1ba 100644 --- a/fluent/spec/src/test/java/io/serverlessworkflow/fluent/spec/dsl/DSLTest.java +++ b/fluent/spec/src/test/java/io/serverlessworkflow/fluent/spec/dsl/DSLTest.java @@ -22,6 +22,7 @@ import static io.serverlessworkflow.fluent.spec.dsl.DSL.error; import static io.serverlessworkflow.fluent.spec.dsl.DSL.event; import static io.serverlessworkflow.fluent.spec.dsl.DSL.http; +import static io.serverlessworkflow.fluent.spec.dsl.DSL.produced; import static io.serverlessworkflow.fluent.spec.dsl.DSL.secrets; import static io.serverlessworkflow.fluent.spec.dsl.DSL.to; import static org.assertj.core.api.Assertions.assertThat; @@ -68,7 +69,7 @@ public void when_listen_all_then_emit() { to().all( event().type("org.acme.listen"), event().type("org.example.listen"))) - .emit(e -> e.event(event().type("org.example.emit")))) + .emit(produced("org.example.emit"))) .build(); // Sanity diff --git a/fluent/spec/src/test/java/io/serverlessworkflow/fluent/spec/dsl/TryCatchDslTest.java b/fluent/spec/src/test/java/io/serverlessworkflow/fluent/spec/dsl/TryCatchDslTest.java index bd2cb3c05..afb4ccd29 100644 --- a/fluent/spec/src/test/java/io/serverlessworkflow/fluent/spec/dsl/TryCatchDslTest.java +++ b/fluent/spec/src/test/java/io/serverlessworkflow/fluent/spec/dsl/TryCatchDslTest.java @@ -17,7 +17,6 @@ import static io.serverlessworkflow.fluent.spec.dsl.DSL.call; import static io.serverlessworkflow.fluent.spec.dsl.DSL.emit; -import static io.serverlessworkflow.fluent.spec.dsl.DSL.event; import static io.serverlessworkflow.fluent.spec.dsl.DSL.http; import static io.serverlessworkflow.fluent.spec.dsl.DSL.set; import static io.serverlessworkflow.fluent.spec.dsl.DSL.tryCatch; @@ -47,7 +46,7 @@ void when_try_with_tasks_and_catch_when_with_retry_and_tasks() { .catches() .when("$.error == true") .errors(Errors.RUNTIME, 500) - .tasks(emit(e -> e.event(event().type("org.acme.failed")))) + .tasks(emit("org.acme.failed")) .retry() .when("$.retries < 3") .limit("PT5S") @@ -122,7 +121,7 @@ void when_try_with_multiple_tasks_and_catch_except_when_with_uri_error_filter() .catches() .exceptWhen("$.code == 502") .errors(errType, 502) - .tasks(emit(e -> e.event(event().type("org.acme.recover")))) + .tasks(emit(("org.acme.recover"))) .done() // back to TrySpec )) .build(); @@ -175,7 +174,7 @@ void when_try_with_catch_and_simple_retry_limit_only() { .retry() .limit("PT2S") .done() - .tasks(emit(e -> e.event(event().type("org.acme.retrying")))) + .tasks(emit(("org.acme.retrying"))) .done())) .build();