Skip to content

Commit

Permalink
Upgrade siddhi to 4.0.0
Browse files Browse the repository at this point in the history
  • Loading branch information
haoch committed Nov 18, 2017
1 parent 2847eb8 commit cec22bc
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 61 deletions.
14 changes: 12 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ under the License.
<packaging>jar</packaging>

<properties>
<siddhi.version>3.0.5</siddhi.version>
<siddhi.version>4.0.0-M120</siddhi.version>
<flink.version>1.3.2</flink.version>
<scala.binary.version>2.10</scala.binary.version>
</properties>
Expand All @@ -49,6 +49,17 @@ under the License.
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.wso2.siddhi</groupId>
<artifactId>siddhi-query-api</artifactId>
<version>${siddhi.version}</version>
<exclusions>
<exclusion> <!-- declare the exclusion here -->
<groupId>org.apache.directory.jdbm</groupId>
<artifactId>apacheds-jdbm1</artifactId>
</exclusion>
</exclusions>
</dependency>

<!-- Core streaming API -->
<dependency>
Expand Down Expand Up @@ -143,7 +154,6 @@ under the License.
</pluginRepository>
</pluginRepositories>


<build>
<pluginManagement>
<plugins>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.wso2.siddhi.core.ExecutionPlanRuntime;
import org.wso2.siddhi.core.SiddhiAppRuntime;
import org.wso2.siddhi.core.SiddhiManager;
import org.wso2.siddhi.core.stream.input.InputHandler;
import org.wso2.siddhi.query.api.definition.AbstractDefinition;
Expand All @@ -60,7 +60,7 @@
*
* <ul>
* <li>
* Create Siddhi {@link ExecutionPlanRuntime} according predefined execution plan and integrate with Flink Stream Operator lifecycle.
* Create Siddhi {@link org.wso2.siddhi.core.SiddhiAppRuntime} according predefined execution plan and integrate with Flink Stream Operator lifecycle.
* </li>
* <li>
* Connect Flink DataStreams with predefined Siddhi Stream according to unique streamId
Expand Down Expand Up @@ -96,7 +96,7 @@ public abstract class AbstractSiddhiOperator<IN, OUT> extends AbstractStreamOper
private final Map<String, StreamElementSerializer<IN>> streamRecordSerializers;

private transient SiddhiManager siddhiManager;
private transient ExecutionPlanRuntime siddhiRuntime;
private transient SiddhiAppRuntime siddhiRuntime;
private transient Map<String, InputHandler> inputStreamHandlers;

// queue to buffer out of order stream records
Expand Down Expand Up @@ -179,7 +179,7 @@ public PriorityQueue<StreamRecord<IN>> getPriorityQueue() {
return priorityQueue;
}

