diff --git a/pom.xml b/pom.xml index 78a5f34..78586a6 100755 --- a/pom.xml +++ b/pom.xml @@ -31,7 +31,7 @@ under the License. jar - 3.0.5 + 4.0.0-M120 1.3.2 2.10 @@ -49,6 +49,17 @@ under the License. + + org.wso2.siddhi + siddhi-query-api + ${siddhi.version} + + + org.apache.directory.jdbm + apacheds-jdbm1 + + + @@ -143,7 +154,6 @@ under the License. - diff --git a/src/main/java/org/apache/flink/contrib/siddhi/operator/AbstractSiddhiOperator.java b/src/main/java/org/apache/flink/contrib/siddhi/operator/AbstractSiddhiOperator.java index 0cc6aba..1fafa77 100755 --- a/src/main/java/org/apache/flink/contrib/siddhi/operator/AbstractSiddhiOperator.java +++ b/src/main/java/org/apache/flink/contrib/siddhi/operator/AbstractSiddhiOperator.java @@ -48,7 +48,7 @@ import org.apache.flink.streaming.runtime.tasks.StreamTask; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.wso2.siddhi.core.ExecutionPlanRuntime; +import org.wso2.siddhi.core.SiddhiAppRuntime; import org.wso2.siddhi.core.SiddhiManager; import org.wso2.siddhi.core.stream.input.InputHandler; import org.wso2.siddhi.query.api.definition.AbstractDefinition; @@ -60,7 +60,7 @@ * *
    *
  • - * Create Siddhi {@link ExecutionPlanRuntime} according predefined execution plan and integrate with Flink Stream Operator lifecycle. + * Create Siddhi {@link org.wso2.siddhi.core.SiddhiAppRuntime} according predefined execution plan and integrate with Flink Stream Operator lifecycle. *
  • *
  • * Connect Flink DataStreams with predefined Siddhi Stream according to unique streamId @@ -96,7 +96,7 @@ public abstract class AbstractSiddhiOperator extends AbstractStreamOper private final Map> streamRecordSerializers; private transient SiddhiManager siddhiManager; - private transient ExecutionPlanRuntime siddhiRuntime; + private transient SiddhiAppRuntime siddhiRuntime; private transient Map inputStreamHandlers; // queue to buffer out of order stream records @@ -179,7 +179,7 @@ public PriorityQueue> getPriorityQueue() { return priorityQueue; } - protected ExecutionPlanRuntime getSiddhiRuntime() { + protected SiddhiAppRuntime getSiddhiRuntime() { return this.siddhiRuntime; } @@ -213,7 +213,7 @@ protected void send(String streamId, Object[] data, long timestamp) throws Inter private static void validate(final SiddhiOperatorContext siddhiPlan) { SiddhiManager siddhiManager = siddhiPlan.createSiddhiManager(); try { - siddhiManager.validateExecutionPlan(siddhiPlan.getFinalExecutionPlan()); + siddhiManager.validateSiddhiApp(siddhiPlan.getFinalExecutionPlan()); } finally { siddhiManager.shutdown(); } @@ -228,7 +228,7 @@ private void startSiddhiRuntime() { for (Map.Entry> entry : this.siddhiPlan.getExtensions().entrySet()) { this.siddhiManager.setExtension(entry.getKey(), entry.getValue()); } - this.siddhiRuntime = siddhiManager.createExecutionPlanRuntime(executionExpression); + this.siddhiRuntime = siddhiManager.createSiddhiAppRuntime(executionExpression); this.siddhiRuntime.start(); registerInputAndOutput(this.siddhiRuntime); LOGGER.info("Siddhi {} started", siddhiRuntime.getName()); @@ -252,7 +252,7 @@ private void shutdownSiddhiRuntime() { } @SuppressWarnings("unchecked") - private void registerInputAndOutput(ExecutionPlanRuntime runtime) { + private void registerInputAndOutput(SiddhiAppRuntime runtime) { AbstractDefinition definition = this.siddhiRuntime.getStreamDefinitionMap().get(this.siddhiPlan.getOutputStreamId()); runtime.addCallback(this.siddhiPlan.getOutputStreamId(), new StreamOutputHandler<>(this.siddhiPlan.getOutputStreamType(), definition, this.output)); inputStreamHandlers = new HashMap<>(); diff --git a/src/main/java/org/apache/flink/contrib/siddhi/utils/SiddhiTypeFactory.java b/src/main/java/org/apache/flink/contrib/siddhi/utils/SiddhiTypeFactory.java index 0a2d3f0..c12611a 100644 --- a/src/main/java/org/apache/flink/contrib/siddhi/utils/SiddhiTypeFactory.java +++ b/src/main/java/org/apache/flink/contrib/siddhi/utils/SiddhiTypeFactory.java @@ -23,10 +23,11 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.api.java.typeutils.TypeInfoParser; -import org.wso2.siddhi.core.ExecutionPlanRuntime; +import org.wso2.siddhi.core.SiddhiAppRuntime; import org.wso2.siddhi.core.SiddhiManager; import org.wso2.siddhi.query.api.definition.AbstractDefinition; import org.wso2.siddhi.query.api.definition.Attribute; +import org.wso2.siddhi.query.api.definition.StreamDefinition; import java.util.ArrayList; import java.util.HashMap; @@ -64,11 +65,11 @@ public static void registerType(Class javaType, Attribute.Type siddhiType) { public static AbstractDefinition getStreamDefinition(String executionPlan, String streamId) { SiddhiManager siddhiManager = null; - ExecutionPlanRuntime runtime = null; + SiddhiAppRuntime runtime = null; try { siddhiManager = new SiddhiManager(); - runtime = siddhiManager.createExecutionPlanRuntime(executionPlan); - Map definitionMap = runtime.getStreamDefinitionMap(); + runtime = siddhiManager.createSiddhiAppRuntime(executionPlan); + Map definitionMap = runtime.getStreamDefinitionMap(); if (definitionMap.containsKey(streamId)) { return definitionMap.get(streamId); } else { diff --git a/src/test/java/org/apache/flink/contrib/siddhi/extension/CustomPlusFunctionExtension.java b/src/test/java/org/apache/flink/contrib/siddhi/extension/CustomPlusFunctionExtension.java index 6e2746e..fece1b7 100644 --- a/src/test/java/org/apache/flink/contrib/siddhi/extension/CustomPlusFunctionExtension.java +++ b/src/test/java/org/apache/flink/contrib/siddhi/extension/CustomPlusFunctionExtension.java @@ -17,31 +17,31 @@ package org.apache.flink.contrib.siddhi.extension; -import org.wso2.siddhi.core.config.ExecutionPlanContext; -import org.wso2.siddhi.core.exception.ExecutionPlanCreationException; +import java.util.HashMap; +import java.util.Map; + +import org.wso2.siddhi.core.config.SiddhiAppContext; +import org.wso2.siddhi.core.exception.SiddhiAppCreationException; import org.wso2.siddhi.core.executor.ExpressionExecutor; import org.wso2.siddhi.core.executor.function.FunctionExecutor; +import org.wso2.siddhi.core.util.config.ConfigReader; import org.wso2.siddhi.query.api.definition.Attribute; public class CustomPlusFunctionExtension extends FunctionExecutor { - private Attribute.Type returnType; /** * The initialization method for FunctionExecutor, this method will be called before the other methods - * - * @param attributeExpressionExecutors are the executors of each function parameters - * @param executionPlanContext the context of the execution plan */ @Override - protected void init(ExpressionExecutor[] attributeExpressionExecutors, ExecutionPlanContext executionPlanContext) { + protected void init(ExpressionExecutor[] expressionExecutors, ConfigReader configReader, SiddhiAppContext siddhiAppContext) { for (ExpressionExecutor expressionExecutor : attributeExpressionExecutors) { Attribute.Type attributeType = expressionExecutor.getReturnType(); if (attributeType == Attribute.Type.DOUBLE) { returnType = attributeType; } else if ((attributeType == Attribute.Type.STRING) || (attributeType == Attribute.Type.BOOL)) { - throw new ExecutionPlanCreationException("Plus cannot have parameters with types String or Bool"); + throw new SiddhiAppCreationException("Plus cannot have parameters with types String or Bool"); } else { returnType = Attribute.Type.LONG; } @@ -90,53 +90,18 @@ protected Object execute(Object data) { } } - /** - * This will be called only once and this can be used to acquire - * required resources for the processing element. - * This will be called after initializing the system and before - * starting to process the events. - */ - @Override - public void start() { - - } - - /** - * This will be called only once and this can be used to release - * the acquired resources for processing. - * This will be called before shutting down the system. - */ - @Override - public void stop() { - - } - @Override public Attribute.Type getReturnType() { return returnType; } - /** - * Used to collect the serializable state of the processing element, that need to be - * persisted for the reconstructing the element to the same state on a different point of time - * - * @return stateful objects of the processing element as an array - */ @Override - public Object[] currentState() { - return new Object[0]; + public Map currentState() { + return new HashMap<>(); } - /** - * Used to restore serialized state of the processing element, for reconstructing - * the element to the same state as if was on a previous point of time. - * - * @param state the stateful objects of the element as an array on - * the same order provided by currentState(). - */ @Override - public void restoreState(Object[] state) { + public void restoreState(Map map) { } - } diff --git a/src/test/java/org/apache/flink/contrib/siddhi/operator/SiddhiSyntaxTest.java b/src/test/java/org/apache/flink/contrib/siddhi/operator/SiddhiSyntaxTest.java index d70107d..e95bf36 100644 --- a/src/test/java/org/apache/flink/contrib/siddhi/operator/SiddhiSyntaxTest.java +++ b/src/test/java/org/apache/flink/contrib/siddhi/operator/SiddhiSyntaxTest.java @@ -21,7 +21,7 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; -import org.wso2.siddhi.core.ExecutionPlanRuntime; +import org.wso2.siddhi.core.SiddhiAppRuntime; import org.wso2.siddhi.core.SiddhiManager; import org.wso2.siddhi.core.event.Event; import org.wso2.siddhi.core.stream.input.InputHandler; @@ -46,7 +46,7 @@ public void after() { @Test public void testSimplePlan() throws InterruptedException { - ExecutionPlanRuntime runtime = siddhiManager.createExecutionPlanRuntime( + SiddhiAppRuntime runtime = siddhiManager.createSiddhiAppRuntime( "define stream inStream (name string, value double);" + "from inStream insert into outStream"); runtime.start();