Skip to content

Commit

Permalink
Merge pull request #852 from emptyarrayhq/integration
Browse files Browse the repository at this point in the history
EMP-17 int gmail and x integrations
  • Loading branch information
sajdakabir authored Feb 19, 2025
2 parents da620ab + 936a9f6 commit a5d6f2c
Show file tree
Hide file tree
Showing 11 changed files with 337 additions and 1 deletion.
1 change: 1 addition & 0 deletions apps/backend/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
"nodemailer-sendgrid": "^1.0.3",
"path": "^0.12.7",
"twilio": "^5.3.0",
"twitter-api-v2": "^1.20.1",
"uuid": "^10.0.0",
"ws": "^8.18.0"
},
Expand Down
2 changes: 2 additions & 0 deletions apps/backend/src/controllers/integration/email.controller.js
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ import { Object } from "../../models/lib/object.model.js";

// Webhook to handle incoming push notifications from Gmail
const handlePushNotification = async (req, res) => {
console.log("req.body: ", req.body);
const message = Buffer.from(req.body.message.data, 'base64').toString('utf-8');
const parsedMessage = JSON.parse(message);
const userEmail = parsedMessage.emailAddress;
Expand Down Expand Up @@ -189,6 +190,7 @@ const handlePushNotification = async (req, res) => {
// }

const createIssueFromEmail = async (email, user) => {
console.log('Creating issue from email:', email);
const getEmailBody = (payload) => {
if (payload.parts) {
for (const part of payload.parts) {
Expand Down
95 changes: 95 additions & 0 deletions apps/backend/src/controllers/integration/x.controller.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
import { TwitterApi } from "twitter-api-v2";
import { XQueue } from "../../loaders/bullmq.loader.js";

const twitterClient = new TwitterApi({
clientId: process.env.X_CLIENT_ID,
clientSecret: process.env.X_CLIENT_SECRET
});

const CALLBACK_URL = process.env.X_CALLBACK_URL;

const tempOAuthStore = new Map();

export const redirectXOAuthLoginController = async (req, res) => {
try {
const { url, codeVerifier, state } = twitterClient.generateOAuth2AuthLink(CALLBACK_URL, {
scope: ["tweet.read", "users.read", "bookmark.read", "offline.access"]
});

tempOAuthStore.set(state, {
codeVerifier,
timestamp: Date.now()
});

console.log("Redirect URL:", url);

// Clean up expired entries
cleanupTempStore();
res.redirect(url);
} catch (error) {
console.error("Twitter OAuth Error:", error);
res.status(500).json({
message: "Error initiating Twitter OAuth",
error: error.message
});
}
};

export const getXAccessTokenController = async (req, res) => {
try {
const { state, code } = req.query;
const user = req.user;
const storedData = tempOAuthStore.get(state);

if (!storedData) {
console.error("No stored data found for state:", state);
throw new Error("Invalid or expired state parameter");
}

// Clean up stored data immediately after retrieving it
tempOAuthStore.delete(state);

const { client: loggedClient, accessToken, refreshToken } = await twitterClient.loginWithOAuth2({
code,
codeVerifier: storedData.codeVerifier,
redirectUri: CALLBACK_URL
});
user.integration.x.accessToken = accessToken;
user.integration.x.refreshToken = refreshToken;
user.integration.x.connected = true;
await user.save()

await XQueue.add(
"XQueue",
{
accessToken,
userId: user._id
}
);

res.json({
message: "Twitter authentication successful",
accessToken,
refreshToken
});
} catch (error) {
console.error("Error exchanging Twitter OAuth token:", error);
res.status(500).json({
message: "Error getting access token",
error: error.message
});
}
};

// cleanup function to remove expired entries (older than 5 minutes)
function cleanupTempStore () {
const fiveMinutes = 5 * 60 * 1000;
const now = Date.now();

for (const [state, data] of tempOAuthStore.entries()) {
if (now - data.timestamp > fiveMinutes) {
console.log("Removing expired state:", state);
tempOAuthStore.delete(state);
}
}
}
1 change: 1 addition & 0 deletions apps/backend/src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import { handleSmsItemCreation } from "./controllers/integration/message.control
import bodyParser from "body-parser";
import { linearWorker } from "./jobs/linear.job.js"
import { initWorker } from "./jobs/init.job.js";
import { XWorker } from "./jobs/x.job.js";

const { ValidationError } = Joi;
const app = express();
Expand Down
67 changes: 67 additions & 0 deletions apps/backend/src/jobs/x.job.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import { XQueue } from '../loaders/bullmq.loader.js';
import { Worker } from "bullmq";
import { redisConnection } from "../loaders/redis.loader.js";
import { syncXBookmarks } from "../services/integration/x.service.js";

const processXJob = async (job) => {
const { accessToken, userId } = job.data;
console.log(`Processing x job with ${accessToken}`);
try {
await syncXBookmarks(accessToken, userId)
} catch (error) {
console.error('Error processing X job:', error);
throw error;
}
};

/**
* Worker setup and event handling
*/
const XWorker = new Worker('XQueue', async (job) => {
console.log(`Processing job with id ${job.id}`);
await processXJob(job);
}, {
connection: redisConnection,
concurrency: 5
});

XWorker.on('active', (job) => {
console.log(`Processing job: ${job.id}`);
});

/**
* Event listener for job completion.
* Logs the completion and removes the job from the queue.
*
* @param {Object} job - The completed job object.
*/
XWorker.on('completed', async (job) => {
console.log(`Job with id ${job.id} has been completed`);
await job.remove();
});

/**
* Event listener for job failure.
* Logs the error message.
*
* @param {Object} job - The failed job object.
* @param {Error} err - The error that caused the job to fail.
*/
XWorker.on('failed', (job, err) => {
console.error(`Job with id ${job.id} failed with error: ${err.message}`);
});

/**
* Event listener for worker errors.
* Logs Redis connection errors.
*
* @param {Error} err - The error object.
*/
XWorker.on('error', (err) => {
console.error('Redis connection error in XWorker:', err);
});

export {
XQueue,
XWorker
};
7 changes: 6 additions & 1 deletion apps/backend/src/loaders/bullmq.loader.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,16 @@ const initQueue = new Queue('initQueue', {
connection: redisConnection
});

const XQueue = new Queue('XQueue', {
connection: redisConnection
});

console.log('Queues setup completed.');

export {
linearQueue,
notionQueue,
cycleQueue,
initQueue
initQueue,
XQueue
};
5 changes: 5 additions & 0 deletions apps/backend/src/models/core/user.model.js
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,11 @@ const UserSchema = new Schema({
userName: String,
connected: { type: Boolean, default: false }
},
x: {
accessToken: String,
refreshToken: String,
connected: { type: Boolean, default: false }
},
notion: {
accessToken: String,
userId: String,
Expand Down
2 changes: 2 additions & 0 deletions apps/backend/src/routers/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import CalenderRoute from "../routers/integration/calendar.route.js";
import EmailRoute from "../routers/integration/email.route.js";
import GithubRoute from "../routers/integration/github.route.js";
import NotionRoute from "../routers/integration/notion.route.js";
import XRoute from "../routers/integration/x.route.js";
import AiRoute from "../routers/ai/ai.route.js";

/**
Expand All @@ -24,6 +25,7 @@ const initRoutes = (app) => {
app.use('/gmail', JWTMiddleware, EmailRoute);
app.use('/github', JWTMiddleware, GithubRoute);
app.use('/notion', JWTMiddleware, NotionRoute);
app.use('/x', JWTMiddleware, XRoute);
app.use('/ai', JWTMiddleware, AiRoute);
app.get("/", async (req, res) => {
res.json({
Expand Down
9 changes: 9 additions & 0 deletions apps/backend/src/routers/integration/x.route.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import { Router } from "express";
import { redirectXOAuthLoginController, getXAccessTokenController } from "../../controllers/integration/x.controller.js";

const router = Router();

router.route("/connect/").get(redirectXOAuthLoginController);
router.route("/getAccessToken/").get(getXAccessTokenController);

export default router;
Loading

0 comments on commit a5d6f2c

Please sign in to comment.