protected ExecutionPlanRuntime getSiddhiRuntime() {
protected SiddhiAppRuntime getSiddhiRuntime() {
return this.siddhiRuntime;
}

Expand Down Expand Up @@ -213,7 +213,7 @@ protected void send(String streamId, Object[] data, long timestamp) throws Inter
private static void validate(final SiddhiOperatorContext siddhiPlan) {
SiddhiManager siddhiManager = siddhiPlan.createSiddhiManager();
try {
siddhiManager.validateExecutionPlan(siddhiPlan.getFinalExecutionPlan());
siddhiManager.validateSiddhiApp(siddhiPlan.getFinalExecutionPlan());
} finally {
siddhiManager.shutdown();
}
Expand All @@ -228,7 +228,7 @@ private void startSiddhiRuntime() {
for (Map.Entry<String, Class<?>> entry : this.siddhiPlan.getExtensions().entrySet()) {
this.siddhiManager.setExtension(entry.getKey(), entry.getValue());
}
this.siddhiRuntime = siddhiManager.createExecutionPlanRuntime(executionExpression);
this.siddhiRuntime = siddhiManager.createSiddhiAppRuntime(executionExpression);
this.siddhiRuntime.start();
registerInputAndOutput(this.siddhiRuntime);
LOGGER.info("Siddhi {} started", siddhiRuntime.getName());
Expand All @@ -252,7 +252,7 @@ private void shutdownSiddhiRuntime() {
}

@SuppressWarnings("unchecked")
private void registerInputAndOutput(ExecutionPlanRuntime runtime) {
private void registerInputAndOutput(SiddhiAppRuntime runtime) {
AbstractDefinition definition = this.siddhiRuntime.getStreamDefinitionMap().get(this.siddhiPlan.getOutputStreamId());
runtime.addCallback(this.siddhiPlan.getOutputStreamId(), new StreamOutputHandler<>(this.siddhiPlan.getOutputStreamType(), definition, this.output));
inputStreamHandlers = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,11 @@
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.api.java.typeutils.TypeInfoParser;
import org.wso2.siddhi.core.ExecutionPlanRuntime;
import org.wso2.siddhi.core.SiddhiAppRuntime;
import org.wso2.siddhi.core.SiddhiManager;
import org.wso2.siddhi.query.api.definition.AbstractDefinition;
import org.wso2.siddhi.query.api.definition.Attribute;
import org.wso2.siddhi.query.api.definition.StreamDefinition;

import java.util.ArrayList;
import java.util.HashMap;
Expand Down Expand Up @@ -64,11 +65,11 @@ public static void registerType(Class<?> javaType, Attribute.Type siddhiType) {

public static AbstractDefinition getStreamDefinition(String executionPlan, String streamId) {
SiddhiManager siddhiManager = null;
ExecutionPlanRuntime runtime = null;
SiddhiAppRuntime runtime = null;
try {
siddhiManager = new SiddhiManager();
runtime = siddhiManager.createExecutionPlanRuntime(executionPlan);
Map<String, AbstractDefinition> definitionMap = runtime.getStreamDefinitionMap();
runtime = siddhiManager.createSiddhiAppRuntime(executionPlan);
Map<String, StreamDefinition> definitionMap = runtime.getStreamDefinitionMap();
if (definitionMap.containsKey(streamId)) {
return definitionMap.get(streamId);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,31 +17,31 @@

package org.apache.flink.contrib.siddhi.extension;

import org.wso2.siddhi.core.config.ExecutionPlanContext;
import org.wso2.siddhi.core.exception.ExecutionPlanCreationException;
import java.util.HashMap;
import java.util.Map;

import org.wso2.siddhi.core.config.SiddhiAppContext;
import org.wso2.siddhi.core.exception.SiddhiAppCreationException;
import org.wso2.siddhi.core.executor.ExpressionExecutor;
import org.wso2.siddhi.core.executor.function.FunctionExecutor;
import org.wso2.siddhi.core.util.config.ConfigReader;
import org.wso2.siddhi.query.api.definition.Attribute;

public class CustomPlusFunctionExtension extends FunctionExecutor {

private Attribute.Type returnType;

/**
* The initialization method for FunctionExecutor, this method will be called before the other methods
*
 * @param expressionExecutors are the executors of each function parameter
 * @param configReader the config reader for reading extension configuration
 * @param siddhiAppContext the context of the Siddhi app (replaces the pre-4.0 execution plan context)
 */
@Override
protected void init(ExpressionExecutor[] attributeExpressionExecutors, ExecutionPlanContext executionPlanContext) {
protected void init(ExpressionExecutor[] expressionExecutors, ConfigReader configReader, SiddhiAppContext siddhiAppContext) {
for (ExpressionExecutor expressionExecutor : attributeExpressionExecutors) {
Attribute.Type attributeType = expressionExecutor.getReturnType();
if (attributeType == Attribute.Type.DOUBLE) {
returnType = attributeType;

} else if ((attributeType == Attribute.Type.STRING) || (attributeType == Attribute.Type.BOOL)) {
throw new ExecutionPlanCreationException("Plus cannot have parameters with types String or Bool");
throw new SiddhiAppCreationException("Plus cannot have parameters with types String or Bool");
} else {
returnType = Attribute.Type.LONG;
}
Expand Down Expand Up @@ -90,53 +90,18 @@ protected Object execute(Object data) {
}
}

/**
* This will be called only once and this can be used to acquire
* required resources for the processing element.
* This will be called after initializing the system and before
* starting to process the events.
*/
@Override
public void start() {

}

/**
* This will be called only once and this can be used to release
* the acquired resources for processing.
* This will be called before shutting down the system.
*/
@Override
public void stop() {

}

@Override
public Attribute.Type getReturnType() {
return returnType;
}

/**
* Used to collect the serializable state of the processing element, that need to be
* persisted for the reconstructing the element to the same state on a different point of time
*
* @return stateful objects of the processing element as an array
*/
@Override
public Object[] currentState() {
return new Object[0];
public Map<String, Object> currentState() {
return new HashMap<>();
}

/**
* Used to restore serialized state of the processing element, for reconstructing
* the element to the same state as if was on a previous point of time.
*
* @param state the stateful objects of the element as an array on
* the same order provided by currentState().
*/
@Override
public void restoreState(Object[] state) {
public void restoreState(Map<String, Object> map) {

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.wso2.siddhi.core.ExecutionPlanRuntime;
import org.wso2.siddhi.core.SiddhiAppRuntime;
import org.wso2.siddhi.core.SiddhiManager;
import org.wso2.siddhi.core.event.Event;
import org.wso2.siddhi.core.stream.input.InputHandler;
Expand All @@ -46,7 +46,7 @@ public void after() {

@Test
public void testSimplePlan() throws InterruptedException {
ExecutionPlanRuntime runtime = siddhiManager.createExecutionPlanRuntime(
SiddhiAppRuntime runtime = siddhiManager.createSiddhiAppRuntime(
"define stream inStream (name string, value double);"
+ "from inStream insert into outStream");
runtime.start();
Expand Down

0 comments on commit cec22bc

Please sign in to comment.