Skip to content

Commit

Permalink
Add control stream
Browse files Browse the repository at this point in the history
  • Loading branch information
haoch committed Feb 9, 2018
1 parent a1d8c1d commit c15eb8a
Show file tree
Hide file tree
Showing 37 changed files with 2,684 additions and 2,160 deletions.
316 changes: 158 additions & 158 deletions src/main/java/org/apache/flink/streaming/siddhi/SiddhiCEP.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,183 +49,183 @@
*/
@PublicEvolving
public class SiddhiCEP {
private final StreamExecutionEnvironment executionEnvironment;
private final Map<String, DataStream<?>> dataStreams = new HashMap<>();
private final Map<String, SiddhiStreamSchema<?>> dataStreamSchemas = new HashMap<>();
private final Map<String, Class<?>> extensions = new HashMap<>();
private final StreamExecutionEnvironment executionEnvironment;
private final Map<String, DataStream<?>> dataStreams = new HashMap<>();
private final Map<String, SiddhiStreamSchema<?>> dataStreamSchemas = new HashMap<>();
private final Map<String, Class<?>> extensions = new HashMap<>();

/**
* @param streamExecutionEnvironment Stream Execution Environment
/**
* @param streamExecutionEnvironment Stream Execution Environment
*/
private SiddhiCEP(StreamExecutionEnvironment streamExecutionEnvironment) {
this.executionEnvironment = streamExecutionEnvironment;
}
private SiddhiCEP(StreamExecutionEnvironment streamExecutionEnvironment) {
this.executionEnvironment = streamExecutionEnvironment;
}

/**
* @see DataStream
* @return Siddhi streamId and source DataStream mapping.
/**
* @see DataStream
* @return Siddhi streamId and source DataStream mapping.
*/
public Map<String, DataStream<?>> getDataStreams() {
return this.dataStreams;
}
public Map<String, DataStream<?>> getDataStreams() {
return this.dataStreams;
}

/**
* @see SiddhiStreamSchema
* @return Siddhi streamId and stream schema mapping.
/**
* @see SiddhiStreamSchema
* @return Siddhi streamId and stream schema mapping.
*/
public Map<String, SiddhiStreamSchema<?>> getDataStreamSchemas() {
return this.dataStreamSchemas;
}
public Map<String, SiddhiStreamSchema<?>> getDataStreamSchemas() {
return this.dataStreamSchemas;
}

/**
* @param streamId Siddhi streamId to check.
* @return whether the given streamId is defined in current SiddhiCEP environment.
/**
* @param streamId Siddhi streamId to check.
* @return whether the given streamId is defined in current SiddhiCEP environment.
*/
public boolean isStreamDefined(String streamId) {
Preconditions.checkNotNull(streamId,"streamId");
return dataStreams.containsKey(streamId);
}
public boolean isStreamDefined(String streamId) {
Preconditions.checkNotNull(streamId,"streamId");
return dataStreams.containsKey(streamId);
}

/**
* @return Registered siddhi extensions.
/**
* @return Registered siddhi extensions.
*/
public Map<String, Class<?>> getExtensions() {
return this.extensions;
}

/**
* Check whether given streamId has been defined, if not, throw {@link UndefinedStreamException}
* @param streamId Siddhi streamId to check.
* @throws UndefinedStreamException throws if given streamId is not defined
public Map<String, Class<?>> getExtensions() {
return this.extensions;
}

/**
* Check whether given streamId has been defined, if not, throw {@link UndefinedStreamException}
* @param streamId Siddhi streamId to check.
* @throws UndefinedStreamException throws if given streamId is not defined
*/
public void checkStreamDefined(String streamId) throws UndefinedStreamException {
Preconditions.checkNotNull(streamId,"streamId");
if (!isStreamDefined(streamId)) {
throw new UndefinedStreamException("Stream (streamId: " + streamId + ") not defined");
}
}

/**
* Define siddhi stream with streamId, source <code>DataStream</code> and stream schema,
* and select as initial source stream to connect to siddhi operator.
*
* @param streamId Unique siddhi streamId
* @param dataStream DataStream to bind to the siddhi stream.
* @param fieldNames Siddhi stream schema field names
*
* @see #registerStream(String, DataStream, String...)
* @see #from(String)
*/
public static <T> SiddhiStream.SingleSiddhiStream<T> define(String streamId, DataStream<T> dataStream, String... fieldNames) {
Preconditions.checkNotNull(streamId,"streamId");
Preconditions.checkNotNull(dataStream,"dataStream");
Preconditions.checkNotNull(fieldNames,"fieldNames");
SiddhiCEP environment = SiddhiCEP.getSiddhiEnvironment(dataStream.getExecutionEnvironment());
return environment.from(streamId, dataStream, fieldNames);
}

/**
* Register stream with unique <code>streaId</code>, source <code>dataStream</code> and schema fields,
* and select the registered stream as initial stream to connect to Siddhi Runtime.
*
* @see #registerStream(String, DataStream, String...)
* @see #from(String)
public void checkStreamDefined(String streamId) throws UndefinedStreamException {
Preconditions.checkNotNull(streamId,"streamId");
if (!isStreamDefined(streamId)) {
throw new UndefinedStreamException("Stream (streamId: " + streamId + ") not defined");
}
}

/**
* Define siddhi stream with streamId, source <code>DataStream</code> and stream schema,
* and select as initial source stream to connect to siddhi operator.
*
* @param streamId Unique siddhi streamId
* @param dataStream DataStream to bind to the siddhi stream.
* @param fieldNames Siddhi stream schema field names
*
* @see #registerStream(String, DataStream, String...)
* @see #from(String)
*/
public <T> SiddhiStream.SingleSiddhiStream<T> from(String streamId, DataStream<T> dataStream, String... fieldNames) {
Preconditions.checkNotNull(streamId,"streamId");
Preconditions.checkNotNull(dataStream,"dataStream");
Preconditions.checkNotNull(fieldNames,"fieldNames");
this.registerStream(streamId, dataStream, fieldNames);
return new SiddhiStream.SingleSiddhiStream<>(streamId, this);
}

/**
* Select stream by streamId as initial stream to connect to Siddhi Runtime.
*
* @param streamId Siddhi Stream Name
* @param <T> Stream Generic Type
public static <T> SiddhiStream.SingleSiddhiStream<T> define(String streamId, DataStream<T> dataStream, String... fieldNames) {
Preconditions.checkNotNull(streamId,"streamId");
Preconditions.checkNotNull(dataStream,"dataStream");
Preconditions.checkNotNull(fieldNames,"fieldNames");
SiddhiCEP environment = SiddhiCEP.getSiddhiEnvironment(dataStream.getExecutionEnvironment());
return environment.from(streamId, dataStream, fieldNames);
}

/**
* Register stream with unique <code>streaId</code>, source <code>dataStream</code> and schema fields,
* and select the registered stream as initial stream to connect to Siddhi Runtime.
*
* @see #registerStream(String, DataStream, String...)
* @see #from(String)
*/
public <T> SiddhiStream.SingleSiddhiStream<T> from(String streamId) {
Preconditions.checkNotNull(streamId,"streamId");
return new SiddhiStream.SingleSiddhiStream<>(streamId, this);
}

/**
* Select one stream and union other streams by streamId to connect to Siddhi Stream Operator.
*
* @param firstStreamId First siddhi streamId, which should be predefined in SiddhiCEP context.
* @param unionStreamIds Other siddhi streamIds to union, which should be predefined in SiddhiCEP context.
*
public <T> SiddhiStream.SingleSiddhiStream<T> from(String streamId, DataStream<T> dataStream, String... fieldNames) {
Preconditions.checkNotNull(streamId,"streamId");
Preconditions.checkNotNull(dataStream,"dataStream");
Preconditions.checkNotNull(fieldNames,"fieldNames");
this.registerStream(streamId, dataStream, fieldNames);
return new SiddhiStream.SingleSiddhiStream<>(streamId, this);
}

/**
* Select stream by streamId as initial stream to connect to Siddhi Runtime.
*
* @param streamId Siddhi Stream Name
* @param <T> Stream Generic Type
*/
public <T> SiddhiStream.SingleSiddhiStream<T> from(String streamId) {
Preconditions.checkNotNull(streamId,"streamId");
return new SiddhiStream.SingleSiddhiStream<>(streamId, this);
}

/**
* Select one stream and union other streams by streamId to connect to Siddhi Stream Operator.
*
* @param firstStreamId First siddhi streamId, which should be predefined in SiddhiCEP context.
* @param unionStreamIds Other siddhi streamIds to union, which should be predefined in SiddhiCEP context.
*
* @return The UnionSiddhiStream Builder
*/
public <T> SiddhiStream.UnionSiddhiStream<T> union(String firstStreamId, String... unionStreamIds) {
Preconditions.checkNotNull(firstStreamId,"firstStreamId");
Preconditions.checkNotNull(unionStreamIds,"unionStreamIds");
return new SiddhiStream.SingleSiddhiStream<T>(firstStreamId, this).union(unionStreamIds);
}

/**
* Define siddhi stream with streamId, source <code>DataStream</code> and stream schema.
*
* @param streamId Unique siddhi streamId
* @param dataStream DataStream to bind to the siddhi stream.
* @param fieldNames Siddhi stream schema field names
public <T> SiddhiStream.UnionSiddhiStream<T> union(String firstStreamId, String... unionStreamIds) {
Preconditions.checkNotNull(firstStreamId,"firstStreamId");
Preconditions.checkNotNull(unionStreamIds,"unionStreamIds");
return new SiddhiStream.SingleSiddhiStream<T>(firstStreamId, this).union(unionStreamIds);
}

/**
* Define siddhi stream with streamId, source <code>DataStream</code> and stream schema.
*
* @param streamId Unique siddhi streamId
* @param dataStream DataStream to bind to the siddhi stream.
* @param fieldNames Siddhi stream schema field names
*/
public <T> void registerStream(final String streamId, DataStream<T> dataStream, String... fieldNames) {
Preconditions.checkNotNull(streamId,"streamId");
Preconditions.checkNotNull(dataStream,"dataStream");
Preconditions.checkNotNull(fieldNames,"fieldNames");
if (isStreamDefined(streamId)) {
throw new DuplicatedStreamException("Input stream: " + streamId + " already exists");
}
dataStreams.put(streamId, dataStream);
SiddhiStreamSchema<T> schema = new SiddhiStreamSchema<>(dataStream.getType(), fieldNames);
schema.setTypeSerializer(schema.getTypeInfo().createSerializer(dataStream.getExecutionConfig()));
dataStreamSchemas.put(streamId, schema);
}

/**
* @return Current StreamExecutionEnvironment.
public <T> void registerStream(final String streamId, DataStream<T> dataStream, String... fieldNames) {
Preconditions.checkNotNull(streamId,"streamId");
Preconditions.checkNotNull(dataStream,"dataStream");
Preconditions.checkNotNull(fieldNames,"fieldNames");
if (isStreamDefined(streamId)) {
throw new DuplicatedStreamException("Input stream: " + streamId + " already exists");
}
dataStreams.put(streamId, dataStream);
SiddhiStreamSchema<T> schema = new SiddhiStreamSchema<>(dataStream.getType(), fieldNames);
schema.setTypeSerializer(schema.getTypeInfo().createSerializer(dataStream.getExecutionConfig()));
dataStreamSchemas.put(streamId, schema);
}

/**
* @return Current StreamExecutionEnvironment.
*/
public StreamExecutionEnvironment getExecutionEnvironment() {
return executionEnvironment;
}

/**
* Register Siddhi CEP Extensions
*
* @see <a href="https://docs.wso2.com/display/CEP310/Writing+Extensions+to+Siddhi">https://docs.wso2.com/display/CEP310/Writing+Extensions+to+Siddhi</a>
* @param extensionName Unique siddhi extension name
* @param extensionClass Siddhi Extension class
public StreamExecutionEnvironment getExecutionEnvironment() {
return executionEnvironment;
}

/**
* Register Siddhi CEP Extensions
*
* @see <a href="https://docs.wso2.com/display/CEP310/Writing+Extensions+to+Siddhi">https://docs.wso2.com/display/CEP310/Writing+Extensions+to+Siddhi</a>
* @param extensionName Unique siddhi extension name
* @param extensionClass Siddhi Extension class
*/
public void registerExtension(String extensionName, Class<?> extensionClass) {
if (extensions.containsKey(extensionName)) {
throw new IllegalArgumentException("Extension named " + extensionName + " already registered");
}
extensions.put(extensionName, extensionClass);
}

/**
* Get registered source DataStream with Siddhi streamId.
*
* @param streamId Siddhi streamId
public void registerExtension(String extensionName, Class<?> extensionClass) {
if (extensions.containsKey(extensionName)) {
throw new IllegalArgumentException("Extension named " + extensionName + " already registered");
}
extensions.put(extensionName, extensionClass);
}

/**
* Get registered source DataStream with Siddhi streamId.
*
* @param streamId Siddhi streamId
* @return The source DataStream registered with Siddhi streamId
*/
public <T> DataStream<T> getDataStream(String streamId) {
if (this.dataStreams.containsKey(streamId)) {
return (DataStream<T>) this.dataStreams.get(streamId);
} else {
throw new UndefinedStreamException("Undefined stream " + streamId);
}
}

/**
* Create new SiddhiCEP instance.
*
* @param streamExecutionEnvironment StreamExecutionEnvironment
* @return New SiddhiCEP instance.
public <T> DataStream<T> getDataStream(String streamId) {
if (this.dataStreams.containsKey(streamId)) {
return (DataStream<T>) this.dataStreams.get(streamId);
} else {
throw new UndefinedStreamException("Undefined stream " + streamId);
}
}

/**
* Create new SiddhiCEP instance.
*
* @param streamExecutionEnvironment StreamExecutionEnvironment
* @return New SiddhiCEP instance.
*/
public static SiddhiCEP getSiddhiEnvironment(StreamExecutionEnvironment streamExecutionEnvironment) {
return new SiddhiCEP(streamExecutionEnvironment);
}
public static SiddhiCEP getSiddhiEnvironment(StreamExecutionEnvironment streamExecutionEnvironment) {
return new SiddhiCEP(streamExecutionEnvironment);
}
}
Loading

0 comments on commit c15eb8a

Please sign in to comment.