-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathlambda_function.py
82 lines (61 loc) · 2.74 KB
/
lambda_function.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
import boto3
import logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)
class DLQ:
def __init__(self, queue_name):
self.sqs = boto3.resource('sqs')
self.dead_letter_queue_name = queue_name
def get_queue(self, queue_name):
queue = self.sqs.get_queue_by_name(
QueueName=queue_name
)
return queue
def send_messages_to_source_queue(self, retry_count, message_id, message_body):
source_queue_name = self.dead_letter_queue_name.replace('DLQ-', '')
logger.info('Source Queue: %s', source_queue_name)
to_send_retry_count = int(retry_count) + 1
self.get_queue(source_queue_name).send_messages(Entries=[
{
'Id': message_id,
'MessageBody': message_body,
'MessageAttributes': {
'retryCount': {
'StringValue': str(to_send_retry_count),
'DataType': 'String'
}
}
}
]
)
def delete_message_from_dlq(self, message_id, message_receipt_handle):
self.get_queue(self.dead_letter_queue_name).delete_messages(Entries=[
{
'Id': message_id,
'ReceiptHandle': message_receipt_handle
}
]
)
def lambda_handler(event, context):
logger.info('Event Body: \n' + str(event))
for i, msg in enumerate(event['Records']):
logger.info('Index:[%s], starting requeue...', str(i))
dlq_name = msg['eventSourceARN'].split(':')[5]
message_id = msg['messageId']
message_receipt_handle = msg['receiptHandle']
message_body = msg['body']
try:
retry_count = msg['messageAttributes']['retryCount']['stringValue']
except KeyError as e:
logger.warning('KeyError: %s, assign retry_count = 1', e)
retry_count = 1
if int(retry_count) > 3:
logger.warning('Index:[%s], Times:[%s], ID: %s. This task is retrying over 3 time. function end.', str(i), str(retry_count), str(message_id))
logger.info('Index:[%s], Deleteing message on DLQ...', str(i))
DLQ(dlq_name).delete_message_from_dlq(message_id, message_receipt_handle)
return 200
logger.info('Index:[%s], Sending message back to source queue...', str(i))
DLQ(dlq_name).send_messages_to_source_queue(retry_count, message_id, message_body)
logger.info('Index:[%s], Deleteing message on DLQ...', str(i))
DLQ(dlq_name).delete_message_from_dlq(message_id, message_receipt_handle)
return 200