Skip to content

Commit

Permalink
Compatiable with deprecated StreamCheckpointedOperator for state snap…
Browse files Browse the repository at this point in the history
…stho/restore
  • Loading branch information
haoch committed Oct 10, 2016
1 parent c79163f commit 0ed66c3
Showing 1 changed file with 10 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,14 @@
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.StreamCheckpointedOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
Expand All @@ -48,6 +51,7 @@
import java.util.HashMap;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.concurrent.RunnableFuture;

/**
* <h1>Siddhi Runtime Operator</h1>
Expand Down Expand Up @@ -79,7 +83,7 @@
* @param <IN> Input Element Type
* @param <OUT> Output Element Type
*/
public abstract class AbstractSiddhiOperator<IN, OUT> extends AbstractStreamOperator<OUT> implements OneInputStreamOperator<IN, OUT> {
public abstract class AbstractSiddhiOperator<IN, OUT> extends AbstractStreamOperator<OUT> implements OneInputStreamOperator<IN, OUT>, StreamCheckpointedOperator {
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractSiddhiOperator.class);
private static final int INITIAL_PRIORITY_QUEUE_CAPACITY = 11;

Expand Down Expand Up @@ -263,7 +267,6 @@ public void dispose() throws Exception {

@Override
public void snapshotState(FSDataOutputStream out, long checkpointId, long timestamp) throws Exception {
super.snapshotState(out, checkpointId, timestamp);
final ObjectOutputStream oos = new ObjectOutputStream(out);

// Write siddhi snapshot
Expand All @@ -278,9 +281,13 @@ public void snapshotState(FSDataOutputStream out, long checkpointId, long timest
oos.flush();
}

@Override
public RunnableFuture<OperatorStateHandle> snapshotState(long checkpointId, long timestamp, CheckpointStreamFactory streamFactory) throws Exception {
return super.snapshotState(checkpointId, timestamp, streamFactory);
}

@Override
public void restoreState(FSDataInputStream state) throws Exception {
super.restoreState(state);
final ObjectInputStream ois = new ObjectInputStream(state);

// Restore siddhi snapshot
Expand Down

0 comments on commit 0ed66c3

Please sign in to comment.