-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcryptops.py
287 lines (233 loc) · 12.5 KB
/
cryptops.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
# Program for encryption & decryption cryptographic operations using a nCipher nShield HSM.
# Supports AES, 3DES, and RSA algorithms.
# Tested and validated on nCipher nShield HSM 5c.
# Developed by Armando Montero.
import argparse
import logging
import pkcs11
from pkcs11 import MGF, Mechanism, Attribute, ObjectClass
from time import sleep
# Set up logging
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", datefmt="%Y-%m-%d %H:%M:%S")
logger = logging.getLogger(program_name := "cryptops")
# Class for managing values
class KeyManager:
def __init__(self, lib_path, token_label, pin, key_label,input_path, output_path, algorithm, mechanism, iv):
self.lib_path = lib_path
self.token_label = token_label
self.pin = pin
self.key_label = key_label
self.input_path = input_path
self.output_path = output_path
self.algorithm = algorithm
self.mechanism = mechanism
self.iv = iv
self.session = self.open_session()
@staticmethod
def args():
parser = argparse.ArgumentParser(description="Encrypt or decrypt data using a nCipher nShield HSM",
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
epilog="Developed by Armando Montero",
prog="cryptops.py",
usage="python3 cryptops.py -e <encrypt>-l <lib_path> -t <token_label> -p <pin> -k <key_label> -i <input_path> -o <output_path> -a <algorithm> -iv <iv> ",
prefix_chars="-",
add_help=True,
allow_abbrev=True)
parser.add_argument("-l", "--lib_path", type=str, help="Path to the PKCS#11 library",
default="/opt/nfast/toolkits/pkcs11/libcknfast.so")
parser.add_argument("-t", "--token_label", type=str, help="Label of the token",
default="loadshared accelerator")
parser.add_argument("-p", "--pin", type=str, help="PIN of the token", default="123456")
parser.add_argument("-k", "--key_label", type=str, help="Label of the key")
parser.add_argument("-i", "--input_path", type=str, help="Path to the input file")
parser.add_argument("-o", "--output_path", type=str, help="Path to the output file")
parser.add_argument("-a", "--algorithm", type=str, help="Algorithm to use", default="AES")
parser.add_argument("-iv", "--iv", type=int, help="Initialization vector", default = 128)
parser.add_argument("-e", "--encrypt", action="store_true", help="Encrypt the data")
parser.add_argument("-d", "--decrypt", action="store_true", help="Decrypt the data")
args = parser.parse_args()
return args
# Session management.
# Open a session
def open_session(self):
lib = pkcs11.lib(self.lib_path)
token = lib.get_token(token_label=self.token_label)
session = token.open(rw=True, user_pin=self.pin)
return session
# Close a session
def close_session(self):
self.session.close()
# Encrypt data
def encrypt(self, args):
# Load the PKCS#11 library
lib = pkcs11.lib(self.lib_path)
logger.info(f"PKCS#11 library loaded successfully")
# Load the HSM token
token = lib.get_token(token_label=self.token_label)
logger.info(f"Token label: {self.token_label} loaded successfully")
sleep(1)
logger.info(f"Opening session with token: {self.token_label}")
sleep(1)
# Open a session
session = self.session = token.open(rw=True, user_pin=self.pin)
logger.info(f"Session opened successfully")
sleep(1)
# Find the key
if args.algorithm in ["AES", "3DES"]:
key = session.get_key(label=self.key_label)
logger.info(f"Secret key label: {self.key_label} found in token label: {self.token_label}")
sleep(3)
if args.algorithm == "RSA":
# Adjusted for RSA to correctly use the mechanism for public key retrieval
key_iter = session.get_objects({Attribute.CLASS: ObjectClass.PUBLIC_KEY, Attribute.LABEL: self.key_label})
key = list(key_iter) # Convert the SearchIter object to a list
logger.info(f'Public key label: {self.key_label} found in token label: {self.token_label}')
sleep(3)
# Generate a random IV
iv = session.generate_random(args.iv)
# Buffer size for chunked reading/writing
buffer_size = 8192
# Open input and output files with context managers
logger.info(f"Opening input file: {self.input_path} to encrypt...")
with open(self.input_path, "rb") as input_file, open(self.output_path, "wb") as output_file:
# Read, encrypt, and write in chunks for stream ciphers like AES and 3DES
while True:
chunk = input_file.read(buffer_size)
if not chunk:
break
if args.algorithm == "AES":
encrypted = key.encrypt(chunk, mechanism= Mechanism.AES_CBC_PAD, mechanism_param=iv)
logger.info("Encrypting data....")
sleep(8)
# Write the encrypted data to the output file
output_file.write(encrypted)
logger.info(f"Data encrypted using key label: {self.key_label} from token label: {self.token_label} successfully saved to {self.output_path}")
sleep(1)
if args.algorithm == "3DES":
# Encrypt the data using 3DES
encrypted = key.encrypt(chunk, mechanism= Mechanism.DES3_CBC_PAD, mechanism_param=iv)
logger.info("Encrypting data....")
sleep(8)
# Write the encrypted data to the output file
output_file.write(encrypted)
logger.info(f"Data encrypted using key label: {self.key_label} from token label: {self.token_label} successfully saved to {self.output_path}")
sleep(1)
if args.algorithm == "RSA":
# For RSA, typically the whole data is encrypted at once due to block size limits
data = input_file.read()
encrypted = key[0].encrypt(data, mechanism=Mechanism.RSA_PKCS_OAEP, mechanism_param=(Mechanism.SHA_1, MGF.SHA1, None))
logger.info("Encrypting data....")
sleep(8)
# Write the encrypted data to the output file
output_file.write(encrypted)
logger.info(f"Data encrypted using key label: {self.key_label} from token label: {self.token_label} successfully saved to {self.output_path}")
sleep(1)
# Close the session
self.close_session()
logger.info(f"Session successfully closed!")
def decrypt(self, args):
# Load the PKCS#11 library
lib = pkcs11.lib(self.lib_path)
logger.info(f"PKCS#11 library loaded successfully")
# Load the HSM token
token = lib.get_token(token_label=self.token_label)
logger.info(f"Token label: {self.token_label} loaded successfully")
sleep(1)
logger.info(f"Opening session with token: {self.token_label}")
sleep(1)
# Open a session
session = self.session = token.open(rw=True, user_pin=self.pin)
logger.info(f"Session opened successfully")
sleep(1)
# Find the key
if args.algorithm in ["AES", "3DES"]:
key = session.get_key(label=self.key_label)
logger.info(f"Secret key label: {self.key_label} found in token label: {self.token_label}")
sleep(3)
if args.algorithm == "RSA":
# Adjusted for RSA to correctly use the mechanism for private key retrieval
key_iter = session.get_objects({Attribute.CLASS: ObjectClass.PRIVATE_KEY, Attribute.LABEL: self.key_label})
key = list(key_iter) # Convert the SearchIter object to a list
logger.info(f'Public key label: {self.key_label} found in token label: {self.token_label}')
sleep(3)
# Read the IV
if args.algorithm == "AES":
iv = open(self.input_path, "rb").read(16) # 16 bytes for AES IV due to 128 bit block size
logger.info(f"AES algorithm detected. Reading IV from {self.input_path}...")
sleep(3)
if args.algorithm == "3DES": # 8 bytes for 3DES IV due to 64 bit block size
iv = open(self.input_path, "rb").read(8)
logger.info(f"3DES algorithm detected. Reading IV from {self.input_path}...")
sleep(3)
# Buffer size for chunked reading/writing
buffer_size = 8192
# Open input and output files with context managers
with open(self.input_path, "rb") as input_file, open(self.output_path, "wb") as output_file:
logger.info(f"Opening input file: {self.input_path} for decryption")
if args.algorithm == "AES":
# Read, decrypt, and write in chunks for stream ciphers like AES and 3DES
while True:
chunk = input_file.read(buffer_size)
if not chunk:
break
decrypted = key.decrypt(chunk, mechanism=Mechanism.AES_CBC_PAD, mechanism_param=iv)
output_file.write(decrypted)
logger.info("Decrypting data...")
sleep(8)
if args.algorithm == "3DES":
# Read, decrypt, and write in chunks for stream ciphers like AES and 3DES
while True:
chunk = input_file.read(buffer_size)
if not chunk:
break
decrypted = key.decrypt(chunk, mechanism=Mechanism.DES3_CBC_PAD, mechanism_param=iv)
output_file.write(decrypted)
logger.info("Decrypting data...")
sleep(8)
logger.info(f"Data successfully decrypted using key label: {self.key_label} from token label: {self.token_label} saved to {self.output_path}")
if args.algorithm == "RSA":
# For RSA, typically the whole data is encrypted at once due to block size limits
data = input_file.read()
decrypted = key[0].decrypt(data, mechanism=Mechanism.RSA_PKCS_OAEP, mechanism_param=(Mechanism.SHA_1, MGF.SHA1, None))
output_file.write(decrypted)
logger.info(f"Data decrypted using key label: {self.key_label} from token label: {self.token_label} successfully saved to {self.output_path}")
sleep(1)
# Close the session
self.close_session()
logger.info(f"Session successfully closed!")
# Main function
def main():
args = KeyManager.args() # Parse the arguments
km = KeyManager(lib_path=args.lib_path, token_label=args.token_label, pin=args.pin, key_label=args.key_label, input_path=args.input_path, output_path=args.output_path, algorithm=args.algorithm, mechanism=args.mechanism, iv=args.iv)
if args.encrypt:
km.encrypt(args)
if args.decrypt:
km.decrypt(args)
# Call to main
if __name__ == "__main__":
try:
main()
# Handle exceptions
except FileNotFoundError as e:
logger.error(f"File not found: {e}")
except pkcs11.exceptions.FunctionFailed:
logger.error("Function failed. Check the input parameters.")
except pkcs11.exceptions.MechanismInvalid:
logger.error("Invalid mechanism. Check the input parameters.")
except pkcs11.exceptions.MechanismParamInvalid:
logger.error("Invalid mechanism parameter. Check the input parameters.")
except pkcs11.exceptions.ObjectHandleInvalid:
logger.error("Object not found. Check the input parameters.")
except pkcs11.exceptions.PinExpired:
logger.error("PIN has expired. Change the pin or check the input parameters.")
except pkcs11.exceptions.PinInvalid:
logger.error("Invalid PIN.")
except pkcs11.exceptions.TokenNotPresent:
logger.error("Token not present. Check the input parameters.")
except pkcs11.exceptions.UserAlreadyLoggedIn:
logger.error("User already logged in. Reset the token/ session.")
except pkcs11.exceptions.UserNotLoggedIn:
logger.error("User not logged in.")
except pkcs11.exceptions.SessionHandleInvalid:
logger.error("Session handle invalid.")
exit(1)