Skip to content

Commit

Permalink
Rename DSL sql to cql for Siddhi Streaming QL
Browse files Browse the repository at this point in the history
  • Loading branch information
haoch committed Oct 10, 2016
1 parent 0ed66c3 commit ebde425
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 18 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ This project is mainly to provide a light-weight library to easily run Siddhi CE
DataStream<Tuple5<Integer,String,Integer,String,Double>> output = cep
.from("inputStream1").union("inputStream2")
.sql(
.cql(
"from every s1 = inputStream1[id == 2] "
+ " -> s2 = inputStream2[id == 3] "
+ "select s1.id as id_1, s1.name as name_1, s2.id as id_2, s2.name as name_2 , custom:plus(s1.price,s2.price) as price"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,10 +98,12 @@ public ExecutableStream(SiddhiCEP environment) {
}

/**
* Siddhi Continuous Query Language (CQL)
*
* @param executionPlan Siddhi SQL-Like execution plan query
* @return ExecutionSiddhiStream context
*/
public ExecutionSiddhiStream sql(String executionPlan) {
public ExecutionSiddhiStream cql(String executionPlan) {
Preconditions.checkNotNull(executionPlan,"executionPlan");
return new ExecutionSiddhiStream(this.toDataStream(), executionPlan, getCepEnvironment());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public List<String> getInputStreams() {
}

/**
* @return Siddhi CEP sql-like execution plan
* @return Siddhi CEP cql-like execution plan
*/
public String getExecutionPlan() {
return executionPlan;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@
*
* DataStream&lt;Tuple4&lt;Integer,String,Integer,String&gt;&gt; output = cep
* .from("inputStream1").union("inputStream2")
* .sql(
* .cql(
* "from every s1 = inputStream1[id == 2] "
* + " -> s2 = inputStream2[id == 3] "
* + "select s1.id as id_1, s1.name as name_1, s2.id as id_2, s2.name as name_2 "
Expand Down
28 changes: 14 additions & 14 deletions src/test/java/org/apache/flink/contrib/siddhi/SiddhiCEPITCase.java
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public void testSimplePojoStreamAndReturnPojo() throws Exception {

DataStream<Event> output = SiddhiCEP
.define("inputStream", input, "id", "name", "price")
.sql("from inputStream insert into outputStream")
.cql("from inputStream insert into outputStream")
.returns("outputStream", Event.class);
String path = tempFolder.newFile().toURI().toString();
output.writeAsText(path);
Expand All @@ -81,7 +81,7 @@ public void testUnboundedPojoSourceAndReturnTuple() throws Exception {

DataStream<Tuple4<Long, Integer, String, Double>> output = SiddhiCEP
.define("inputStream", input, "id", "name", "price", "timestamp")
.sql("from inputStream select timestamp, id, name, price insert into outputStream")
.cql("from inputStream select timestamp, id, name, price insert into outputStream")
.returns("outputStream");

DataStream<Integer> following = output.map(new MapFunction<Tuple4<Long, Integer, String, Double>, Integer>() {
Expand All @@ -103,7 +103,7 @@ public void testUnboundedTupleSourceAndReturnTuple() throws Exception {

DataStream<Tuple4<Long, Integer, String, Double>> output = SiddhiCEP
.define("inputStream", input, "id", "name", "price", "timestamp")
.sql("from inputStream select timestamp, id, name, price insert into outputStream")
.cql("from inputStream select timestamp, id, name, price insert into outputStream")
.returns("outputStream");

String resultPath = tempFolder.newFile().toURI().toString();
Expand All @@ -119,7 +119,7 @@ public void testUnboundedPrimitiveTypeSourceAndReturnTuple() throws Exception {

DataStream<Tuple1<String>> output = SiddhiCEP
.define("wordStream", input, "words")
.sql("from wordStream select words insert into outputStream")
.cql("from wordStream select words insert into outputStream")
.returns("outputStream");

String resultPath = tempFolder.newFile().toURI().toString();
Expand All @@ -135,7 +135,7 @@ public void testUnboundedPojoSourceButReturnInvalidTupleType() throws Exception

DataStream<Tuple5<Long, Integer, String, Double, Long>> output = SiddhiCEP
.define("inputStream", input, "id", "name", "price", "timestamp")
.sql("from inputStream select timestamp, id, name, price insert into outputStream")
.cql("from inputStream select timestamp, id, name, price insert into outputStream")
.returns("outputStream");

DataStream<Long> following = output.map(new MapFunction<Tuple5<Long, Integer, String, Double, Long>, Long>() {
Expand All @@ -161,7 +161,7 @@ public void testUnboundedPojoStreamAndReturnMap() throws Exception {

DataStream<Map<String, Object>> output = SiddhiCEP
.define("inputStream", input, "id", "name", "price", "timestamp")
.sql("from inputStream select timestamp, id, name, price insert into outputStream")
.cql("from inputStream select timestamp, id, name, price insert into outputStream")
.returnAsMap("outputStream");

String resultPath = tempFolder.newFile().toURI().toString();
Expand All @@ -183,7 +183,7 @@ public long extractAscendingTimestamp(Event element) {

DataStream<Event> output = SiddhiCEP
.define("inputStream", input, "id", "name", "price", "timestamp")
.sql("from inputStream select timestamp, id, name, price insert into outputStream")
.cql("from inputStream select timestamp, id, name, price insert into outputStream")
.returns("outputStream", Event.class);

String resultPath = tempFolder.newFile().toURI().toString();
Expand All @@ -203,7 +203,7 @@ public void testMultipleUnboundedPojoStreamSimpleUnion() throws Exception {
.define("inputStream1", input1, "id", "name", "price", "timestamp")
.union("inputStream2", input2, "id", "name", "price", "timestamp")
.union("inputStream3", input3, "id", "name", "price", "timestamp")
.sql(
.cql(
"from inputStream1 select timestamp, id, name, price insert into outputStream;"
+ "from inputStream2 select timestamp, id, name, price insert into outputStream;"
+ "from inputStream3 select timestamp, id, name, price insert into outputStream;"
Expand All @@ -228,7 +228,7 @@ public void testMultipleUnboundedPojoStreamUnionAndJoinWithWindow() throws Excep
DataStream<? extends Map> output = SiddhiCEP
.define("inputStream1", input1.keyBy("id"), "id", "name", "price", "timestamp")
.union("inputStream2", input2.keyBy("id"), "id", "name", "price", "timestamp")
.sql(
.cql(
"from inputStream1#window.length(5) as s1 "
+ "join inputStream2#window.time(500) as s2 "
+ "on s1.id == s2.id "
Expand All @@ -255,7 +255,7 @@ public void testUnboundedPojoStreamSimplePatternMatch() throws Exception {
DataStream<Map<String, Object>> output = SiddhiCEP
.define("inputStream1", input1.keyBy("name"), "id", "name", "price", "timestamp")
.union("inputStream2", input2.keyBy("name"), "id", "name", "price", "timestamp")
.sql(
.cql(
"from every s1 = inputStream1[id == 2] "
+ " -> s2 = inputStream2[id == 3] "
+ "select s1.id as id_1, s1.name as name_1, s2.id as id_2, s2.name as name_2 "
Expand All @@ -280,7 +280,7 @@ public void testUnboundedPojoStreamSimpleSequences() throws Exception {
DataStream<Map<String, Object>> output = SiddhiCEP
.define("inputStream1", input1.keyBy("name"), "id", "name", "price", "timestamp")
.union("inputStream2", input1.keyBy("name"), "id", "name", "price", "timestamp")
.sql(
.cql(
"from every s1 = inputStream1[id == 2]+ , "
+ "s2 = inputStream2[id == 3]? "
+ "within 1000 second "
Expand Down Expand Up @@ -311,7 +311,7 @@ public void testCustomizeSiddhiFunctionExtension() throws Exception {

DataStream<Map<String, Object>> output = cep
.from("inputStream", input, "id", "name", "price", "timestamp")
.sql("from inputStream select timestamp, id, name, custom:plus(price,price) as doubled_price insert into outputStream")
.cql("from inputStream select timestamp, id, name, custom:plus(price,price) as doubled_price insert into outputStream")
.returnAsMap("outputStream");

String resultPath = tempFolder.newFile().toURI().toString();
Expand All @@ -334,7 +334,7 @@ public void testRegisterStreamAndExtensionWithSiddhiCEPEnvironment() throws Exce

DataStream<Tuple4<Long, String, Double, Double>> output = cep
.from("inputStream1").union("inputStream2")
.sql(
.cql(
"from inputStream1#window.length(5) as s1 "
+ "join inputStream2#window.time(500) as s2 "
+ "on s1.id == s2.id "
Expand All @@ -359,7 +359,7 @@ public void testTriggerUndefinedStreamException() throws Exception {

DataStream<Map<String, Object>> output = cep
.from("inputStream1").union("inputStream2")
.sql(
.cql(
"from inputStream1#window.length(5) as s1 "
+ "join inputStream2#window.time(500) as s2 "
+ "on s1.id == s2.id "
Expand Down

0 comments on commit ebde425

Please sign in to comment.