Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
aomegax committed Aug 10, 2023
1 parent 4ebebfb commit 967b6f0
Showing 1 changed file with 17 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,19 @@
import java.util.zip.DeflaterOutputStream;

/**
* Azure Functions with Azure Queue trigger.
* Azure Functions with Azure Event Hub trigger.
*/
public class NodoReEventToDataStore {
/**
* This function will be invoked when an Event Hub trigger occurs
*/

private Pattern replaceDashPattern = Pattern.compile("-([a-zA-Z])");
private static String NA = "NA";
private static String idField = "uniqueId";
private static String tableName = System.getenv("TABLE_STORAGE_TABLE_NAME");
// private static String tableName = System.getenv("TABLE_STORAGE_TABLE_NAME");
private static String insertedTimestamp = "insertedTimestamp";
private static String insertedDate = "insertedDate";
private static String partitionKey = "PartitionKey";
private static String payloadField = "payload";

Expand Down Expand Up @@ -87,7 +89,7 @@ private String replaceDashWithUppercase(String input) {
return sb.toString();
}

private void zipPayload(Logger logger,Map<String,Object> reEvent){
private void zipPayload(Logger logger,Map<String,Object> reEvent) {
if(reEvent.get(payloadField)!=null){
try {
byte[] data = ((String)reEvent.get(payloadField)).getBytes(StandardCharsets.UTF_8);
Expand Down Expand Up @@ -124,7 +126,7 @@ public void processNodoReEvent (
Logger logger = context.getLogger();

// TableClient tableClient = getTableServiceClient().getTableClient(tableName);
String msg = String.format("Persisting %d events",reEvents.size());
String msg = String.format("Persisting %d events", reEvents.size());
logger.info(msg);
try {
if (reEvents.size() == properties.length) {
Expand All @@ -134,23 +136,23 @@ public void processNodoReEvent (
for(int index=0; index< properties.length; index++) {
// logger.info("processing "+(index+1)+" of "+properties.length);
final Map<String,Object> reEvent = ObjectMapperUtils.readValue(reEvents.get(index), Map.class);
properties[index].forEach((p,v)->{
properties[index].forEach((p,v) -> {
String s = replaceDashWithUppercase(p);
reEvent.put(s,v);
reEvent.put(s, v);
});
reEvent.put("timestamp", ZonedDateTime.now().toInstant().toEpochMilli());

// String partitionKeyValue = reEvent.get(insertedTimestamp) != null ? ((String)reEvent.get(insertedTimestamp)).substring(0,13) : "NA";
reEvent.put("id", reEvent.get("uniqueId"));

String partitionKeyValue = reEvent.get(insertedTimestamp) != null ? ((String)reEvent.get(insertedTimestamp)).substring(0,10) : "NA";
String idDominio = reEvent.get("idDominio") != null ? reEvent.get("idDominio").toString() : "NA";
String idPsp = reEvent.get("psp") != null ? reEvent.get("psp").toString() : "NA";
partitionKeyValue += "-" + idDominio + "-" + idPsp;
String insertedDateValue = reEvent.get(insertedTimestamp) != null ? ((String)reEvent.get(insertedTimestamp)).substring(0, 10) : NA;
reEvent.put(insertedDate, insertedDateValue);

String idDominio = reEvent.get("idDominio") != null ? reEvent.get("idDominio").toString() : NA;
String idPsp = reEvent.get("psp") != null ? reEvent.get("psp").toString() : NA;
String partitionKeyValue = insertedDateValue + "-" + idDominio + "-" + idPsp;
reEvent.put(partitionKey, partitionKeyValue);
reEvent.put("id", reEvent.get("uniqueId"));

zipPayload(logger,reEvent);
// zipPayload(logger,reEvent);
reEvent.put(payloadField, null);

// addToBatch(logger,partitionEvents,reEvent);
eventsToPersistCosmos.add(reEvent);
Expand All @@ -165,12 +167,9 @@ public void processNodoReEvent (
// });

try {
long start = ZonedDateTime.now().toInstant().toEpochMilli();
// collection.insertMany(eventsToPersistCosmos);
documentdb.setValue(eventsToPersistCosmos);
logger.info("written " + (ZonedDateTime.now().toInstant().toEpochMilli() - start));
} catch (Throwable t){
logger.severe("Could not save on cosmos "+eventsToPersistCosmos.size()+",error:"+ t.toString());
logger.severe("Could not save on cosmos "+eventsToPersistCosmos.size()+", error:"+ t.toString());
}

logger.info("Done processing events");
Expand Down

0 comments on commit 967b6f0

Please sign in to comment.