diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..9985ffb --- /dev/null +++ b/.gitignore @@ -0,0 +1,30 @@ +.cache +scalastyle-output.xml +.classpath +.idea +.metadata +.settings +.project +.version.properties +filter.properties +logs.zip +target +tmp +*.class +*.iml +*.swp +*.jar +*.log +.DS_Store +build-target +flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/generated/ +flink-runtime-web/web-dashboard/assets/fonts/ +flink-runtime-web/web-dashboard/node_modules/ +flink-runtime-web/web-dashboard/bower_components/ +atlassian-ide-plugin.xml +out/ +/docs/api +/docs/content +/docs/Gemfile.lock +/docs/.bundle +/docs/.rubydeps diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..9c8f3ea --- /dev/null +++ b/LICENSE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. 
For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. 
The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. 
+
+   END OF TERMS AND CONDITIONS
+
+   APPENDIX: How to apply the Apache License to your work.
+
+      To apply the Apache License to your work, attach the following
+      boilerplate notice, with the fields enclosed by brackets "{}"
+      replaced with your own identifying information. (Don't include
+      the brackets!) The text should be enclosed in the appropriate
+      comment syntax for the file format. We also recommend that a
+      file or class name and description of purpose be included on the
+      same "printed page" as the copyright notice for easier
+      identification within third-party archives.
+
+   Copyright {yyyy} {name of copyright owner}
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
\ No newline at end of file
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..ba195bb
--- /dev/null
+++ b/README.md
@@ -0,0 +1,88 @@
+
+flink-siddhi
+============
+
+> A lightweight library to run Siddhi CEP within a Flink streaming application.
+
+__Version: `1.2-SNAPSHOT`__
+
+## About
+
+Siddhi CEP is a lightweight, easy-to-use, open-source Complex Event Processing engine, released as a Java library under the `Apache Software License v2.0`.
+Siddhi CEP processes events generated by various event sources, analyses them, and notifies appropriate complex events according to user-specified queries.
+
+This project provides a lightweight library for running Siddhi CEP within a Flink streaming application.
+
+## How to Use
+
+* Add `flink-siddhi` as a Maven dependency:
+
+        <dependency>
+            <groupId>com.github.haoch</groupId>
+            <artifactId>flink-siddhi</artifactId>
+            <version>1.2-SNAPSHOT</version>
+        </dependency>
+
+* Start using the API from `SiddhiCEP`, for example:
+
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        SiddhiCEP cep = SiddhiCEP.getSiddhiEnvironment(env);
+
+        cep.registerExtension("custom:plus", CustomPlusFunctionExtension.class);
+
+        cep.registerStream("inputStream1", input1, "id", "name", "price", "timestamp");
+        cep.registerStream("inputStream2", input2, "id", "name", "price", "timestamp");
+
+        DataStream<Tuple5<Integer, String, Integer, String, Double>> output = cep
+            .from("inputStream1").union("inputStream2")
+            .sql(
+                "from every s1 = inputStream1[id == 2] "
+                + " -> s2 = inputStream2[id == 3] "
+                + "select s1.id as id_1, s1.name as name_1, s2.id as id_2, s2.name as name_2, custom:plus(s1.price,s2.price) as price "
+                + "insert into outputStream"
+            )
+            .returns("outputStream");
+
+        env.execute();
+
+    > See more examples in `org.apache.flink.contrib.siddhi.SiddhiCEPITCase`.
+
+## How to Build
+
+    mvn clean install -DskipTests
+
+## How to Test
+
+    mvn clean test
+
+## Main Features
+
+* Integrate Siddhi CEP as a stream operator (i.e. `TupleStreamSiddhiOperator`), supporting rich CEP features such as
+  * Filter
+  * Join
+  * Aggregation
+  * Group by
+  * Having
+  * Window
+  * Conditions and Expressions
+  * Pattern processing
+  * Sequence processing
+  * Event Tables
+  * ...
+
+* Provide an easy-to-use Siddhi CEP API to integrate with the Flink DataStream API (see `SiddhiCEP` and `SiddhiStream`)
+  * Register a Flink DataStream, associating native type information with a Siddhi stream schema, supporting POJO, Tuple, primitive types, etc.
+  * Connect single or multiple Flink DataStreams with a Siddhi CEP execution plan
+  * Return the output stream as a DataStream whose type is inferred from the Siddhi stream schema (see the sketch below)
+
+* Integrate Siddhi runtime state management with Flink state (see `AbstractSiddhiOperator`)
+
+* Support Siddhi plugin management to extend CEP functions (see `SiddhiCEP#registerExtension`)
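+
+For example, results can also be consumed as ordered maps via `ExecutionSiddhiStream#returnAsMap`. A minimal sketch (the simplified query is illustrative):
+
+        DataStream<Map<String, Object>> mapOutput = cep
+            .from("inputStream1")
+            .sql("from inputStream1 select id, name insert into outputStream")
+            .returnAsMap("outputStream");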
+
+## Get Help and Contact
+
+* [@haoch](http://github.com/haoch) (hao AT apache DOT org)
+
+## License
+
+Licensed under the [Apache License, Version 2.0](http://www.apache.org/licenses/LICENSE-2.0). For more details, please refer to the [LICENSE](LICENSE) file.
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..391568f
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,133 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+		 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-parent</artifactId>
+		<version>1.2-SNAPSHOT</version>
+	</parent>
+
+	<groupId>com.github.haoch</groupId>
+	<artifactId>flink-siddhi_2.10</artifactId>
+	<name>flink-siddhi</name>
+
+	<packaging>jar</packaging>
+
+	<properties>
+		<siddhi.version>3.0.5</siddhi.version>
+	</properties>
+
+	<repositories>
+		<repository>
+			<id>central</id>
+			<name>Maven Repository</name>
+			<url>https://repo1.maven.org/maven2</url>
+			<releases>
+				<enabled>true</enabled>
+			</releases>
+			<snapshots>
+				<enabled>false</enabled>
+			</snapshots>
+		</repository>
+		<repository>
+			<id>snapshot</id>
+			<name>Maven Snapshot Repository</name>
+			<url>https://repository.apache.org/snapshots</url>
+			<releases>
+				<enabled>true</enabled>
+			</releases>
+			<snapshots>
+				<enabled>false</enabled>
+			</snapshots>
+		</repository>
+		<repository>
+			<id>wso2</id>
+			<name>Wso2 Repository</name>
+			<url>http://maven.wso2.org/nexus/content/groups/wso2-public</url>
+			<releases>
+				<enabled>true</enabled>
+			</releases>
+			<snapshots>
+				<enabled>false</enabled>
+			</snapshots>
+		</repository>
+	</repositories>
+
+	<pluginRepositories>
+		<pluginRepository>
+			<id>central</id>
+			<url>https://repo1.maven.org/maven2</url>
+			<releases>
+				<enabled>true</enabled>
+			</releases>
+			<snapshots>
+				<enabled>false</enabled>
+			</snapshots>
+		</pluginRepository>
+	</pluginRepositories>
+
+	<dependencies>
+		<!-- core dependencies -->
+		<dependency>
+			<groupId>org.wso2.siddhi</groupId>
+			<artifactId>siddhi-core</artifactId>
+			<version>${siddhi.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>com.esotericsoftware.kryo</groupId>
+			<artifactId>kryo</artifactId>
+			<scope>provided</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java_2.10</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+
+		<!-- test dependencies -->
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-runtime_2.10</artifactId>
+			<version>${project.version}</version>
+			<type>test-jar</type>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-test-utils_2.10</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-scala_2.10</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+	</dependencies>
+</project>
diff --git a/src/main/java/org/apache/flink/contrib/siddhi/SiddhiCEP.java b/src/main/java/org/apache/flink/contrib/siddhi/SiddhiCEP.java
new file mode 100644
index 0000000..14ee580
--- /dev/null
+++ b/src/main/java/org/apache/flink/contrib/siddhi/SiddhiCEP.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.siddhi;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.contrib.siddhi.exception.DuplicatedStreamException;
+import org.apache.flink.contrib.siddhi.exception.UndefinedStreamException;
+import org.apache.flink.contrib.siddhi.schema.SiddhiStreamSchema;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Siddhi CEP execution environment.
+ */
+@PublicEvolving
+public class SiddhiCEP {
+	private final StreamExecutionEnvironment executionEnvironment;
+	private final Map<String, DataStream<?>> dataStreams = new HashMap<>();
+	private final Map<String, SiddhiStreamSchema<?>> dataStreamSchemas = new HashMap<>();
+	private final Map<String, Class<?>> extensions = new HashMap<>();
+
+	public Map<String, DataStream<?>> getDataStreams() {
+		return this.dataStreams;
+	}
+
+	public Map<String, SiddhiStreamSchema<?>> getDataStreamSchemas() {
+		return this.dataStreamSchemas;
+	}
+
+	public boolean isStreamDefined(String streamId) {
+		return dataStreams.containsKey(streamId);
+	}
+
+	public Map<String, Class<?>> getExtensions() {
+		return this.extensions;
+	}
+
+	public void checkStreamDefined(String streamId) throws UndefinedStreamException {
+		if (!isStreamDefined(streamId)) {
+			throw new UndefinedStreamException("Stream (streamId: " + streamId + ") not defined");
+		}
+	}
+
+	public SiddhiCEP(StreamExecutionEnvironment streamExecutionEnvironment) {
+		this.executionEnvironment = streamExecutionEnvironment;
+	}
+
+	public static <T> SiddhiStream.SingleSiddhiStream<T> define(String streamId, DataStream<T> inStream, String... fieldNames) {
+		SiddhiCEP environment = SiddhiCEP.getSiddhiEnvironment(inStream.getExecutionEnvironment());
+		return environment.from(streamId, inStream, fieldNames);
+	}
+
+	public <T> SiddhiStream.SingleSiddhiStream<T> from(String streamId, DataStream<T> inStream, String... fieldNames) {
+		this.registerStream(streamId, inStream, fieldNames);
+		return new SiddhiStream.SingleSiddhiStream<>(streamId, this);
+	}
+
+	public <T> SiddhiStream.SingleSiddhiStream<T> from(String streamId) {
+		return new SiddhiStream.SingleSiddhiStream<>(streamId, this);
+	}
+
+	public <T> SiddhiStream.UnionSiddhiStream<T> union(String firstStreamId, String... unionStreamIds) {
+		return new SiddhiStream.SingleSiddhiStream<T>(firstStreamId, this).union(unionStreamIds);
+	}
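+	// Usage sketch (illustrative, mirroring the README example): "define" is the static
+	// entry point, while "from"/"union" compose already-registered streams:
+	//   SiddhiCEP cep = SiddhiCEP.getSiddhiEnvironment(env);
+	//   cep.registerStream("inputStream1", input1, "id", "name");
+	//   cep.from("inputStream1").sql("...").returns("outputStream");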
+	public <T> void registerStream(final String streamId, DataStream<T> dataStream, String... fieldNames) {
+		if (isStreamDefined(streamId)) {
+			throw new DuplicatedStreamException("Input stream: " + streamId + " already exists");
+		}
+		dataStreams.put(streamId, dataStream);
+		SiddhiStreamSchema<T> schema = new SiddhiStreamSchema<>(dataStream.getType(), fieldNames);
+		schema.setTypeSerializer(schema.getTypeInfo().createSerializer(dataStream.getExecutionConfig()));
+		dataStreamSchemas.put(streamId, schema);
+	}
+
+	public StreamExecutionEnvironment getExecutionEnvironment() {
+		return executionEnvironment;
+	}
+
+	public void registerExtension(String extensionName, Class<?> extensionClass) {
+		if (extensions.containsKey(extensionName)) {
+			throw new IllegalArgumentException("Extension named " + extensionName + " already registered");
+		}
+		extensions.put(extensionName, extensionClass);
+	}
+
+	@SuppressWarnings("unchecked")
+	public <T> DataStream<T> getDataStream(String streamId) {
+		if (this.dataStreams.containsKey(streamId)) {
+			return (DataStream<T>) this.dataStreams.get(streamId);
+		} else {
+			throw new UndefinedStreamException("Undefined stream " + streamId);
+		}
+	}
+
+	public static SiddhiCEP getSiddhiEnvironment(StreamExecutionEnvironment streamExecutionEnvironment) {
+		return new SiddhiCEP(streamExecutionEnvironment);
+	}
+}
diff --git a/src/main/java/org/apache/flink/contrib/siddhi/SiddhiStream.java b/src/main/java/org/apache/flink/contrib/siddhi/SiddhiStream.java
new file mode 100644
index 0000000..f10cf1b
--- /dev/null
+++ b/src/main/java/org/apache/flink/contrib/siddhi/SiddhiStream.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.siddhi;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.contrib.siddhi.operator.SiddhiOperatorContext;
+import org.apache.flink.contrib.siddhi.utils.SiddhiStreamFactory;
+import org.apache.flink.contrib.siddhi.utils.SiddhiTypeFactory;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Siddhi CEP API interface.
+ */
+@PublicEvolving
+public abstract class SiddhiStream {
+	private final SiddhiCEP environment;
+
+	public SiddhiStream(SiddhiCEP environment) {
+		this.environment = environment;
+	}
+
+	protected SiddhiCEP getEnvironment() {
+		return this.environment;
+	}
+
+	protected abstract DataStream<Tuple2<String, Object>> toDataStream();
+
+	/**
+	 * Convert a {@code DataStream<T>} to a {@code DataStream<Tuple2<String, T>>}.
+	 * If it is a KeyedStream, the original keySelector is passed through.
+	 */
+	@SuppressWarnings("unchecked")
+	protected <T> DataStream<Tuple2<String, Object>> convertDataStream(DataStream<T> dataStream, String streamId) {
+		final String streamIdInClosure = streamId;
+		DataStream<Tuple2<String, Object>> resultStream = dataStream.map(new MapFunction<T, Tuple2<String, Object>>() {
+			@Override
+			public Tuple2<String, Object> map(T value) throws Exception {
+				return Tuple2.of(streamIdInClosure, (Object) value);
+			}
+		});
+		if (dataStream instanceof KeyedStream) {
+			final KeySelector<T, Object> keySelector = ((KeyedStream<T, Object>) dataStream).getKeySelector();
+			final KeySelector<Tuple2<String, Object>, Object> keySelectorInClosure = new KeySelector<Tuple2<String, Object>, Object>() {
+				@Override
+				public Object getKey(Tuple2<String, Object> value) throws Exception {
+					return keySelector.getKey((T) value.f1);
+				}
+			};
+			return resultStream.keyBy(keySelectorInClosure);
+		} else {
+			return resultStream;
+		}
+	}
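+	// For example (illustrative): an element e of stream "inputStream1" is wrapped as
+	// Tuple2.of("inputStream1", e), so that events from unioned streams can later be
+	// routed back to the Siddhi stream definition they belong to.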
+	public static abstract class ExecutableStream extends SiddhiStream {
+		public ExecutableStream(SiddhiCEP environment) {
+			super(environment);
+		}
+
+		public ExecutionSiddhiStream sql(String executionPlan) {
+			return new ExecutionSiddhiStream(this.toDataStream(), executionPlan, getEnvironment());
+		}
+	}
+
+	public static class SingleSiddhiStream<T> extends ExecutableStream {
+		private final String streamId;
+
+		public SingleSiddhiStream(String streamId, SiddhiCEP environment) {
+			super(environment);
+			environment.checkStreamDefined(streamId);
+			this.streamId = streamId;
+		}
+
+		public UnionSiddhiStream<T> union(String streamId, DataStream<T> dataStream, String... fieldNames) {
+			getEnvironment().registerStream(streamId, dataStream, fieldNames);
+			return union(streamId);
+		}
+
+		public UnionSiddhiStream<T> union(String... streamIds) {
+			Preconditions.checkNotNull(streamIds);
+			return new UnionSiddhiStream<>(this.streamId, Arrays.asList(streamIds), this.getEnvironment());
+		}
+
+		@Override
+		protected DataStream<Tuple2<String, Object>> toDataStream() {
+			return convertDataStream(getEnvironment().<T>getDataStream(this.streamId), this.streamId);
+		}
+	}
+
+	public static class UnionSiddhiStream<T> extends ExecutableStream {
+		private String firstStreamId;
+		private List<String> unionStreamIds;
+
+		public UnionSiddhiStream(String firstStreamId, List<String> unionStreamIds, SiddhiCEP environment) {
+			super(environment);
+			environment.checkStreamDefined(firstStreamId);
+			for (String unionStreamId : unionStreamIds) {
+				environment.checkStreamDefined(unionStreamId);
+			}
+			this.firstStreamId = firstStreamId;
+			this.unionStreamIds = unionStreamIds;
+		}
+
+		public UnionSiddhiStream<T> union(String streamId, DataStream<T> dataStream, String... fieldNames) {
+			getEnvironment().registerStream(streamId, dataStream, fieldNames);
+			return union(streamId);
+		}
+
+		public UnionSiddhiStream<T> union(String... streamId) {
+			List<String> newUnionStreamIds = new LinkedList<>();
+			newUnionStreamIds.addAll(unionStreamIds);
+			newUnionStreamIds.addAll(Arrays.asList(streamId));
+			return new UnionSiddhiStream<>(this.firstStreamId, newUnionStreamIds, this.getEnvironment());
+		}
+
+		@Override
+		protected DataStream<Tuple2<String, Object>> toDataStream() {
+			final String localFirstStreamId = firstStreamId;
+			final List<String> localUnionStreamIds = this.unionStreamIds;
+			DataStream<Tuple2<String, Object>> dataStream = convertDataStream(getEnvironment().getDataStream(localFirstStreamId), this.firstStreamId);
+			for (String unionStreamId : localUnionStreamIds) {
+				dataStream = dataStream.union(convertDataStream(getEnvironment().getDataStream(unionStreamId), unionStreamId));
+			}
+			return dataStream;
+		}
+	}
+
+	public static class ExecutionSiddhiStream {
+		private final DataStream<Tuple2<String, Object>> dataStream;
+		private final SiddhiCEP environment;
+		private final String executionPlan;
+
+		public ExecutionSiddhiStream(DataStream<Tuple2<String, Object>> dataStream, String executionPlan, SiddhiCEP environment) {
+			this.executionPlan = executionPlan;
+			this.dataStream = dataStream;
+			this.environment = environment;
+		}
+
+		/**
+		 * @param outStreamId The streamId to return as data stream.
+		 * @param <T> Tuple type that should match the stream definition. During the
+		 *            execution phase, type information is automatically built from
+		 *            the stream definition.
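+		 * <p>A usage sketch (assuming "outputStream" is defined by the execution plan):
+		 * <pre>{@code
+		 * DataStream<Tuple2<Integer, String>> out = cep
+		 *     .from("inputStream1")
+		 *     .sql("from inputStream1 select id, name insert into outputStream")
+		 *     .returns("outputStream");
+		 * }</pre>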
+		 * @return the output stream as a DataStream of Flink tuples
+		 * @see SiddhiTypeFactory
+		 */
+		public <T extends Tuple> DataStream<T> returns(String outStreamId) {
+			SiddhiOperatorContext siddhiContext = new SiddhiOperatorContext();
+			siddhiContext.setExecutionPlan(executionPlan);
+			siddhiContext.setInputStreamSchemas(environment.getDataStreamSchemas());
+			siddhiContext.setTimeCharacteristic(environment.getExecutionEnvironment().getStreamTimeCharacteristic());
+			siddhiContext.setOutputStreamId(outStreamId);
+			siddhiContext.setExtensions(environment.getExtensions());
+			siddhiContext.setExecutionConfig(environment.getExecutionEnvironment().getConfig());
+			TypeInformation<T> typeInformation =
+				SiddhiTypeFactory.getTupleTypeInformation(siddhiContext.getFinalExecutionPlan(), outStreamId);
+			siddhiContext.setOutputStreamType(typeInformation);
+			return returnsInternal(siddhiContext);
+		}
+
+		/**
+		 * @param outStreamId The streamId to return as data stream.
+		 * @return the output stream as {@code DataStream<Map<String, Object>>}; the
+		 *         concrete type is {@link java.util.LinkedHashMap}, which guarantees
+		 *         the field order defined in the Siddhi execution plan
+		 */
+		public DataStream<Map<String, Object>> returnAsMap(String outStreamId) {
+			return this.returnsInternal(outStreamId, SiddhiTypeFactory.getMapTypeInformation());
+		}
+
+		/**
+		 * @param outStreamId OutStreamId
+		 * @param outType Output type class
+		 * @param <T> Output type
+		 * @return the output stream as a DataStream of the given POJO class
+		 */
+		public <T> DataStream<T> returns(String outStreamId, Class<T> outType) {
+			TypeInformation<T> typeInformation = TypeExtractor.getForClass(outType);
+			return returnsInternal(outStreamId, typeInformation);
+		}
+
+		private <T> DataStream<T> returnsInternal(String outStreamId, TypeInformation<T> typeInformation) {
+			SiddhiOperatorContext siddhiContext = new SiddhiOperatorContext();
+			siddhiContext.setExecutionPlan(executionPlan);
+			siddhiContext.setInputStreamSchemas(environment.getDataStreamSchemas());
+			siddhiContext.setTimeCharacteristic(environment.getExecutionEnvironment().getStreamTimeCharacteristic());
+			siddhiContext.setOutputStreamId(outStreamId);
+			siddhiContext.setOutputStreamType(typeInformation);
+			siddhiContext.setExtensions(environment.getExtensions());
+			siddhiContext.setExecutionConfig(environment.getExecutionEnvironment().getConfig());
+			return returnsInternal(siddhiContext);
+		}
+
+		private <T> DataStream<T> returnsInternal(SiddhiOperatorContext siddhiContext) {
+			return SiddhiStreamFactory.createDataStream(siddhiContext, this.dataStream);
+		}
+	}
+}
diff --git a/src/main/java/org/apache/flink/contrib/siddhi/exception/DuplicatedStreamException.java b/src/main/java/org/apache/flink/contrib/siddhi/exception/DuplicatedStreamException.java
new file mode 100644
index 0000000..fbad5b9
--- /dev/null
+++ b/src/main/java/org/apache/flink/contrib/siddhi/exception/DuplicatedStreamException.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.siddhi.exception; + +public class DuplicatedStreamException extends RuntimeException { + public DuplicatedStreamException(String message) { + super(message); + } +} diff --git a/src/main/java/org/apache/flink/contrib/siddhi/exception/UndefinedStreamException.java b/src/main/java/org/apache/flink/contrib/siddhi/exception/UndefinedStreamException.java new file mode 100644 index 0000000..03f6894 --- /dev/null +++ b/src/main/java/org/apache/flink/contrib/siddhi/exception/UndefinedStreamException.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.siddhi.exception; + +public class UndefinedStreamException extends RuntimeException { + public UndefinedStreamException(String message) { + super(message); + } +} diff --git a/src/main/java/org/apache/flink/contrib/siddhi/operator/AbstractSiddhiOperator.java b/src/main/java/org/apache/flink/contrib/siddhi/operator/AbstractSiddhiOperator.java new file mode 100644 index 0000000..6edbe2d --- /dev/null +++ b/src/main/java/org/apache/flink/contrib/siddhi/operator/AbstractSiddhiOperator.java @@ -0,0 +1,265 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.siddhi.operator;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.contrib.siddhi.exception.UndefinedStreamException;
+import org.apache.flink.contrib.siddhi.schema.StreamSchema;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.wso2.siddhi.core.ExecutionPlanRuntime;
+import org.wso2.siddhi.core.SiddhiManager;
+import org.wso2.siddhi.core.stream.input.InputHandler;
+import org.wso2.siddhi.query.api.definition.AbstractDefinition;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.PriorityQueue;
+
+public abstract class AbstractSiddhiOperator<IN, OUT> extends AbstractStreamOperator<OUT> implements OneInputStreamOperator<IN, OUT> {
+	private static final Logger LOGGER = LoggerFactory.getLogger(AbstractSiddhiOperator.class);
+	private static final int INITIAL_PRIORITY_QUEUE_CAPACITY = 11;
+
+	private final SiddhiOperatorContext siddhiPlan;
+	private final String executionExpression;
+	private final boolean isProcessingTime;
+	private final Map<String, MultiplexingStreamRecordSerializer<IN>> streamRecordSerializers;
+
+	private transient SiddhiManager siddhiManager;
+	private transient ExecutionPlanRuntime siddhiRuntime;
+	private transient Map<String, InputHandler> inputStreamHandlers;
+
+	// queue to buffer out-of-order stream records
+	private transient PriorityQueue<StreamRecord<IN>> priorityQueue;
+
+	/**
+	 * @param siddhiPlan Siddhi CEP execution plan
+	 */
+	public AbstractSiddhiOperator(SiddhiOperatorContext siddhiPlan) {
+		validate(siddhiPlan);
+		this.executionExpression = siddhiPlan.getFinalExecutionPlan();
+		this.siddhiPlan = siddhiPlan;
+		this.isProcessingTime = this.siddhiPlan.getTimeCharacteristic() == TimeCharacteristic.ProcessingTime;
+		this.streamRecordSerializers = new HashMap<>();
+
+		for (String streamId : this.siddhiPlan.getInputStreams()) {
+			streamRecordSerializers.put(streamId, createStreamRecordSerializer(this.siddhiPlan.getInputStreamSchema(streamId), this.siddhiPlan.getExecutionConfig()));
+		}
+	}
+	protected abstract MultiplexingStreamRecordSerializer<IN> createStreamRecordSerializer(StreamSchema<IN> streamSchema, ExecutionConfig executionConfig);
+
+	protected MultiplexingStreamRecordSerializer<IN> getStreamRecordSerializer(String streamId) {
+		if (streamRecordSerializers.containsKey(streamId)) {
+			return streamRecordSerializers.get(streamId);
+		} else {
+			throw new UndefinedStreamException("Stream " + streamId + " not defined");
+		}
+	}
+
+	@Override
+	public void processElement(StreamRecord<IN> element) throws Exception {
+		String streamId = getStreamId(element.getValue());
+		StreamSchema<IN> schema = siddhiPlan.getInputStreamSchema(streamId);
+
+		if (isProcessingTime) {
+			processEvent(streamId, schema, element.getValue(), System.currentTimeMillis());
+		} else {
+			PriorityQueue<StreamRecord<IN>> priorityQueue = getPriorityQueue();
+			// event time processing:
+			// we have to buffer the elements until we receive the proper watermark
+			if (getExecutionConfig().isObjectReuseEnabled()) {
+				// copy the StreamRecord so that it cannot be changed
+				priorityQueue.offer(new StreamRecord<>(schema.getTypeSerializer().copy(element.getValue()), element.getTimestamp()));
+			} else {
+				priorityQueue.offer(element);
+			}
+		}
+	}
+
+	protected abstract void processEvent(String streamId, StreamSchema<IN> schema, IN value, long timestamp) throws Exception;
+
+	@Override
+	public void processWatermark(Watermark mark) throws Exception {
+		// flush all buffered records with timestamp <= watermark to the Siddhi runtime, in order
+		while (!priorityQueue.isEmpty() && priorityQueue.peek().getTimestamp() <= mark.getTimestamp()) {
+			StreamRecord<IN> streamRecord = priorityQueue.poll();
+			String streamId = getStreamId(streamRecord.getValue());
+			long timestamp = streamRecord.getTimestamp();
+			StreamSchema<IN> schema = siddhiPlan.getInputStreamSchema(streamId);
+			processEvent(streamId, schema, streamRecord.getValue(), timestamp);
+		}
+		output.emitWatermark(mark);
+	}
+
+	public abstract String getStreamId(IN record);
+
+	public PriorityQueue<StreamRecord<IN>> getPriorityQueue() {
+		return priorityQueue;
+	}
+
+	protected ExecutionPlanRuntime getSiddhiRuntime() {
+		return this.siddhiRuntime;
+	}
+
+	public InputHandler getSiddhiInputHandler(String streamId) {
+		return inputStreamHandlers.get(streamId);
+	}
+
+	protected SiddhiOperatorContext getSiddhiPlan() {
+		return this.siddhiPlan;
+	}
+
+	@Override
+	public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<OUT>> output) {
+		super.setup(containingTask, config, output);
+		startSiddhiRuntime();
+	}
+
+	@Override
+	public void open() throws Exception {
+		if (priorityQueue == null) {
+			priorityQueue = new PriorityQueue<>(INITIAL_PRIORITY_QUEUE_CAPACITY, new StreamRecordComparator<IN>());
+		}
+		super.open();
+	}
+
+	/**
+	 * Send input data to the Siddhi runtime.
+	 */
+	protected void send(String streamId, Object[] data, long timestamp) throws InterruptedException {
+		this.getSiddhiInputHandler(streamId).send(timestamp, data);
+	}
+
+	/**
+	 * Validate the execution plan while the DAG is being built, before submitting it
+	 * to the execution environment, so that invalid plans fail fast.
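+	 * For example (illustrative), a plan whose query references an undefined stream
+	 * or contains a syntax error will throw here, while the job graph is being built,
+	 * rather than after the job has been deployed.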
+	 */
+	private static void validate(final SiddhiOperatorContext siddhiPlan) {
+		SiddhiManager siddhiManager = siddhiPlan.createSiddhiManager();
+		try {
+			siddhiManager.validateExecutionPlan(siddhiPlan.getFinalExecutionPlan());
+		} finally {
+			siddhiManager.shutdown();
+		}
+	}
+
+	/**
+	 * Create and start the Siddhi execution runtime.
+	 */
+	private void startSiddhiRuntime() {
+		if (this.siddhiRuntime == null) {
+			this.siddhiManager = this.siddhiPlan.createSiddhiManager();
+			for (Map.Entry<String, Class<?>> entry : this.siddhiPlan.getExtensions().entrySet()) {
+				this.siddhiManager.setExtension(entry.getKey(), entry.getValue());
+			}
+			this.siddhiRuntime = siddhiManager.createExecutionPlanRuntime(executionExpression);
+			this.siddhiRuntime.start();
+			registerInputAndOutput(this.siddhiRuntime);
+			LOGGER.info("Siddhi runtime {} started", siddhiRuntime.getName());
+		} else {
+			throw new IllegalStateException("Siddhi runtime has already been initialized");
+		}
+	}
+
+	private void shutdownSiddhiRuntime() {
+		if (this.siddhiRuntime != null) {
+			this.siddhiRuntime.shutdown();
+			LOGGER.info("Siddhi runtime {} shutdown", this.siddhiRuntime.getName());
+			this.siddhiRuntime = null;
+			this.siddhiManager.shutdown();
+			this.siddhiManager = null;
+			this.inputStreamHandlers = null;
+		} else {
+			throw new IllegalStateException("Siddhi runtime has already been shut down");
+		}
+	}
+
+	@SuppressWarnings("unchecked")
+	private void registerInputAndOutput(ExecutionPlanRuntime runtime) {
+		AbstractDefinition definition = this.siddhiRuntime.getStreamDefinitionMap().get(this.siddhiPlan.getOutputStreamId());
+		runtime.addCallback(this.siddhiPlan.getOutputStreamId(), new StreamOutputHandler<>(this.siddhiPlan.getOutputStreamType(), definition, this.output));
+		inputStreamHandlers = new HashMap<>();
+		for (String inputStreamId : this.siddhiPlan.getInputStreams()) {
+			inputStreamHandlers.put(inputStreamId, runtime.getInputHandler(inputStreamId));
+		}
+	}
+
+	@Override
+	public void dispose() throws Exception {
+		LOGGER.info("Disposing");
+		super.dispose();
+		shutdownSiddhiRuntime();
+		output.close();
+	}
+
+	@Override
+	public void snapshotState(FSDataOutputStream out, long checkpointId, long timestamp) throws Exception {
+		super.snapshotState(out, checkpointId, timestamp);
+		final ObjectOutputStream oos = new ObjectOutputStream(out);
+
+		// Write the Siddhi runtime snapshot, length-prefixed. Write through oos (not the
+		// raw stream) so the bytes stay ordered with the length and restoreState can read
+		// them back through the ObjectInputStream.
+		byte[] siddhiRuntimeSnapshot = this.siddhiRuntime.snapshot();
+		int siddhiRuntimeSnapshotLength = siddhiRuntimeSnapshot.length;
+		oos.writeInt(siddhiRuntimeSnapshotLength);
+		oos.write(siddhiRuntimeSnapshot, 0, siddhiRuntimeSnapshotLength);
+
+		// Write the queue buffer snapshot
+		this.snapshotQueuerState(this.priorityQueue, new DataOutputViewStreamWrapper(oos));
+
+		oos.flush();
+	}
+
+	@Override
+	public void restoreState(FSDataInputStream state) throws Exception {
+		super.restoreState(state);
+		final ObjectInputStream ois = new ObjectInputStream(state);
+
+		// Restore the Siddhi runtime snapshot
+		startSiddhiRuntime();
+		int siddhiRuntimeSnapshotLength = ois.readInt();
+		byte[] siddhiRuntimeSnapshot = new byte[siddhiRuntimeSnapshotLength];
+		// readFully rather than read(..): a single read may return fewer bytes
+		ois.readFully(siddhiRuntimeSnapshot, 0, siddhiRuntimeSnapshotLength);
+		this.siddhiRuntime.restore(siddhiRuntimeSnapshot);
+
+		// Restore the queue buffer snapshot
+		this.priorityQueue = restoreQueuerState(new DataInputViewStreamWrapper(ois));
+	}
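+	// Snapshot layout written by snapshotState(...) and read back by restoreState(...):
+	//   [int: length of Siddhi runtime snapshot][byte[]: Siddhi runtime snapshot]
+	//   [queue state, as written by snapshotQueuerState(...)]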
+	protected abstract void snapshotQueuerState(PriorityQueue<StreamRecord<IN>> queue, DataOutputView dataOutputView) throws IOException;
+
+	protected abstract PriorityQueue<StreamRecord<IN>> restoreQueuerState(DataInputView dataInputView) throws IOException;
+}
diff --git a/src/main/java/org/apache/flink/contrib/siddhi/operator/SiddhiOperatorContext.java b/src/main/java/org/apache/flink/contrib/siddhi/operator/SiddhiOperatorContext.java
new file mode 100644
index 0000000..6b543e1
--- /dev/null
+++ b/src/main/java/org/apache/flink/contrib/siddhi/operator/SiddhiOperatorContext.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.siddhi.operator;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.contrib.siddhi.schema.SiddhiStreamSchema;
+import org.apache.flink.contrib.siddhi.schema.StreamSchema;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.util.Preconditions;
+import org.wso2.siddhi.core.SiddhiManager;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * SiddhiCEP operator execution context.
+ */
+public class SiddhiOperatorContext implements Serializable {
+	private ExecutionConfig executionConfig;
+	private Map<String, SiddhiStreamSchema<?>> inputStreamSchemas;
+	private final Map<String, Class<?>> siddhiExtensions;
+	private String outputStreamId;
+	private TypeInformation<?> outputStreamType;
+	private TimeCharacteristic timeCharacteristic;
+	private String name;
+	private String executionPlan;
+
+	public SiddhiOperatorContext() {
+		inputStreamSchemas = new HashMap<>();
+		siddhiExtensions = new HashMap<>();
+	}
+
+	public void setExtensions(Map<String, Class<?>> extensions) {
+		Preconditions.checkNotNull(extensions);
+		siddhiExtensions.putAll(extensions);
+	}
+
+	public Map<String, Class<?>> getExtensions() {
+		return siddhiExtensions;
+	}
+
+	public String getName() {
+		if (this.name == null) {
+			if (executionPlan.length() > 100) {
+				// truncate long plans; the suffix shows how many characters were cut off
+				return String.format("Siddhi: %s ... (%s more)", executionPlan.substring(0, 100), executionPlan.length() - 100);
+			} else {
+				return String.format("Siddhi: %s", executionPlan);
+			}
+		} else {
+			return this.name;
+		}
+	}
+
+	public List<String> getInputStreams() {
+		return new ArrayList<>(this.inputStreamSchemas.keySet());
+	}
+
+	public String getExecutionPlan() {
+		return executionPlan;
+	}
+
+	/**
+	 * @return the generated stream definitions followed by the execution expression
+	 */
+	public String getFinalExecutionPlan() {
+		Preconditions.checkNotNull(executionPlan, "Execution plan is not set");
+		StringBuilder sb = new StringBuilder();
+		for (Map.Entry<String, SiddhiStreamSchema<?>> entry : inputStreamSchemas.entrySet()) {
+			sb.append(entry.getValue().getStreamDefinitionExpression(entry.getKey()));
+		}
+		sb.append(this.getExecutionPlan());
+		return sb.toString();
+	}
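+	// For example (illustrative), with one registered stream the final plan is the
+	// generated definition (see SiddhiStreamSchema#DEFINE_STREAM_TEMPLATE) followed
+	// by the user query:
+	//   define stream inputStream1 (id int, name string);
+	//   from inputStream1 select id, name insert into outputStream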
+	public TypeInformation<?> getOutputStreamType() {
+		return outputStreamType;
+	}
+
+	public String getOutputStreamId() {
+		return outputStreamId;
+	}
+
+	@SuppressWarnings("unchecked")
+	public <IN> StreamSchema<IN> getInputStreamSchema(String inputStreamId) {
+		if (!inputStreamSchemas.containsKey(inputStreamId)) {
+			throw new IllegalArgumentException("Input stream: " + inputStreamId + " is not found");
+		}
+		return (StreamSchema<IN>) inputStreamSchemas.get(inputStreamId);
+	}
+
+	public void setOutputStreamId(String outputStreamId) {
+		this.outputStreamId = outputStreamId;
+	}
+
+	public void setOutputStreamType(TypeInformation<?> outputStreamType) {
+		this.outputStreamType = outputStreamType;
+	}
+
+	public TimeCharacteristic getTimeCharacteristic() {
+		return timeCharacteristic;
+	}
+
+	public void setTimeCharacteristic(TimeCharacteristic timeCharacteristic) {
+		this.timeCharacteristic = timeCharacteristic;
+	}
+
+	public void setExecutionPlan(String executionPlan) {
+		this.executionPlan = executionPlan;
+	}
+
+	public Map<String, SiddhiStreamSchema<?>> getInputStreamSchemas() {
+		return inputStreamSchemas;
+	}
+
+	public void setInputStreamSchemas(Map<String, SiddhiStreamSchema<?>> inputStreamSchemas) {
+		this.inputStreamSchemas = inputStreamSchemas;
+	}
+
+	public void setName(String name) {
+		this.name = name;
+	}
+
+	public SiddhiManager createSiddhiManager() {
+		SiddhiManager siddhiManager = new SiddhiManager();
+		for (Map.Entry<String, Class<?>> entry : getExtensions().entrySet()) {
+			siddhiManager.setExtension(entry.getKey(), entry.getValue());
+		}
+		return siddhiManager;
+	}
+
+	public ExecutionConfig getExecutionConfig() {
+		return executionConfig;
+	}
+
+	public void setExecutionConfig(ExecutionConfig executionConfig) {
+		this.executionConfig = executionConfig;
+	}
+}
diff --git a/src/main/java/org/apache/flink/contrib/siddhi/operator/StreamOutputHandler.java b/src/main/java/org/apache/flink/contrib/siddhi/operator/StreamOutputHandler.java
new file mode 100644
index 0000000..561fe58
--- /dev/null
+++ b/src/main/java/org/apache/flink/contrib/siddhi/operator/StreamOutputHandler.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.siddhi.operator;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.contrib.siddhi.utils.SiddhiTupleFactory;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.wso2.siddhi.core.event.Event;
+import org.wso2.siddhi.core.stream.output.StreamCallback;
+import org.wso2.siddhi.query.api.definition.AbstractDefinition;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+public class StreamOutputHandler<R> extends StreamCallback {
+	private static final Logger LOGGER = LoggerFactory.getLogger(StreamOutputHandler.class);
+
+	private final AbstractDefinition definition;
+	private final Output<StreamRecord<R>> output;
+	private final TypeInformation<R> typeInfo;
+	private final ObjectMapper objectMapper;
+
+	public StreamOutputHandler(TypeInformation<R> typeInfo, AbstractDefinition definition, Output<StreamRecord<R>> output) {
+		this.typeInfo = typeInfo;
+		this.definition = definition;
+		this.output = output;
+		this.objectMapper = new ObjectMapper();
+		this.objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+	}
+
+	@Override
+	public void receive(Event[] events) {
+		StreamRecord<R> reusableRecord = new StreamRecord<>(null, 0L);
+		for (Event event : events) {
+			if (typeInfo == null || Map.class.isAssignableFrom(typeInfo.getTypeClass())) {
+				reusableRecord.replace(toMap(event), event.getTimestamp());
+				output.collect(reusableRecord);
+			} else if (typeInfo.isTupleType()) {
+				Tuple tuple = this.toTuple(event);
+				reusableRecord.replace(tuple, event.getTimestamp());
+				output.collect(reusableRecord);
+			} else if (typeInfo instanceof PojoTypeInfo) {
+				R obj;
+				try {
+					obj = objectMapper.convertValue(toMap(event), typeInfo.getTypeClass());
+				} catch (IllegalArgumentException ex) {
+					LOGGER.error("Failed to map event: " + event + " into type: " + typeInfo, ex);
+					throw ex;
+				}
+				reusableRecord.replace(obj, event.getTimestamp());
+				output.collect(reusableRecord);
+			} else {
+				throw new IllegalArgumentException("Unable to format " + event + " as type " + typeInfo);
+			}
+		}
+	}
+
+	private Map<String, Object> toMap(Event event) {
+		Map<String, Object> map = new LinkedHashMap<>();
+		for (int i = 0; i < definition.getAttributeNameArray().length; i++) {
+			map.put(definition.getAttributeNameArray()[i], event.getData(i));
+		}
+		return map;
+	}
+
+	private <T extends Tuple> T toTuple(Event event) {
+		return SiddhiTupleFactory.newTuple(event.getData());
+	}
+}
diff --git a/src/main/java/org/apache/flink/contrib/siddhi/operator/StreamRecordComparator.java b/src/main/java/org/apache/flink/contrib/siddhi/operator/StreamRecordComparator.java
new file mode 100644
index 0000000..fad0df1
--- /dev/null
+++ b/src/main/java/org/apache/flink/contrib/siddhi/operator/StreamRecordComparator.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.siddhi.operator;
+
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.io.Serializable;
+import java.util.Comparator;
+
+public class StreamRecordComparator<IN> implements Comparator<StreamRecord<IN>>, Serializable {
+	private static final long serialVersionUID = 1581054988433915305L;
+
+	@Override
+	public int compare(StreamRecord<IN> o1, StreamRecord<IN> o2) {
+		if (o1.getTimestamp() < o2.getTimestamp()) {
+			return -1;
+		} else if (o1.getTimestamp() > o2.getTimestamp()) {
+			return 1;
+		} else {
+			return 0;
+		}
+	}
+}
diff --git a/src/main/java/org/apache/flink/contrib/siddhi/operator/TupleStreamSiddhiOperator.java b/src/main/java/org/apache/flink/contrib/siddhi/operator/TupleStreamSiddhiOperator.java
new file mode 100644
index 0000000..97d808b
--- /dev/null
+++ b/src/main/java/org/apache/flink/contrib/siddhi/operator/TupleStreamSiddhiOperator.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.siddhi.operator;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.contrib.siddhi.schema.StreamSchema;
+import org.apache.flink.contrib.siddhi.utils.SiddhiTypeFactory;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.io.IOException;
+import java.util.PriorityQueue;
+
+/**
+ * Wraps input events of generic type IN as {@code Tuple2<String, IN>}, where f0 is the stream id.
+ */
+public class TupleStreamSiddhiOperator<IN, OUT> extends AbstractSiddhiOperator<Tuple2<String, IN>, OUT> {
+
+	public TupleStreamSiddhiOperator(SiddhiOperatorContext siddhiPlan) {
+		super(siddhiPlan);
+	}
+
+	@Override
+	@SuppressWarnings("unchecked")
+	protected MultiplexingStreamRecordSerializer<Tuple2<String, IN>> createStreamRecordSerializer(StreamSchema<Tuple2<String, IN>> streamSchema, ExecutionConfig executionConfig) {
+		TypeInformation<Tuple2<String, IN>> tuple2TypeInformation = SiddhiTypeFactory.getStreamTupleTypeInformation((TypeInformation) streamSchema.getTypeInfo());
+		return new MultiplexingStreamRecordSerializer<>(tuple2TypeInformation.createSerializer(executionConfig));
+	}
+
+	@Override
+	protected void processEvent(String streamId, StreamSchema<Tuple2<String, IN>> schema, Tuple2<String, IN> value, long timestamp) throws InterruptedException {
+		send(value.f0, getSiddhiPlan().getInputStreamSchema(value.f0).getStreamSerializer().getRow(value.f1), timestamp);
+	}
+
+	@Override
+	public String getStreamId(Tuple2<String, IN> record) {
+		return record.f0;
+	}
+
+	@Override
+	protected void snapshotQueuerState(PriorityQueue<StreamRecord<Tuple2<String, IN>>> queue, DataOutputView dataOutputView) throws IOException {
+		dataOutputView.writeInt(queue.size());
+		for (StreamRecord<Tuple2<String, IN>> record : queue) {
+			String streamId = record.getValue().f0;
+			dataOutputView.writeUTF(streamId);
+			this.getStreamRecordSerializer(streamId).serialize(record, dataOutputView);
+		}
+	}
+
+	@Override
+	protected PriorityQueue<StreamRecord<Tuple2<String, IN>>> restoreQueuerState(DataInputView dataInputView) throws IOException {
+		int sizeOfQueue = dataInputView.readInt();
+		// initial capacity must be positive, and StreamRecord has no natural ordering,
+		// so an explicit timestamp comparator is required
+		PriorityQueue<StreamRecord<Tuple2<String, IN>>> priorityQueue =
+			new PriorityQueue<>(Math.max(sizeOfQueue, 1), new StreamRecordComparator<Tuple2<String, IN>>());
+		for (int i = 0; i < sizeOfQueue; i++) {
+			String streamId = dataInputView.readUTF();
+			StreamElement streamElement = getStreamRecordSerializer(streamId).deserialize(dataInputView);
+			priorityQueue.offer(streamElement.<Tuple2<String, IN>>asRecord());
+		}
+		return priorityQueue;
+	}
+}
diff --git a/src/main/java/org/apache/flink/contrib/siddhi/package-info.java b/src/main/java/org/apache/flink/contrib/siddhi/package-info.java
new file mode 100644
index 0000000..91b13d9
--- /dev/null
+++ b/src/main/java/org/apache/flink/contrib/siddhi/package-info.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + *
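+ * <p>flink-siddhi: a library for running Siddhi CEP queries inside Flink streaming jobs.
+ * Flink DataStreams are registered as Siddhi streams, a Siddhi execution plan runs as a
+ * Flink stream operator, and query results come back as typed DataStreams.</p>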

+ * <h2>Features</h2>
+ *
+ * <h2>Example</h2>
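+ * <p>The example below (using the stream names and fields from this module's tests) registers
+ * two DataStreams as Siddhi streams, runs a pattern query over their union, and consumes the
+ * matched result as a typed DataStream:</p>
+ * <pre>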
+ * StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ * SiddhiCEP cep = SiddhiCEP.getSiddhiEnvironment(env);
+ *
+ * cep.registerExtension("custom:plus",CustomPlusFunctionExtension.class);
+ *
+ * cep.registerStream("inputStream1", input1, "id", "name", "price","timestamp");
+ * cep.registerStream("inputStream2", input2, "id", "name", "price","timestamp");
+ *
+ * DataStream<Tuple4<Integer,String,Integer,String>> output = cep
+ * 	.from("inputStream1").union("inputStream2")
+ * 	.sql(
+ * 		"from every s1 = inputStream1[id == 2] "
+ * 		 + " -> s2 = inputStream2[id == 3] "
+ * 		 + "select s1.id as id_1, s1.name as name_1, s2.id as id_2, s2.name as name_2 "
+ * 		 + "insert into outputStream"
+ * 	)
+ * 	.returns("outputStream");
+ *
+ * env.execute();
+ * </pre>
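+ * <p>Note that with {@code returns("outputStream")} the Flink output type is inferred from the
+ * schema of the Siddhi output stream (see SiddhiTypeFactory.getTupleTypeInformation), so the
+ * result above is already typed as {@code DataStream<Tuple4<Integer,String,Integer,String>>}.</p>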
+ * + * @see /~https://github.com/wso2/siddhi + */ +package org.apache.flink.contrib.siddhi; diff --git a/src/main/java/org/apache/flink/contrib/siddhi/schema/SiddhiStreamSchema.java b/src/main/java/org/apache/flink/contrib/siddhi/schema/SiddhiStreamSchema.java new file mode 100644 index 0000000..c81d2bc --- /dev/null +++ b/src/main/java/org/apache/flink/contrib/siddhi/schema/SiddhiStreamSchema.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.siddhi.schema;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.contrib.siddhi.utils.SiddhiTypeFactory;
+import org.apache.flink.util.Preconditions;
+import org.wso2.siddhi.query.api.definition.Attribute;
+import org.wso2.siddhi.query.api.definition.StreamDefinition;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class SiddhiStreamSchema<T> extends StreamSchema<T> {
+	private static final String DEFINE_STREAM_TEMPLATE = "define stream %s (%s);";
+
+	public SiddhiStreamSchema(TypeInformation<T> typeInfo, String... fieldNames) {
+		super(typeInfo, fieldNames);
+	}
+
+	public SiddhiStreamSchema(TypeInformation<T> typeInfo, int[] fieldIndexes, String[] fieldNames) {
+		super(typeInfo, fieldIndexes, fieldNames);
+	}
+
+	public StreamDefinition getStreamDefinition(String streamId) {
+		StreamDefinition streamDefinition = StreamDefinition.id(streamId);
+		for (int i = 0; i < getFieldNames().length; i++) {
+			streamDefinition.attribute(getFieldNames()[i], SiddhiTypeFactory.getAttributeType(getFieldTypes()[i]));
+		}
+		return streamDefinition;
+	}
+
+	public String getStreamDefinitionExpression(StreamDefinition streamDefinition) {
+		Preconditions.checkNotNull(streamDefinition, "StreamDefinition is null");
+		List<String> columns = new ArrayList<>();
+		for (Attribute attribute : streamDefinition.getAttributeList()) {
+			columns.add(String.format("%s %s", attribute.getName(), attribute.getType().toString().toLowerCase()));
+		}
+		return String.format(DEFINE_STREAM_TEMPLATE, streamDefinition.getId(), StringUtils.join(columns, ","));
+	}
+
+	public String getStreamDefinitionExpression(String streamId) {
+		// Delegates to the StreamDefinition overload instead of duplicating the formatting logic.
+		return getStreamDefinitionExpression(getStreamDefinition(streamId));
+	}
+}
diff --git a/src/main/java/org/apache/flink/contrib/siddhi/schema/StreamSchema.java b/src/main/java/org/apache/flink/contrib/siddhi/schema/StreamSchema.java
new file mode 100644
index 0000000..145a03b
--- /dev/null
+++ b/src/main/java/org/apache/flink/contrib/siddhi/schema/StreamSchema.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.siddhi.schema; + +import org.apache.flink.api.common.typeinfo.AtomicType; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.CompositeType; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.typeutils.PojoTypeInfo; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo; +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; + +public class StreamSchema implements Serializable { + private static final Logger LOGGER = LoggerFactory.getLogger(StreamSchema.class); + private final TypeInformation typeInfo; + private final int[] fieldIndexes; + private final String[] fieldNames; + private TypeInformation[] fieldTypes; + private final StreamSerializer streamSerializer; + private TypeSerializer typeSerializer; + + public StreamSchema(TypeInformation typeInfo, String... fieldNames) { + Preconditions.checkNotNull(fieldNames, "Field name is required"); + this.typeInfo = typeInfo; + this.fieldNames = fieldNames; + this.fieldIndexes = getFieldIndexes(typeInfo, fieldNames); + this.fieldTypes = getFieldTypes(typeInfo, fieldIndexes, fieldNames); + this.streamSerializer = new StreamSerializer<>(this); + } + + public StreamSchema(TypeInformation typeInfo, int[] fieldIndexes, String[] fieldNames) { + this.typeInfo = typeInfo; + this.fieldIndexes = fieldIndexes; + this.fieldNames = fieldNames; + this.fieldTypes = getFieldTypes(typeInfo, fieldIndexes, fieldNames); + this.streamSerializer = new StreamSerializer<>(this); + } + + public boolean isAtomicType() { + return typeInfo instanceof AtomicType; + } + + public boolean isTupleType() { + return typeInfo instanceof TupleTypeInfo; + } + + public boolean isPojoType() { + return typeInfo instanceof PojoTypeInfo; + } + + public boolean isCaseClassType() { + return typeInfo instanceof CaseClassTypeInfo; + } + + public boolean isCompositeType() { + return typeInfo instanceof CompositeType; + } + + private int[] getFieldIndexes(TypeInformation typeInfo, String... 
fieldNames) { + int[] result; + if (isAtomicType()) { + result = new int[]{0}; + } else if (isTupleType()) { + result = new int[fieldNames.length]; + for (int i = 0; i < fieldNames.length; i++) { + result[i] = i; + } + } else if (isPojoType()) { + result = new int[fieldNames.length]; + for (int i = 0; i < fieldNames.length; i++) { + int index = ((PojoTypeInfo) typeInfo).getFieldIndex(fieldNames[i]); + if (index < 0) { + throw new IllegalArgumentException(fieldNames[i] + " is not a field of type " + typeInfo); + } + result[i] = index; + } + } else if (isCaseClassType()) { + result = new int[fieldNames.length]; + for (int i = 0; i < fieldNames.length; i++) { + int index = ((CaseClassTypeInfo) typeInfo).getFieldIndex(fieldNames[i]); + if (index < 0) { + throw new IllegalArgumentException(fieldNames[i] + " is not a field of type " + typeInfo); + } + result[i] = index; + } + } else { + throw new IllegalArgumentException("Failed to get field index from " + typeInfo); + } + return result; + } + + + private TypeInformation[] getFieldTypes(TypeInformation typeInfo, int[] fieldIndexes, String[] fieldNames) { + TypeInformation[] fieldTypes; + if (isCompositeType()) { + CompositeType cType = (CompositeType) typeInfo; + if (fieldNames.length != cType.getArity()) { +// throw new IllegalArgumentException("Arity of type (" + cType.getFieldNames().length+ ") " + +// "not equal to number of field names " + fieldNames.length + "."); + LOGGER.warn("Arity of type (" + cType.getFieldNames().length + ") " + + "not equal to number of field names " + fieldNames.length + "."); + } + fieldTypes = new TypeInformation[fieldIndexes.length]; + for (int i = 0; i < fieldIndexes.length; i++) { + fieldTypes[i] = cType.getTypeAt(fieldIndexes[i]); + } + } else if (isAtomicType()) { + if (fieldIndexes.length != 1 || fieldIndexes[0] != 0) { + throw new IllegalArgumentException( + "Non-composite input type may have only a single field and its index must be 0."); + } + fieldTypes = new TypeInformation[]{typeInfo}; + } else { + throw new IllegalArgumentException( + "Illegal input type info" + ); + } + return fieldTypes; + } + + public TypeInformation getTypeInfo() { + return typeInfo; + } + + public int[] getFieldIndexes() { + return fieldIndexes; + } + + public String[] getFieldNames() { + return fieldNames; + } + + public TypeInformation[] getFieldTypes() { + return fieldTypes; + } + + public StreamSerializer getStreamSerializer() { + return streamSerializer; + } + + public TypeSerializer getTypeSerializer() { + return typeSerializer; + } + + public void setTypeSerializer(TypeSerializer typeSerializer) { + this.typeSerializer = typeSerializer; + } +} diff --git a/src/main/java/org/apache/flink/contrib/siddhi/schema/StreamSerializer.java b/src/main/java/org/apache/flink/contrib/siddhi/schema/StreamSerializer.java new file mode 100644 index 0000000..9ac4b51 --- /dev/null +++ b/src/main/java/org/apache/flink/contrib/siddhi/schema/StreamSerializer.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.siddhi.schema;
+
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+import java.lang.reflect.Field;
+
+/**
+ * Extracts an Object[] row from an input event according to the field order declared in the StreamSchema.
+ */
+public class StreamSerializer<T> implements Serializable {
+	private final StreamSchema<T> schema;
+
+	public StreamSerializer(StreamSchema<T> schema) {
+		this.schema = schema;
+	}
+
+	public Object[] getRow(T input) {
+		Preconditions.checkArgument(input.getClass() == schema.getTypeInfo().getTypeClass(),
+			"Invalid input type: " + input + ", expected: " + schema.getTypeInfo());
+
+		Object[] data;
+		if (schema.isAtomicType()) {
+			data = new Object[]{input};
+		} else if (schema.isTupleType()) {
+			Tuple tuple = (Tuple) input;
+			data = new Object[schema.getFieldIndexes().length];
+			for (int i = 0; i < schema.getFieldIndexes().length; i++) {
+				data[i] = tuple.getField(schema.getFieldIndexes()[i]);
+			}
+		} else if (schema.isPojoType() || schema.isCaseClassType()) {
+			data = new Object[schema.getFieldIndexes().length];
+			for (int i = 0; i < schema.getFieldNames().length; i++) {
+				data[i] = getFieldValue(schema.getFieldNames()[i], input);
+			}
+		} else {
+			throw new IllegalArgumentException("Failed to get field values from " + schema.getTypeInfo());
+		}
+		return data;
+	}
+
+	private Object getFieldValue(String fieldName, T input) {
+		// TODO: Cache Field Accessor
+		Field field = TypeExtractor.getDeclaredField(schema.getTypeInfo().getTypeClass(), fieldName);
+		if (field == null) {
+			throw new IllegalArgumentException(fieldName + " is not found in " + schema.getTypeInfo());
+		}
+		if (!field.isAccessible()) {
+			field.setAccessible(true);
+		}
+		try {
+			return field.get(input);
+		} catch (IllegalAccessException e) {
+			throw new IllegalStateException(e.getMessage(), e);
+		}
+	}
+}
diff --git a/src/main/java/org/apache/flink/contrib/siddhi/utils/SiddhiStreamFactory.java b/src/main/java/org/apache/flink/contrib/siddhi/utils/SiddhiStreamFactory.java
new file mode 100644
index 0000000..d7431b8
--- /dev/null
+++ b/src/main/java/org/apache/flink/contrib/siddhi/utils/SiddhiStreamFactory.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.siddhi.utils;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.contrib.siddhi.operator.SiddhiOperatorContext;
+import org.apache.flink.contrib.siddhi.operator.TupleStreamSiddhiOperator;
+import org.apache.flink.streaming.api.datastream.DataStream;
+
+/**
+ * Converts a SiddhiOperatorContext into a TupleStreamSiddhiOperator and builds the output DataStream.
+ */
+public class SiddhiStreamFactory {
+	@SuppressWarnings("unchecked")
+	public static <OUT> DataStream<OUT> createDataStream(SiddhiOperatorContext context, DataStream<Tuple2<String, Object>> namedStream) {
+		return namedStream.transform(context.getName(), context.getOutputStreamType(), new TupleStreamSiddhiOperator<Object, OUT>(context));
+	}
+}
diff --git a/src/main/java/org/apache/flink/contrib/siddhi/utils/SiddhiTupleFactory.java b/src/main/java/org/apache/flink/contrib/siddhi/utils/SiddhiTupleFactory.java
new file mode 100644
index 0000000..28ff99a
--- /dev/null
+++ b/src/main/java/org/apache/flink/contrib/siddhi/utils/SiddhiTupleFactory.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.siddhi.utils;
+
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.tuple.Tuple0;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.java.tuple.Tuple6;
+import org.apache.flink.api.java.tuple.Tuple7;
+import org.apache.flink.api.java.tuple.Tuple8;
+import org.apache.flink.api.java.tuple.Tuple9;
+import org.apache.flink.api.java.tuple.Tuple10;
+import org.apache.flink.api.java.tuple.Tuple11;
+import org.apache.flink.api.java.tuple.Tuple12;
+import org.apache.flink.api.java.tuple.Tuple13;
+import org.apache.flink.api.java.tuple.Tuple14;
+import org.apache.flink.api.java.tuple.Tuple15;
+import org.apache.flink.api.java.tuple.Tuple16;
+import org.apache.flink.api.java.tuple.Tuple17;
+import org.apache.flink.api.java.tuple.Tuple18;
+import org.apache.flink.api.java.tuple.Tuple19;
+import org.apache.flink.api.java.tuple.Tuple20;
+import org.apache.flink.api.java.tuple.Tuple21;
+import org.apache.flink.api.java.tuple.Tuple22;
+import org.apache.flink.api.java.tuple.Tuple23;
+import org.apache.flink.api.java.tuple.Tuple24;
+import org.apache.flink.api.java.tuple.Tuple25;
+import org.apache.flink.util.Preconditions;
+
+public class SiddhiTupleFactory {
+	/**
+	 * Convert an object array to a Tuple{N}, where N is between 0 and 25.
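+	 * <p>A minimal usage sketch (hypothetical values; the declared target type must match the array length):</p>
+	 * <pre>
+	 * Tuple3&lt;Integer, String, Double&gt; t = SiddhiTupleFactory.newTuple(new Object[]{1, "a", 2.0});
+	 * </pre>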
+ */ + public static T newTuple(Object[] row) { + Preconditions.checkNotNull(row, "Tuple row is null"); + switch (row.length) { + case 0: + return setTupleValue(new Tuple0(), row); + case 1: + return setTupleValue(new Tuple1(), row); + case 2: + return setTupleValue(new Tuple2(), row); + case 3: + return setTupleValue(new Tuple3(), row); + case 4: + return setTupleValue(new Tuple4(), row); + case 5: + return setTupleValue(new Tuple5(), row); + case 6: + return setTupleValue(new Tuple6(), row); + case 7: + return setTupleValue(new Tuple7(), row); + case 8: + return setTupleValue(new Tuple8(), row); + case 9: + return setTupleValue(new Tuple9(), row); + case 10: + return setTupleValue(new Tuple10(), row); + case 11: + return setTupleValue(new Tuple11(), row); + case 12: + return setTupleValue(new Tuple12(), row); + case 13: + return setTupleValue(new Tuple13(), row); + case 14: + return setTupleValue(new Tuple14(), row); + case 15: + return setTupleValue(new Tuple15(), row); + case 16: + return setTupleValue(new Tuple16(), row); + case 17: + return setTupleValue(new Tuple17(), row); + case 18: + return setTupleValue(new Tuple18(), row); + case 19: + return setTupleValue(new Tuple19(), row); + case 20: + return setTupleValue(new Tuple20(), row); + case 21: + return setTupleValue(new Tuple21(), row); + case 22: + return setTupleValue(new Tuple22(), row); + case 23: + return setTupleValue(new Tuple23(), row); + case 24: + return setTupleValue(new Tuple24(), row); + case 25: + return setTupleValue(new Tuple25(), row); + default: + throw new IllegalArgumentException("Too long row: " + row.length + ", unable to convert to Tuple"); + } + } + + @SuppressWarnings("unchecked") + public static T setTupleValue(Tuple tuple, Object[] row) { + if (row.length != tuple.getArity()) { + throw new IllegalArgumentException("Row length" + row.length + " is not equal with tuple's arity: " + tuple.getArity()); + } + for (int i = 0; i < row.length; i++) { + tuple.setField(row[i], i); + } + return (T) tuple; + } +} diff --git a/src/main/java/org/apache/flink/contrib/siddhi/utils/SiddhiTypeFactory.java b/src/main/java/org/apache/flink/contrib/siddhi/utils/SiddhiTypeFactory.java new file mode 100644 index 0000000..0a2d3f0 --- /dev/null +++ b/src/main/java/org/apache/flink/contrib/siddhi/utils/SiddhiTypeFactory.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.siddhi.utils; + +import org.apache.commons.lang3.StringUtils; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.api.java.typeutils.TypeInfoParser; +import org.wso2.siddhi.core.ExecutionPlanRuntime; +import org.wso2.siddhi.core.SiddhiManager; +import org.wso2.siddhi.query.api.definition.AbstractDefinition; +import org.wso2.siddhi.query.api.definition.Attribute; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Siddhi Type Utils for conversion between Java Type, Siddhi Field Type, Stream Definition, and Flink Type Information. + */ +public class SiddhiTypeFactory { + private final static Map, Attribute.Type> JAVA_TO_SIDDHI_TYPE = new HashMap<>(); + private final static Map> SIDDHI_TO_JAVA_TYPE = new HashMap<>(); + + static { + registerType(String.class, Attribute.Type.STRING); + registerType(Integer.class, Attribute.Type.INT); + registerType(int.class, Attribute.Type.INT); + registerType(Long.class, Attribute.Type.LONG); + registerType(long.class, Attribute.Type.LONG); + registerType(Float.class, Attribute.Type.FLOAT); + registerType(float.class, Attribute.Type.FLOAT); + registerType(Double.class, Attribute.Type.DOUBLE); + registerType(double.class, Attribute.Type.DOUBLE); + registerType(Boolean.class, Attribute.Type.BOOL); + registerType(boolean.class, Attribute.Type.BOOL); + } + + public static void registerType(Class javaType, Attribute.Type siddhiType) { + if (JAVA_TO_SIDDHI_TYPE.containsKey(javaType)) { + throw new IllegalArgumentException("Java type: " + javaType + " or siddhi type: " + siddhiType + " were already registered"); + } + JAVA_TO_SIDDHI_TYPE.put(javaType, siddhiType); + SIDDHI_TO_JAVA_TYPE.put(siddhiType, javaType); + } + + public static AbstractDefinition getStreamDefinition(String executionPlan, String streamId) { + SiddhiManager siddhiManager = null; + ExecutionPlanRuntime runtime = null; + try { + siddhiManager = new SiddhiManager(); + runtime = siddhiManager.createExecutionPlanRuntime(executionPlan); + Map definitionMap = runtime.getStreamDefinitionMap(); + if (definitionMap.containsKey(streamId)) { + return definitionMap.get(streamId); + } else { + throw new IllegalArgumentException("Unknown stream id" + streamId); + } + } finally { + if (runtime != null) { + runtime.shutdown(); + } + if (siddhiManager != null) { + siddhiManager.shutdown(); + } + } + } + + public static TypeInformation getTupleTypeInformation(AbstractDefinition definition) { + int tupleSize = definition.getAttributeList().size(); + StringBuilder stringBuilder = new StringBuilder(); + stringBuilder.append("Tuple").append(tupleSize); + stringBuilder.append("<"); + List attributeTypes = new ArrayList<>(); + for (Attribute attribute : definition.getAttributeList()) { + attributeTypes.add(getJavaType(attribute.getType()).getName()); + } + stringBuilder.append(StringUtils.join(attributeTypes, ",")); + stringBuilder.append(">"); + try { + return TypeInfoParser.parse(stringBuilder.toString()); + } 
catch (IllegalArgumentException ex) {
+			throw new IllegalArgumentException("Unable to parse " + stringBuilder.toString(), ex);
+		}
+	}
+
+	public static TypeInformation getTupleTypeInformation(String executionPlan, String streamId) {
+		return getTupleTypeInformation(getStreamDefinition(executionPlan, streamId));
+	}
+
+	@SuppressWarnings("unchecked")
+	private static final TypeInformation MAP_TYPE_INFORMATION = TypeExtractor.createTypeInfo(new HashMap<String, Object>().getClass());
+
+	public static TypeInformation<Map<String, Object>> getMapTypeInformation() {
+		return (TypeInformation<Map<String, Object>>) MAP_TYPE_INFORMATION;
+	}
+
+	public static Attribute.Type getAttributeType(TypeInformation<?> fieldType) {
+		if (JAVA_TO_SIDDHI_TYPE.containsKey(fieldType.getTypeClass())) {
+			return JAVA_TO_SIDDHI_TYPE.get(fieldType.getTypeClass());
+		} else {
+			return Attribute.Type.OBJECT;
+		}
+	}
+
+	public static Class<?> getJavaType(Attribute.Type attributeType) {
+		if (!SIDDHI_TO_JAVA_TYPE.containsKey(attributeType)) {
+			throw new IllegalArgumentException("Unable to get java type for siddhi attribute type: " + attributeType);
+		}
+		return SIDDHI_TO_JAVA_TYPE.get(attributeType);
+	}
+
+	public static <T> TypeInformation<Tuple2<String, T>> getStreamTupleTypeInformation(TypeInformation<T> typeInformation) {
+		return TypeInfoParser.parse("Tuple2<String," + typeInformation.getTypeClass().getName() + ">");
+	}
+}
diff --git a/src/test/java/org/apache/flink/contrib/siddhi/SiddhiCEPITCase.java b/src/test/java/org/apache/flink/contrib/siddhi/SiddhiCEPITCase.java
new file mode 100644
index 0000000..a4b3043
--- /dev/null
+++ b/src/test/java/org/apache/flink/contrib/siddhi/SiddhiCEPITCase.java
@@ -0,0 +1,375 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.siddhi; + +import org.apache.flink.api.common.functions.InvalidTypesException; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.tuple.Tuple1; +import org.apache.flink.api.java.tuple.Tuple4; +import org.apache.flink.api.java.tuple.Tuple5; +import org.apache.flink.contrib.siddhi.exception.UndefinedStreamException; +import org.apache.flink.contrib.siddhi.extension.CustomPlusFunctionExtension; +import org.apache.flink.contrib.siddhi.source.Event; +import org.apache.flink.contrib.siddhi.source.RandomEventSource; +import org.apache.flink.contrib.siddhi.source.RandomTupleSource; +import org.apache.flink.contrib.siddhi.source.RandomWordSource; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor; +import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.IOException; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +import static org.junit.Assert.assertEquals; + +/** + * flink-siddhi integration test cases + */ +public class SiddhiCEPITCase extends StreamingMultipleProgramsTestBase { + + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); + + @Test + public void testSimplePojoStreamAndReturnPojo() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + DataStream input = env.fromElements( + Event.of(1, "start", 1.0), + Event.of(2, "middle", 2.0), + Event.of(3, "end", 3.0), + Event.of(4, "start", 4.0), + Event.of(5, "middle", 5.0), + Event.of(6, "end", 6.0) + ); + + DataStream output = SiddhiCEP + .define("inputStream", input, "id", "name", "price") + .sql("from inputStream insert into outputStream") + .returns("outputStream", Event.class); + String path = tempFolder.newFile().toURI().toString(); + output.writeAsText(path); + env.execute(); + } + + @Test + public void testUnboundedPojoSourceAndReturnTuple() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + DataStream input = env.addSource(new RandomEventSource(5)); + + DataStream> output = SiddhiCEP + .define("inputStream", input, "id", "name", "price", "timestamp") + .sql("from inputStream select timestamp, id, name, price insert into outputStream") + .returns("outputStream"); + + DataStream following = output.map(new MapFunction, Integer>() { + @Override + public Integer map(Tuple4 value) throws Exception { + return value.f1; + } + }); + String resultPath = tempFolder.newFile().toURI().toString(); + following.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); + env.execute(); + assertEquals(5, getLineCount(resultPath)); + } + + @Test + public void testUnboundedTupleSourceAndReturnTuple() throws Exception { + StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment(); + DataStream> input = env.addSource(new RandomTupleSource(5).closeDelay(1500)); + + DataStream> output = SiddhiCEP + .define("inputStream", input, "id", "name", "price", "timestamp") + .sql("from inputStream select timestamp, id, name, price insert into outputStream") + .returns("outputStream"); + + String resultPath = tempFolder.newFile().toURI().toString(); + output.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); + env.execute(); + assertEquals(5, getLineCount(resultPath)); + } + + @Test + public void testUnboundedPrimitiveTypeSourceAndReturnTuple() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + DataStream input = env.addSource(new RandomWordSource(5).closeDelay(1500)); + + DataStream> output = SiddhiCEP + .define("wordStream", input, "words") + .sql("from wordStream select words insert into outputStream") + .returns("outputStream"); + + String resultPath = tempFolder.newFile().toURI().toString(); + output.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); + env.execute(); + assertEquals(5, getLineCount(resultPath)); + } + + @Test(expected = InvalidTypesException.class) + public void testUnboundedPojoSourceButReturnInvalidTupleType() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + DataStream input = env.addSource(new RandomEventSource(5).closeDelay(1500)); + + DataStream> output = SiddhiCEP + .define("inputStream", input, "id", "name", "price", "timestamp") + .sql("from inputStream select timestamp, id, name, price insert into outputStream") + .returns("outputStream"); + + DataStream following = output.map(new MapFunction, Long>() { + @Override + public Long map(Tuple5 value) throws Exception { + return value.f0; + } + }); + + String resultPath = tempFolder.newFile().toURI().toString(); + following.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); + env.execute(); + assertEquals(5, getLineCount(resultPath)); + env.execute(); + } + + @Test + public void testUnboundedPojoStreamAndReturnMap() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(1); + env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); + DataStream input = env.addSource(new RandomEventSource(5)); + + DataStream> output = SiddhiCEP + .define("inputStream", input, "id", "name", "price", "timestamp") + .sql("from inputStream select timestamp, id, name, price insert into outputStream") + .returnAsMap("outputStream"); + + String resultPath = tempFolder.newFile().toURI().toString(); + output.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); + env.execute(); + assertEquals(5, getLineCount(resultPath)); + } + + @Test + public void testUnboundedPojoStreamAndReturnPojo() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + DataStream input = env.addSource(new RandomEventSource(5)); + input.assignTimestampsAndWatermarks(new AscendingTimestampExtractor() { + @Override + public long extractAscendingTimestamp(Event element) { + return element.getTimestamp(); + } + }); + + DataStream output = SiddhiCEP + .define("inputStream", input, "id", "name", "price", "timestamp") + .sql("from inputStream select timestamp, id, name, price insert into outputStream") + .returns("outputStream", Event.class); + + String resultPath = tempFolder.newFile().toURI().toString(); + output.writeAsText(resultPath, 
FileSystem.WriteMode.OVERWRITE); + env.execute(); + assertEquals(5, getLineCount(resultPath)); + } + + + @Test + public void testMultipleUnboundedPojoStreamSimpleUnion() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + DataStream input1 = env.addSource(new RandomEventSource(2), "input1"); + DataStream input2 = env.addSource(new RandomEventSource(2), "input2"); + DataStream input3 = env.addSource(new RandomEventSource(2), "input2"); + DataStream output = SiddhiCEP + .define("inputStream1", input1, "id", "name", "price", "timestamp") + .union("inputStream2", input2, "id", "name", "price", "timestamp") + .union("inputStream3", input3, "id", "name", "price", "timestamp") + .sql( + "from inputStream1 select timestamp, id, name, price insert into outputStream;" + + "from inputStream2 select timestamp, id, name, price insert into outputStream;" + + "from inputStream3 select timestamp, id, name, price insert into outputStream;" + ) + .returns("outputStream", Event.class); + + String resultPath = tempFolder.newFile().toURI().toString(); + output.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); + env.execute(); + assertEquals(6, getLineCount(resultPath)); + } + + /** + * @see https://docs.wso2.com/display/CEP300/Joins + */ + @Test + public void testMultipleUnboundedPojoStreamUnionAndJoinWithWindow() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + DataStream input1 = env.addSource(new RandomEventSource(5), "input1"); + DataStream input2 = env.addSource(new RandomEventSource(5), "input2"); + + DataStream output = SiddhiCEP + .define("inputStream1", input1.keyBy("id"), "id", "name", "price", "timestamp") + .union("inputStream2", input2.keyBy("id"), "id", "name", "price", "timestamp") + .sql( + "from inputStream1#window.length(5) as s1 " + + "join inputStream2#window.time(500) as s2 " + + "on s1.id == s2.id " + + "select s1.timestamp as t, s1.name as n, s1.price as p1, s2.price as p2 " + + "insert into JoinStream;" + ) + .returnAsMap("JoinStream"); + + String resultPath = tempFolder.newFile().toURI().toString(); + output.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); + env.execute(); + assertEquals(5, getLineCount(resultPath)); + } + + /** + * @see https://docs.wso2.com/display/CEP300/Patterns + */ + @Test + public void testUnboundedPojoStreamSimplePatternMatch() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + DataStream input1 = env.addSource(new RandomEventSource(5).closeDelay(1500), "input1"); + DataStream input2 = env.addSource(new RandomEventSource(5).closeDelay(1500), "input2"); + + DataStream> output = SiddhiCEP + .define("inputStream1", input1.keyBy("name"), "id", "name", "price", "timestamp") + .union("inputStream2", input2.keyBy("name"), "id", "name", "price", "timestamp") + .sql( + "from every s1 = inputStream1[id == 2] " + + " -> s2 = inputStream2[id == 3] " + + "select s1.id as id_1, s1.name as name_1, s2.id as id_2, s2.name as name_2 " + + "insert into outputStream" + ) + .returnAsMap("outputStream"); + + String resultPath = tempFolder.newFile().toURI().toString(); + output.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); + env.execute(); + assertEquals(1, getLineCount(resultPath)); + compareResultsByLinesInMemory("{id_1=2, name_1=test_event, id_2=3, name_2=test_event}", resultPath); + } + + /** + * @see https://docs.wso2.com/display/CEP300/Sequences + */ + @Test + public void 
testUnboundedPojoStreamSimpleSequences() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + DataStream input1 = env.addSource(new RandomEventSource(5).closeDelay(1500), "input1"); + DataStream> output = SiddhiCEP + .define("inputStream1", input1.keyBy("name"), "id", "name", "price", "timestamp") + .union("inputStream2", input1.keyBy("name"), "id", "name", "price", "timestamp") + .sql( + "from every s1 = inputStream1[id == 2]+ , " + + "s2 = inputStream2[id == 3]? " + + "within 1000 second " + + "select s1[0].name as n1, s2.name as n2 " + + "insert into outputStream" + ) + .returnAsMap("outputStream"); + + String resultPath = tempFolder.newFile().toURI().toString(); + output.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); + env.execute(); + assertEquals(1, getLineCount(resultPath)); + } + + private static int getLineCount(String resPath) throws IOException { + List result = new LinkedList<>(); + readAllResultLines(result, resPath); + return result.size(); + } + + @Test + public void testCustomizeSiddhiFunctionExtension() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + DataStream input = env.addSource(new RandomEventSource(5)); + + SiddhiCEP cep = SiddhiCEP.getSiddhiEnvironment(env); + cep.registerExtension("custom:plus", CustomPlusFunctionExtension.class); + + DataStream> output = cep + .from("inputStream", input, "id", "name", "price", "timestamp") + .sql("from inputStream select timestamp, id, name, custom:plus(price,price) as doubled_price insert into outputStream") + .returnAsMap("outputStream"); + + String resultPath = tempFolder.newFile().toURI().toString(); + output.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); + env.execute(); + assertEquals(5, getLineCount(resultPath)); + } + + @Test + public void testRegisterStreamAndExtensionWithSiddhiCEPEnvironment() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + DataStream input1 = env.addSource(new RandomEventSource(5), "input1"); + DataStream input2 = env.addSource(new RandomEventSource(5), "input2"); + + SiddhiCEP cep = SiddhiCEP.getSiddhiEnvironment(env); + cep.registerExtension("custom:plus", CustomPlusFunctionExtension.class); + + cep.registerStream("inputStream1", input1.keyBy("id"), "id", "name", "price", "timestamp"); + cep.registerStream("inputStream2", input2.keyBy("id"), "id", "name", "price", "timestamp"); + + DataStream> output = cep + .from("inputStream1").union("inputStream2") + .sql( + "from inputStream1#window.length(5) as s1 " + + "join inputStream2#window.time(500) as s2 " + + "on s1.id == s2.id " + + "select s1.timestamp as t, s1.name as n, s1.price as p1, s2.price as p2 " + + "insert into JoinStream;" + ) + .returns("JoinStream"); + + String resultPath = tempFolder.newFile().toURI().toString(); + output.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); + env.execute(); + assertEquals(5, getLineCount(resultPath)); + } + + @Test(expected = UndefinedStreamException.class) + public void testTriggerUndefinedStreamException() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + DataStream input1 = env.addSource(new RandomEventSource(5), "input1"); + + SiddhiCEP cep = SiddhiCEP.getSiddhiEnvironment(env); + cep.registerStream("inputStream1", input1.keyBy("id"), "id", "name", "price", "timestamp"); + + DataStream> output = cep + 
.from("inputStream1").union("inputStream2") + .sql( + "from inputStream1#window.length(5) as s1 " + + "join inputStream2#window.time(500) as s2 " + + "on s1.id == s2.id " + + "select s1.timestamp as t, s1.name as n, s1.price as p1, s2.price as p2 " + + "insert into JoinStream;" + ) + .returnAsMap("JoinStream"); + + String resultPath = tempFolder.newFile().toURI().toString(); + output.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); + env.execute(); + } +} diff --git a/src/test/java/org/apache/flink/contrib/siddhi/extension/CustomPlusFunctionExtension.java b/src/test/java/org/apache/flink/contrib/siddhi/extension/CustomPlusFunctionExtension.java new file mode 100644 index 0000000..6e2746e --- /dev/null +++ b/src/test/java/org/apache/flink/contrib/siddhi/extension/CustomPlusFunctionExtension.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.siddhi.extension; + +import org.wso2.siddhi.core.config.ExecutionPlanContext; +import org.wso2.siddhi.core.exception.ExecutionPlanCreationException; +import org.wso2.siddhi.core.executor.ExpressionExecutor; +import org.wso2.siddhi.core.executor.function.FunctionExecutor; +import org.wso2.siddhi.query.api.definition.Attribute; + +public class CustomPlusFunctionExtension extends FunctionExecutor { + + private Attribute.Type returnType; + + /** + * The initialization method for FunctionExecutor, this method will be called before the other methods + * + * @param attributeExpressionExecutors are the executors of each function parameters + * @param executionPlanContext the context of the execution plan + */ + @Override + protected void init(ExpressionExecutor[] attributeExpressionExecutors, ExecutionPlanContext executionPlanContext) { + for (ExpressionExecutor expressionExecutor : attributeExpressionExecutors) { + Attribute.Type attributeType = expressionExecutor.getReturnType(); + if (attributeType == Attribute.Type.DOUBLE) { + returnType = attributeType; + + } else if ((attributeType == Attribute.Type.STRING) || (attributeType == Attribute.Type.BOOL)) { + throw new ExecutionPlanCreationException("Plus cannot have parameters with types String or Bool"); + } else { + returnType = Attribute.Type.LONG; + } + } + } + + /** + * The main execution method which will be called upon event arrival + * when there are more then one function parameter + * + * @param data the runtime values of function parameters + * @return the function result + */ + @Override + protected Object execute(Object[] data) { + if (returnType == Attribute.Type.DOUBLE) { + double total = 0; + for (Object aObj : data) { + total += Double.parseDouble(String.valueOf(aObj)); + } + + return total; + } else { + long total = 0; + for (Object aObj : data) { + total += Long.parseLong(String.valueOf(aObj)); + } + return total; + } + } + + /** + * The main execution method which will be called upon event arrival + * when there are zero or one function parameter + * + * @param data null if the function parameter count is zero or + * runtime data value of the function parameter + * @return the function result + */ + @Override + protected Object execute(Object data) { + if (returnType == Attribute.Type.DOUBLE) { + return Double.parseDouble(String.valueOf(data)); + } else { + return Long.parseLong(String.valueOf(data)); + } + } + + /** + * This will be called only once and this can be used to acquire + * required resources for the processing element. + * This will be called after initializing the system and before + * starting to process the events. + */ + @Override + public void start() { + + } + + /** + * This will be called only once and this can be used to release + * the acquired resources for processing. + * This will be called before shutting down the system. 
+	 */
+	@Override
+	public void stop() {
+
+	}
+
+	@Override
+	public Attribute.Type getReturnType() {
+		return returnType;
+	}
+
+	/**
+	 * Used to collect the serializable state of the processing element that needs to be
+	 * persisted, so that the element can be reconstructed to the same state at a different point in time.
+	 *
+	 * @return stateful objects of the processing element as an array
+	 */
+	@Override
+	public Object[] currentState() {
+		return new Object[0];
+	}
+
+	/**
+	 * Used to restore the serialized state of the processing element, reconstructing
+	 * the element to the same state as it was at a previous point in time.
+	 *
+	 * @param state the stateful objects of the element as an array, in
+	 *              the same order provided by currentState()
+	 */
+	@Override
+	public void restoreState(Object[] state) {
+
+	}
+
+}
diff --git a/src/test/java/org/apache/flink/contrib/siddhi/operator/SiddhiSyntaxTest.java b/src/test/java/org/apache/flink/contrib/siddhi/operator/SiddhiSyntaxTest.java
new file mode 100644
index 0000000..d70107d
--- /dev/null
+++ b/src/test/java/org/apache/flink/contrib/siddhi/operator/SiddhiSyntaxTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.siddhi.operator; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.wso2.siddhi.core.ExecutionPlanRuntime; +import org.wso2.siddhi.core.SiddhiManager; +import org.wso2.siddhi.core.event.Event; +import org.wso2.siddhi.core.stream.input.InputHandler; +import org.wso2.siddhi.core.stream.output.StreamCallback; + +import java.util.ArrayList; +import java.util.List; + +public class SiddhiSyntaxTest { + + private SiddhiManager siddhiManager; + + @Before + public void setUp() { + siddhiManager = new SiddhiManager(); + } + + @After + public void after() { + siddhiManager = new SiddhiManager(); + } + + @Test + public void testSimplePlan() throws InterruptedException { + ExecutionPlanRuntime runtime = siddhiManager.createExecutionPlanRuntime( + "define stream inStream (name string, value double);" + + "from inStream insert into outStream"); + runtime.start(); + + final List received = new ArrayList<>(3); + InputHandler inputHandler = runtime.getInputHandler("inStream"); + Assert.assertNotNull(inputHandler); + + try { + runtime.getInputHandler("unknownStream"); + Assert.fail("Should throw exception for getting input handler for unknown streamId."); + } catch (Exception ex) { + // Expected exception for getting input handler for illegal streamId. + } + + runtime.addCallback("outStream", new StreamCallback() { + @Override + public void receive(Event[] events) { + for (Event event : events) { + received.add(event.getData()); + } + } + }); + + inputHandler.send(new Object[]{"a", 1.1}); + inputHandler.send(new Object[]{"b", 1.2}); + inputHandler.send(new Object[]{"c", 1.3}); + Thread.sleep(100); + Assert.assertEquals(3, received.size()); + Assert.assertArrayEquals(received.get(0), new Object[]{"a", 1.1}); + Assert.assertArrayEquals(received.get(1), new Object[]{"b", 1.2}); + Assert.assertArrayEquals(received.get(2), new Object[]{"c", 1.3}); + } +} diff --git a/src/test/java/org/apache/flink/contrib/siddhi/schema/SiddhiExecutionPlanSchemaTest.java b/src/test/java/org/apache/flink/contrib/siddhi/schema/SiddhiExecutionPlanSchemaTest.java new file mode 100644 index 0000000..8af3cc8 --- /dev/null +++ b/src/test/java/org/apache/flink/contrib/siddhi/schema/SiddhiExecutionPlanSchemaTest.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.siddhi.schema; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.PojoTypeInfo; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.contrib.siddhi.source.Event; +import org.junit.Test; +import org.wso2.siddhi.query.api.definition.Attribute; +import org.wso2.siddhi.query.api.definition.StreamDefinition; + +import static org.junit.Assert.*; + +public class SiddhiExecutionPlanSchemaTest { + @Test + public void testStreamSchemaWithPojo() { + TypeInformation typeInfo = TypeExtractor.createTypeInfo(Event.class); + assertTrue("Type information should be PojoTypeInfo", typeInfo instanceof PojoTypeInfo); + + SiddhiStreamSchema schema = new SiddhiStreamSchema<>(typeInfo, "id", "timestamp", "name", "price"); + assertEquals(4, schema.getFieldIndexes().length); + + StreamDefinition streamDefinition = schema.getStreamDefinition("test_stream"); + assertArrayEquals(new String[]{"id", "timestamp", "name", "price"}, streamDefinition.getAttributeNameArray()); + + assertEquals(Attribute.Type.INT, streamDefinition.getAttributeType("id")); + assertEquals(Attribute.Type.LONG, streamDefinition.getAttributeType("timestamp")); + assertEquals(Attribute.Type.STRING, streamDefinition.getAttributeType("name")); + assertEquals(Attribute.Type.DOUBLE, streamDefinition.getAttributeType("price")); + + assertEquals("define stream test_stream (id int,timestamp long,name string,price double);", schema.getStreamDefinitionExpression("test_stream")); + } +} diff --git a/src/test/java/org/apache/flink/contrib/siddhi/schema/StreamSchemaTest.java b/src/test/java/org/apache/flink/contrib/siddhi/schema/StreamSchemaTest.java new file mode 100644 index 0000000..c69418a --- /dev/null +++ b/src/test/java/org/apache/flink/contrib/siddhi/schema/StreamSchemaTest.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.siddhi.schema; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple4; +import org.apache.flink.api.java.typeutils.PojoTypeInfo; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.api.java.typeutils.TypeInfoParser; +import org.apache.flink.contrib.siddhi.source.Event; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class StreamSchemaTest { + @Test + public void testStreamSchemaWithPojo() { + TypeInformation typeInfo = TypeExtractor.createTypeInfo(Event.class); + assertTrue("Type information should be PojoTypeInfo", typeInfo instanceof PojoTypeInfo); + StreamSchema schema = new StreamSchema<>(typeInfo, "id", "timestamp", "name", "price"); + assertEquals(4, schema.getFieldIndexes().length); + assertEquals(Event.class, schema.getTypeInfo().getTypeClass()); + } + + @Test + public void testStreamSchemaWithTuple() { + TypeInformation typeInfo = TypeInfoParser.parse("Tuple4"); + StreamSchema schema = new StreamSchema<>(typeInfo, "id", "timestamp", "name", "price"); + assertEquals(Tuple4.class, schema.getTypeInfo().getTypeClass()); + assertEquals(4, schema.getFieldIndexes().length); + assertEquals(Tuple4.class, schema.getTypeInfo().getTypeClass()); + } + + @Test + public void testStreamSchemaWithPrimitive() { + TypeInformation typeInfo = TypeInfoParser.parse("String"); + StreamSchema schema = new StreamSchema<>(typeInfo, "words"); + assertEquals(String.class, schema.getTypeInfo().getTypeClass()); + assertEquals(1, schema.getFieldIndexes().length); + assertEquals(String.class, schema.getTypeInfo().getTypeClass()); + } + + @Test(expected = IllegalArgumentException.class) + public void testStreamSchemaWithPojoAndUnknownField() { + TypeInformation typeInfo = TypeExtractor.createTypeInfo(Event.class); + new StreamSchema<>(typeInfo, "id", "timestamp", "name", "price", "unknown"); + } + + @Test + public void testStreamTupleSerializerWithPojo() { + TypeInformation typeInfo = TypeExtractor.createTypeInfo(Event.class); + assertTrue("Type information should be PojoTypeInfo", typeInfo instanceof PojoTypeInfo); + StreamSchema schema = new StreamSchema<>(typeInfo, "id", "timestamp", "name", "price"); + assertEquals(Event.class, schema.getTypeInfo().getTypeClass()); + + TypeInformation> tuple2TypeInformation = TypeInfoParser.parse("Tuple2"); + assertEquals("Java Tuple2>", tuple2TypeInformation.toString()); + } + + @Test + public void testStreamTupleSerializerWithTuple() { + TypeInformation typeInfo = TypeInfoParser.parse("Tuple4"); + StreamSchema schema = new StreamSchema<>(typeInfo, "id", "timestamp", "name", "price"); + assertEquals(Tuple4.class, schema.getTypeInfo().getTypeClass()); + TypeInformation> tuple2TypeInformation = TypeInfoParser.parse("Tuple2"); + assertEquals("Java Tuple2>", tuple2TypeInformation.toString()); + } + + @Test + public void testStreamTupleSerializerWithPrimitive() { + TypeInformation typeInfo = TypeInfoParser.parse("String"); + StreamSchema schema = new StreamSchema<>(typeInfo, "words"); + 
assertEquals(String.class, schema.getTypeInfo().getTypeClass()); + TypeInformation> tuple2TypeInformation = TypeInfoParser.parse("Tuple2"); + assertEquals("Java Tuple2", tuple2TypeInformation.toString()); + } +} diff --git a/src/test/java/org/apache/flink/contrib/siddhi/schema/StreamSerializerTest.java b/src/test/java/org/apache/flink/contrib/siddhi/schema/StreamSerializerTest.java new file mode 100644 index 0000000..0005306 --- /dev/null +++ b/src/test/java/org/apache/flink/contrib/siddhi/schema/StreamSerializerTest.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *
diff --git a/src/test/java/org/apache/flink/contrib/siddhi/schema/StreamSerializerTest.java b/src/test/java/org/apache/flink/contrib/siddhi/schema/StreamSerializerTest.java
new file mode 100644
index 0000000..0005306
--- /dev/null
+++ b/src/test/java/org/apache/flink/contrib/siddhi/schema/StreamSerializerTest.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.siddhi.schema;
+
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.contrib.siddhi.source.Event;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class StreamSerializerTest {
+	private static final long CURRENT = System.currentTimeMillis();
+
+	@Test
+	public void testSimplePojoRead() {
+		Event event = new Event();
+		event.setId(1);
+		event.setName("test");
+		event.setPrice(56.7);
+		event.setTimestamp(CURRENT);
+
+		StreamSchema<Event> schema = new StreamSchema<>(TypeExtractor.createTypeInfo(Event.class), "id", "name", "price", "timestamp");
+		StreamSerializer<Event> reader = new StreamSerializer<>(schema);
+		Assert.assertArrayEquals(new Object[]{1, "test", 56.7, CURRENT}, reader.getRow(event));
+	}
+}
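Editor's note: `StreamSerializer.getRow` is the bridge this test verifies — it flattens a POJO into the `Object[]` row Siddhi consumes, in schema field order. A hedged sketch of that round trip, assuming only `getRow` and the `Event.of` factory defined below:

```java
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.contrib.siddhi.schema.StreamSchema;
import org.apache.flink.contrib.siddhi.schema.StreamSerializer;
import org.apache.flink.contrib.siddhi.source.Event;

public class StreamSerializerSketch {
	public static void main(String[] args) {
		Event event = Event.of(1, "test", 56.7, 1000L);

		// The field order declared here dictates the order of values in the row.
		StreamSchema<Event> schema = new StreamSchema<>(
			TypeExtractor.createTypeInfo(Event.class), "id", "name", "price", "timestamp");
		StreamSerializer<Event> serializer = new StreamSerializer<>(schema);

		Object[] row = serializer.getRow(event); // {1, "test", 56.7, 1000L}
		System.out.println(java.util.Arrays.toString(row));
	}
}
```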
diff --git a/src/test/java/org/apache/flink/contrib/siddhi/source/Event.java b/src/test/java/org/apache/flink/contrib/siddhi/source/Event.java
new file mode 100644
index 0000000..eda8eed
--- /dev/null
+++ b/src/test/java/org/apache/flink/contrib/siddhi/source/Event.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.siddhi.source;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+
+import java.util.Objects;
+
+public class Event {
+	private long timestamp;
+	private String name;
+	private double price;
+	private int id;
+
+	public double getPrice() {
+		return price;
+	}
+
+	public int getId() {
+		return id;
+	}
+
+	public String getName() {
+		return name;
+	}
+
+	@Override
+	public String toString() {
+		return "Event(" + id + ", " + name + ", " + price + ", " + timestamp + ")";
+	}
+
+	@Override
+	public boolean equals(Object obj) {
+		if (obj instanceof Event) {
+			Event other = (Event) obj;
+			return name.equals(other.name) && price == other.price && id == other.id && timestamp == other.timestamp;
+		} else {
+			return false;
+		}
+	}
+
+	public static Event of(int id, String name, double price) {
+		Event event = new Event();
+		event.setId(id);
+		event.setName(name);
+		event.setPrice(price);
+		event.setTimestamp(System.currentTimeMillis());
+		return event;
+	}
+
+	public static Event of(int id, String name, double price, long timestamp) {
+		Event event = new Event();
+		event.setId(id);
+		event.setName(name);
+		event.setPrice(price);
+		event.setTimestamp(timestamp);
+		return event;
+	}
+
+	@Override
+	public int hashCode() {
+		// Must stay consistent with equals(), which also compares timestamp.
+		return Objects.hash(name, price, id, timestamp);
+	}
+
+	public static TypeSerializer<Event> createTypeSerializer() {
+		TypeInformation<Event> typeInformation = TypeExtractor.createTypeInfo(Event.class);
+		return typeInformation.createSerializer(new ExecutionConfig());
+	}
+
+	public long getTimestamp() {
+		return timestamp;
+	}
+
+	public void setTimestamp(long timestamp) {
+		this.timestamp = timestamp;
+	}
+
+	public void setPrice(double price) {
+		this.price = price;
+	}
+
+	public void setId(int id) {
+		this.id = id;
+	}
+
+	public void setName(String name) {
+		this.name = name;
+	}
+}
diff --git a/src/test/java/org/apache/flink/contrib/siddhi/source/RandomEventSource.java b/src/test/java/org/apache/flink/contrib/siddhi/source/RandomEventSource.java
new file mode 100644
index 0000000..5848930
--- /dev/null
+++ b/src/test/java/org/apache/flink/contrib/siddhi/source/RandomEventSource.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.siddhi.source;
+
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+
+import java.util.Random;
+
+public class RandomEventSource implements SourceFunction<Event> {
+	private final int count;
+	private final Random random;
+	private final long initialTimestamp;
+
+	private volatile boolean isRunning = true;
+	private volatile int number = 0;
+	private volatile long closeDelayTimestamp = 1000;
+
+	public RandomEventSource(int count, long initialTimestamp) {
+		this.count = count;
+		this.random = new Random();
+		this.initialTimestamp = initialTimestamp;
+	}
+
+	public RandomEventSource() {
+		this(Integer.MAX_VALUE, System.currentTimeMillis());
+	}
+
+	public RandomEventSource(int count) {
+		this(count, System.currentTimeMillis());
+	}
+
+	public RandomEventSource closeDelay(long delayTimestamp) {
+		this.closeDelayTimestamp = delayTimestamp;
+		return this;
+	}
+
+	@Override
+	public void run(SourceContext<Event> ctx) throws Exception {
+		while (isRunning) {
+			ctx.collect(Event.of(number, "test_event", random.nextDouble(), initialTimestamp + 1000 * number));
+			number++;
+			if (number >= this.count) {
+				cancel();
+			}
+		}
+	}
+
+	@Override
+	public void cancel() {
+		this.isRunning = false;
+		try {
+			Thread.sleep(closeDelayTimestamp);
+		} catch (InterruptedException e) {
+			// ignored
+		}
+	}
+}
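Editor's note: RandomEventSource is the workhorse fixture for the POJO path. A sketch of how such a source is typically wired into a job — the job setup is illustrative only, not part of this patch:

```java
import org.apache.flink.contrib.siddhi.source.Event;
import org.apache.flink.contrib.siddhi.source.RandomEventSource;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RandomEventSourceSketch {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// Emit 20 synthetic events; cancel() then sleeps for the configured
		// close delay so downstream operators can drain before shutdown.
		DataStream<Event> events = env.addSource(new RandomEventSource(20).closeDelay(1500));
		events.print();

		env.execute("random-event-source-sketch");
	}
}
```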
diff --git a/src/test/java/org/apache/flink/contrib/siddhi/source/RandomTupleSource.java b/src/test/java/org/apache/flink/contrib/siddhi/source/RandomTupleSource.java
new file mode 100644
index 0000000..271b906
--- /dev/null
+++ b/src/test/java/org/apache/flink/contrib/siddhi/source/RandomTupleSource.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.siddhi.source;
+
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+
+import java.util.Random;
+
+public class RandomTupleSource implements SourceFunction<Tuple4<Integer, String, Double, Long>> {
+	private final int count;
+	private final Random random;
+	private final long initialTimestamp;
+
+	private volatile boolean isRunning = true;
+	private volatile int number = 0;
+	private long closeDelayTimestamp;
+
+	public RandomTupleSource(int count, long initialTimestamp) {
+		this.count = count;
+		this.random = new Random();
+		this.initialTimestamp = initialTimestamp;
+	}
+
+	public RandomTupleSource() {
+		this(Integer.MAX_VALUE, System.currentTimeMillis());
+	}
+
+	public RandomTupleSource(int count) {
+		this(count, System.currentTimeMillis());
+	}
+
+	public RandomTupleSource closeDelay(long delayTimestamp) {
+		this.closeDelayTimestamp = delayTimestamp;
+		return this;
+	}
+
+	@Override
+	public void run(SourceContext<Tuple4<Integer, String, Double, Long>> ctx) throws Exception {
+		while (isRunning) {
+			ctx.collect(Tuple4.of(number, "test_tuple", random.nextDouble(), initialTimestamp + 1000 * number));
+			number++;
+			if (number >= this.count) {
+				cancel();
+			}
+		}
+	}
+
+	@Override
+	public void cancel() {
+		this.isRunning = false;
+		try {
+			Thread.sleep(this.closeDelayTimestamp);
+		} catch (InterruptedException e) {
+			// ignored
+		}
+	}
+}
diff --git a/src/test/java/org/apache/flink/contrib/siddhi/source/RandomWordSource.java b/src/test/java/org/apache/flink/contrib/siddhi/source/RandomWordSource.java
new file mode 100644
index 0000000..23fc506
--- /dev/null
+++ b/src/test/java/org/apache/flink/contrib/siddhi/source/RandomWordSource.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.siddhi.source;
+
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+
+import java.util.Random;
+
+public class RandomWordSource implements SourceFunction<String> {
+	private static final String[] WORDS = new String[]{
+		"To be, or not to be,--that is the question:--",
+		"Whether 'tis nobler in the mind to suffer",
+		"The slings and arrows of outrageous fortune",
+		"Or to take arms against a sea of troubles,",
+		"And by opposing end them?--To die,--to sleep,--",
+		"No more; and by a sleep to say we end",
+		"The heartache, and the thousand natural shocks",
+		"That flesh is heir to,--'tis a consummation",
+		"Devoutly to be wish'd. To die,--to sleep;--",
+		"To sleep! perchance to dream:--ay, there's the rub;",
+		"For in that sleep of death what dreams may come,",
+		"When we have shuffled off this mortal coil,",
+		"Must give us pause: there's the respect",
+		"That makes calamity of so long life;",
+		"For who would bear the whips and scorns of time,",
+		"The oppressor's wrong, the proud man's contumely,",
+		"The pangs of despis'd love, the law's delay,",
+		"The insolence of office, and the spurns",
+		"That patient merit of the unworthy takes,",
+		"When he himself might his quietus make",
+		"With a bare bodkin? who would these fardels bear,",
+		"To grunt and sweat under a weary life,",
+		"But that the dread of something after death,--",
+		"The undiscover'd country, from whose bourn",
+		"No traveller returns,--puzzles the will,",
+		"And makes us rather bear those ills we have",
+		"Than fly to others that we know not of?",
+		"Thus conscience does make cowards of us all;",
+		"And thus the native hue of resolution",
+		"Is sicklied o'er with the pale cast of thought;",
+		"And enterprises of great pith and moment,",
+		"With this regard, their currents turn awry,",
+		"And lose the name of action.--Soft you now!",
+		"The fair Ophelia!--Nymph, in thy orisons",
+		"Be all my sins remember'd."
+	};
+
+	private final int count;
+	private final Random random;
+	private final long initialTimestamp;
+
+	private volatile boolean isRunning = true;
+	private volatile int number = 0;
+	private long closeDelayTimestamp;
+
+	public RandomWordSource(int count, long initialTimestamp) {
+		this.count = count;
+		this.random = new Random();
+		this.initialTimestamp = initialTimestamp;
+	}
+
+	public RandomWordSource() {
+		this(Integer.MAX_VALUE, System.currentTimeMillis());
+	}
+
+	public RandomWordSource(int count) {
+		this(count, System.currentTimeMillis());
+	}
+
+	public RandomWordSource closeDelay(long delayTimestamp) {
+		this.closeDelayTimestamp = delayTimestamp;
+		return this;
+	}
+
+	@Override
+	public void run(SourceContext<String> ctx) throws Exception {
+		while (isRunning) {
+			ctx.collectWithTimestamp(WORDS[random.nextInt(WORDS.length)], initialTimestamp + 1000 * number);
+			number++;
+			if (number >= this.count) {
+				cancel();
+			}
+		}
+	}
+
+	@Override
+	public void cancel() {
+		this.isRunning = false;
+		try {
+			Thread.sleep(this.closeDelayTimestamp);
+		} catch (InterruptedException e) {
+			// ignored
+		}
+	}
+}
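Editor's note: one caveat worth flagging for reviewers — RandomWordSource assigns element timestamps via collectWithTimestamp but never emits watermarks, so event-time operators fed by it would not fire on their own. A hypothetical variant that also advances event time (a sketch, not part of this patch):

```java
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;

public class TimestampedWordSource implements SourceFunction<String> {
	private volatile boolean isRunning = true;

	@Override
	public void run(SourceContext<String> ctx) throws Exception {
		long eventTime = System.currentTimeMillis();
		while (isRunning) {
			ctx.collectWithTimestamp("word", eventTime);
			ctx.emitWatermark(new Watermark(eventTime)); // let event time advance
			eventTime += 1000;
		}
	}

	@Override
	public void cancel() {
		isRunning = false;
	}
}
```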
diff --git a/src/test/java/org/apache/flink/contrib/siddhi/utils/SiddhiTupleFactoryTest.java b/src/test/java/org/apache/flink/contrib/siddhi/utils/SiddhiTupleFactoryTest.java
new file mode 100644
index 0000000..3279e57
--- /dev/null
+++ b/src/test/java/org/apache/flink/contrib/siddhi/utils/SiddhiTupleFactoryTest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.siddhi.utils;
+
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.junit.Test;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+public class SiddhiTupleFactoryTest {
+	@Test
+	public void testConvertObjectArrayToTuple() {
+		Object[] row = new Object[]{1, "message", 1234567L, true, new Object()};
+		Tuple5<Integer, String, Long, Boolean, Object> tuple5 = SiddhiTupleFactory.newTuple(row);
+		assertEquals(5, tuple5.getArity());
+		assertArrayEquals(row, new Object[]{
+			tuple5.f0,
+			tuple5.f1,
+			tuple5.f2,
+			tuple5.f3,
+			tuple5.f4
+		});
+	}
+
+	@Test(expected = IllegalArgumentException.class)
+	public void testConvertTooLongObjectArrayToTuple() {
+		Object[] row = new Object[26];
+		SiddhiTupleFactory.newTuple(row);
+	}
+}
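Editor's note: the second test also documents a hard limit — Flink tuples top out at arity 25 (Tuple25), so a 26-element row cannot be represented. A sketch of the conversion the factory performs, inferring `newTuple`'s generic signature from the test above:

```java
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.contrib.siddhi.utils.SiddhiTupleFactory;

public class SiddhiTupleFactorySketch {
	public static void main(String[] args) {
		// newTuple selects the TupleN arity that matches the row length.
		Object[] row = new Object[]{1, "message", 1234567L};
		Tuple3<Integer, String, Long> tuple = SiddhiTupleFactory.newTuple(row);
		System.out.println(tuple); // (1,message,1234567)
	}
}
```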
diff --git a/src/test/java/org/apache/flink/contrib/siddhi/utils/SiddhiTypeFactoryTest.java b/src/test/java/org/apache/flink/contrib/siddhi/utils/SiddhiTypeFactoryTest.java
new file mode 100644
index 0000000..89bf50f
--- /dev/null
+++ b/src/test/java/org/apache/flink/contrib/siddhi/utils/SiddhiTypeFactoryTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.siddhi.utils;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.api.java.typeutils.TypeInfoParser;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class SiddhiTypeFactoryTest {
+	@Test
+	public void testTypeInfoParser() {
+		TypeInformation<Tuple3<String, Long, Object>> type1 = TypeInfoParser.parse("Tuple3<String,Long,java.lang.Object>");
+		Assert.assertNotNull(type1);
+		TypeInformation<Tuple4<String, Long, Object, InnerPojo>> type2 = TypeInfoParser.parse("Tuple4<" + String.class.getName() + ", " + Long.class.getName() + ", " + java.lang.Object.class.getName() + "," + InnerPojo.class.getName() + ">");
+		Assert.assertNotNull(type2);
+	}
+
+	public static class InnerPojo {
+	}
+
+	@Test
+	public void testBuildTypeInformationForSiddhiStream() {
+		String query = "define stream inputStream (timestamp long, name string, value double);"
+			+ "from inputStream select name, value insert into outputStream;";
+		TypeInformation<Tuple3<Long, String, Double>> inputStreamType = SiddhiTypeFactory.getTupleTypeInformation(query, "inputStream");
+		TypeInformation<Tuple2<String, Double>> outputStreamType = SiddhiTypeFactory.getTupleTypeInformation(query, "outputStream");
+
+		Assert.assertNotNull(inputStreamType);
+		Assert.assertNotNull(outputStreamType);
+	}
+}
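Editor's note: these tests rely on SiddhiTypeFactory parsing the Siddhi execution plan and mapping each stream's attribute types (long, string, double, ...) onto a Flink tuple type. A sketch of the output side, grounded in the query used above:

```java
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.contrib.siddhi.utils.SiddhiTypeFactory;

public class SiddhiTypeFactorySketch {
	public static void main(String[] args) {
		String query = "define stream inputStream (timestamp long, name string, value double);"
			+ "from inputStream select name, value insert into outputStream;";

		// outputStream carries (name string, value double) -> Tuple2<String, Double>.
		TypeInformation<Tuple2<String, Double>> outputType =
			SiddhiTypeFactory.getTupleTypeInformation(query, "outputStream");
		System.out.println(outputType);
	}
}
```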
diff --git a/src/test/resources/log4j-test.properties b/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000..2226f68
--- /dev/null
+++ b/src/test/resources/log4j-test.properties
@@ -0,0 +1,27 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#

+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+log4j.rootLogger=OFF, testlogger

+# testlogger is set to be a ConsoleAppender.
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target = System.err
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
diff --git a/src/test/resources/logback-test.xml b/src/test/resources/logback-test.xml
new file mode 100644
index 0000000..b7a5793
--- /dev/null
+++ b/src/test/resources/logback-test.xml
@@ -0,0 +1,34 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements. See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership. The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License. You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->

+<configuration>
+    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+        <encoder>
+            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern>
+        </encoder>
+    </appender>

+    <root level="WARN">
+        <appender-ref ref="STDOUT"/>
+    </root>
+</configuration>
diff --git a/tools/maven/checkstyle.xml b/tools/maven/checkstyle.xml
new file mode 100644
index 0000000..b8f1b1a
--- /dev/null
+++ b/tools/maven/checkstyle.xml
@@ -0,0 +1,159 @@
diff --git a/tools/maven/scalastyle-config.xml b/tools/maven/scalastyle-config.xml
new file mode 100644
index 0000000..0f7f6bb
--- /dev/null
+++ b/tools/maven/scalastyle-config.xml
@@ -0,0 +1,146 @@
+ <name>Scalastyle standard configuration</name>
diff --git a/tools/maven/suppressions.xml b/tools/maven/suppressions.xml
new file mode 100644
index 0000000..2c29054
--- /dev/null
+++ b/tools/maven/suppressions.xml
@@ -0,0 +1,28 @@