-
Notifications
You must be signed in to change notification settings - Fork 36
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Implement a DelegateHandler as a new module fixes #59
- Loading branch information
Showing
20 changed files
with
1,302 additions
and
7 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,8 @@ | ||
dependencies { | ||
compile project(':gateleen-core') | ||
compile project(':gateleen-validation') | ||
compile project(':gateleen-queue') | ||
compile "org.swisspush:redisques:$redisquesVersion" | ||
compile "io.vertx:vertx-core:$vertxVersion" | ||
compile "io.vertx:vertx-redis-client:$vertxVersion" | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,43 @@ | ||
<?xml version="1.0" encoding="UTF-8"?> | ||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> | ||
<parent> | ||
<artifactId>gateleen</artifactId> | ||
<groupId>org.swisspush.gateleen</groupId> | ||
<version>1.0.16-SNAPSHOT</version> | ||
<relativePath>..</relativePath> | ||
</parent> | ||
<modelVersion>4.0.0</modelVersion> | ||
|
||
<artifactId>gateleen-delegate</artifactId> | ||
|
||
<dependencies> | ||
<dependency> | ||
<groupId>org.swisspush.gateleen</groupId> | ||
<artifactId>gateleen-core</artifactId> | ||
<version>${project.version}</version> | ||
</dependency> | ||
<dependency> | ||
<groupId>org.swisspush.gateleen</groupId> | ||
<artifactId>gateleen-validation</artifactId> | ||
<version>${project.version}</version> | ||
</dependency> | ||
<dependency> | ||
<groupId>org.swisspush.gateleen</groupId> | ||
<artifactId>gateleen-queue</artifactId> | ||
<version>${project.version}</version> | ||
</dependency> | ||
<dependency> | ||
<groupId>org.swisspush</groupId> | ||
<artifactId>redisques</artifactId> | ||
<version>${redisques.version}</version> | ||
</dependency> | ||
<dependency> | ||
<groupId>io.vertx</groupId> | ||
<artifactId>vertx-core</artifactId> | ||
</dependency> | ||
<dependency> | ||
<groupId>io.vertx</groupId> | ||
<artifactId>vertx-redis-client</artifactId> | ||
</dependency> | ||
</dependencies> | ||
</project> |
208 changes: 208 additions & 0 deletions
208
gateleen-delegate/src/main/java/org/swisspush/gateleen/delegate/Delegate.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,208 @@ | ||
package org.swisspush.gateleen.delegate; | ||
|
||
import io.vertx.core.Handler; | ||
import io.vertx.core.MultiMap; | ||
import io.vertx.core.buffer.Buffer; | ||
import io.vertx.core.http.*; | ||
import io.vertx.core.json.JsonArray; | ||
import io.vertx.core.json.JsonObject; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
import org.swisspush.gateleen.core.util.StatusCode; | ||
import org.swisspush.gateleen.monitoring.MonitoringHandler; | ||
|
||
import java.util.List; | ||
import java.util.Set; | ||
import java.util.concurrent.atomic.AtomicInteger; | ||
import java.util.regex.Matcher; | ||
import java.util.regex.Pattern; | ||
|
||
/** | ||
* Represents a Delegate. | ||
* | ||
* @author /~https://github.com/ljucam [Mario Ljuca] | ||
*/ | ||
public class Delegate { | ||
private static final Logger LOG = LoggerFactory.getLogger(Delegate.class); | ||
private static final String HEADERS = "headers"; | ||
private static final String PAYLOAD = "payload"; | ||
private static final String URI = "uri"; | ||
private static final String METHOD = "method"; | ||
private static final int FIRST = 0; | ||
|
||
private final String name; | ||
private final HttpClient selfClient; | ||
private final MonitoringHandler monitoringHandler; | ||
private final Pattern pattern; | ||
private final Set<HttpMethod> methods; | ||
private final List<JsonObject> requests; | ||
|
||
/** | ||
* Creates a new instance of a Delegate. | ||
* @param monitoringHandler monitoringHandler | ||
* @param selfClient selfClient | ||
* @param name name of delegate | ||
* @param pattern pattern for the delegate | ||
* @param methods methods of the delegate | ||
* @param requests requests of the delegate | ||
*/ | ||
public Delegate(final MonitoringHandler monitoringHandler, final HttpClient selfClient, final String name, final Pattern pattern, final Set<HttpMethod> methods, final List<JsonObject> requests) { | ||
this.monitoringHandler = monitoringHandler; | ||
this.selfClient = selfClient; | ||
this.name = name; | ||
this.pattern = pattern; | ||
this.methods = methods; | ||
this.requests = requests; | ||
} | ||
|
||
/** | ||
* Returns the name of the delegate. | ||
* | ||
* @return name | ||
*/ | ||
public String getName() { | ||
return name; | ||
} | ||
|
||
/** | ||
* Handles the given request. | ||
* | ||
* @param request original request | ||
*/ | ||
public void handle(final HttpServerRequest request) { | ||
|
||
|
||
// is method handled? | ||
if ( methods.contains(request.method())) { | ||
final Handler<HttpClientResponse> handler = installDoneHandler(request); | ||
final JsonObject firstRequest = requests.get(FIRST); | ||
createRequest(request.uri(), firstRequest, handler); | ||
return; | ||
} | ||
|
||
// end response, if nothing matches | ||
request.response().end(); | ||
} | ||
|
||
/** | ||
* Prepares and fires a delegate request. <br> | ||
* The preparation includes the replacement of all groups | ||
* matching the given delegate pattern. Also the | ||
* request uri is adapted. | ||
* | ||
* @param uri original request | ||
* @param requestObject the delegate request object | ||
* @param doneHandler the done handler called as soon as the request is executed | ||
*/ | ||
private void createRequest(final String uri, final JsonObject requestObject, final Handler<HttpClientResponse> doneHandler) { | ||
// matcher to replace wildcards with matching groups | ||
final Matcher matcher = pattern.matcher(uri); | ||
|
||
// adapt the request uri if necessary | ||
final String requestUri = matcher.replaceAll(requestObject.getString(URI)); | ||
|
||
// get the string represantion of the payload object | ||
String payloadStr; | ||
try { | ||
payloadStr = requestObject.getString(PAYLOAD); | ||
} catch(ClassCastException e) { | ||
payloadStr = requestObject.getJsonObject(PAYLOAD).encode(); | ||
} | ||
|
||
// replacement of matching groups | ||
if(payloadStr != null) { | ||
payloadStr = matcher.replaceAll(payloadStr); | ||
} | ||
|
||
// headers of the delegate | ||
MultiMap headers = new CaseInsensitiveHeaders(); | ||
JsonArray headersArray = requestObject.getJsonArray(HEADERS); | ||
if ( headersArray != null ) { | ||
if (LOG.isTraceEnabled()) { | ||
LOG.trace("Request headers:"); | ||
} | ||
|
||
headersArray.forEach(header -> { | ||
if (LOG.isTraceEnabled()) { | ||
LOG.trace(" > Key [{}], Value [{}]", ((JsonArray) header).getString(0), ((JsonArray) header).getString(1) ); | ||
} | ||
|
||
headers.add(((JsonArray) header).getString(0),((JsonArray) header).getString(1)); | ||
|
||
}); | ||
} | ||
|
||
HttpClientRequest delegateRequest = selfClient.request(HttpMethod.valueOf(requestObject.getString(METHOD)), requestUri, doneHandler); | ||
delegateRequest.headers().setAll(headers); | ||
delegateRequest.exceptionHandler(exception -> LOG.warn("Delegate request {} failed: {}",requestUri , exception.getMessage())); | ||
delegateRequest.setTimeout(120000); // avoids blocking other requests | ||
delegateRequest.end(Buffer.buffer(payloadStr)); | ||
} | ||
|
||
/** | ||
* Creates a new DoneHandler for the given request. | ||
* | ||
* @param request the original request | ||
* @return a doneHandler | ||
*/ | ||
private Handler<HttpClientResponse> installDoneHandler(final HttpServerRequest request) { | ||
return new Handler<HttpClientResponse>() { | ||
private AtomicInteger currentIndex = new AtomicInteger(0); | ||
|
||
@Override | ||
public void handle(HttpClientResponse response) { | ||
if ( LOG.isTraceEnabled() ) { | ||
LOG.trace("Done handler - handle"); | ||
} | ||
|
||
// request was fine | ||
if ( response.statusCode() == StatusCode.OK.getStatusCode() || | ||
response.statusCode() == StatusCode.ACCEPTED.getStatusCode() ) { | ||
if ( LOG.isTraceEnabled() ) { | ||
LOG.trace("Done handler - OK"); | ||
} | ||
|
||
// is there another request? | ||
if ( currentIndex.incrementAndGet() < requests.size() ) { | ||
if ( LOG.isTraceEnabled() ) { | ||
LOG.trace("Done handler - calling next {}", currentIndex.get()); | ||
} | ||
|
||
final JsonObject delegateRequest = requests.get(currentIndex.get()); | ||
createRequest(request.uri(), delegateRequest, this ); | ||
} | ||
// if not, send corresponding respond | ||
else { | ||
if ( LOG.isTraceEnabled() ) { | ||
LOG.trace("Done handler - all requests done, create response"); | ||
} | ||
createResponse(request, response); | ||
} | ||
} | ||
// request failed | ||
else { | ||
if ( LOG.isTraceEnabled() ) { | ||
LOG.trace("Done handler - not 200/202, create response [{}]", response.statusCode()); | ||
} | ||
createResponse(request, response); | ||
} | ||
} | ||
}; | ||
} | ||
|
||
/** | ||
* Create a response. | ||
* | ||
* @param request original request | ||
* @param response a response | ||
*/ | ||
private void createResponse(final HttpServerRequest request, final HttpClientResponse response) { | ||
request.response().setStatusCode(response.statusCode()); | ||
request.response().setStatusMessage(response.statusMessage()); | ||
request.response().setChunked(true); | ||
request.response().headers().addAll(response.headers()); | ||
request.response().headers().remove("Content-Length"); | ||
response.handler(data -> request.response().write(data)); | ||
response.endHandler(v -> request.response().end()); | ||
} | ||
} |
114 changes: 114 additions & 0 deletions
114
gateleen-delegate/src/main/java/org/swisspush/gateleen/delegate/DelegateFactory.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,114 @@ | ||
package org.swisspush.gateleen.delegate; | ||
|
||
import io.vertx.core.buffer.Buffer; | ||
import io.vertx.core.http.HttpClient; | ||
import io.vertx.core.http.HttpMethod; | ||
import io.vertx.core.json.JsonObject; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
import org.swisspush.gateleen.core.util.StringUtils; | ||
import org.swisspush.gateleen.monitoring.MonitoringHandler; | ||
import org.swisspush.gateleen.validation.ValidationException; | ||
import org.swisspush.gateleen.validation.ValidationResult; | ||
import org.swisspush.gateleen.validation.Validator; | ||
|
||
import java.util.*; | ||
import java.util.regex.Pattern; | ||
|
||
/** | ||
* DelegateFactory is used to create delegate objects from their text representation. | ||
* | ||
* @author /~https://github.com/ljucam [Mario Ljuca] | ||
*/ | ||
public class DelegateFactory { | ||
private static final Logger LOG = LoggerFactory.getLogger(DelegateFactory.class); | ||
|
||
private static final String REQUESTS = "requests"; | ||
private static final String METHODS = "methods"; | ||
private static final String PATTERN = "pattern"; | ||
|
||
private final MonitoringHandler monitoringHandler; | ||
private final HttpClient selfClient; | ||
private final Map<String, Object> properties; | ||
private final String delegatesSchema; | ||
|
||
/** | ||
* Creates a new instance of the DelegateFactory. | ||
* | ||
* @param monitoringHandler | ||
* @param selfClient | ||
* @param properties | ||
* @param delegatesSchema | ||
*/ | ||
public DelegateFactory(final MonitoringHandler monitoringHandler, final HttpClient selfClient, final Map<String, Object> properties, final String delegatesSchema) { | ||
this.monitoringHandler = monitoringHandler; | ||
this.selfClient = selfClient; | ||
this.properties = properties; | ||
this.delegatesSchema = delegatesSchema; | ||
} | ||
|
||
/** | ||
* Tries to create a Delegate object out of the | ||
* buffer. | ||
* | ||
* | ||
* @param delegateName name of the delegate | ||
* @param buffer buffer of the delegate | ||
* @return a Delegate object | ||
*/ | ||
public Delegate parseDelegate(final String delegateName, final Buffer buffer) throws ValidationException { | ||
final String configString; | ||
|
||
// replace wildcard configs | ||
try { | ||
configString = StringUtils.replaceWildcardConfigs(buffer.toString("UTF-8"), properties); | ||
} catch(Exception e){ | ||
throw new ValidationException(e); | ||
} | ||
|
||
// validate json | ||
ValidationResult validationResult = Validator.validateStatic(Buffer.buffer(configString), delegatesSchema, LOG); | ||
if(!validationResult.isSuccess()){ | ||
throw new ValidationException(validationResult); | ||
} | ||
|
||
// everything is fine, create Delegate | ||
return createDelegate(delegateName, configString); | ||
} | ||
|
||
/** | ||
* Create the delegate out of the prepared string. | ||
* | ||
* @param delegateName name of the delegate | ||
* @param configString the string rep. of the delegate | ||
* @return the new delegate | ||
* @throws ValidationException | ||
*/ | ||
private Delegate createDelegate(final String delegateName, final String configString) throws ValidationException { | ||
JsonObject delegateObject = new JsonObject(configString); | ||
|
||
// methods of the delegate | ||
Set<HttpMethod> methods = new HashSet<>(); | ||
delegateObject.getJsonArray(METHODS).forEach( method -> methods.add(HttpMethod.valueOf( (String) method)) ); | ||
|
||
// pattern of the delegate | ||
Pattern pattern; | ||
try { | ||
pattern = Pattern.compile(delegateObject.getString(PATTERN)); | ||
} catch(Exception e) { | ||
throw new ValidationException("Could not parse pattern [" + delegateObject.getString(PATTERN)+ "] of delegate " + delegateName, e); | ||
} | ||
|
||
// requests of the delegate | ||
List<JsonObject> requests = new ArrayList<>(); | ||
for(int i = 0; i< delegateObject.getJsonArray(REQUESTS).size(); i++) { | ||
if ( LOG.isTraceEnabled() ) { | ||
LOG.trace("request of [{}] #: {}", delegateName, i ); | ||
} | ||
|
||
requests.add((JsonObject) delegateObject.getJsonArray(REQUESTS).getValue(i)); | ||
} | ||
|
||
return new Delegate(monitoringHandler, selfClient, delegateName, pattern, methods, requests ); | ||
} | ||
} |
Oops, something went wrong.