-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdbtools.py
68 lines (47 loc) · 2.24 KB
/
dbtools.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
import inspect
def orm(cursor, dto_type):
    """Build one ``dto_type`` instance per row fetched from ``cursor``.

    Every constructor parameter of ``dto_type`` (except ``self``) is matched
    by name to a column of the executed query, and that column's value is
    passed to the constructor positionally, in parameter order. Columns that
    match no constructor parameter are ignored.

    Args:
        cursor: Cursor on which a query has already been executed; it must
            provide ``description`` and ``fetchall()``.
        dto_type: Class whose ``__init__`` parameter names equal the names
            of the columns to load.

    Returns:
        A list of ``dto_type`` instances, one per fetched row, in the order
        the rows were fetched.

    Raises:
        ValueError: If a constructor parameter has no column of the same
            name in the result set.
    """
    # Names of the constructor parameters. The first one is 'self'; it does
    # not correspond to any database field, so it is dropped.
    ctor_params = list(inspect.signature(dto_type.__init__).parameters)[1:]

    # Column name -> position inside each row. setdefault keeps the first
    # position when the result set repeats a column name.
    positions = {}
    for idx, column in enumerate(cursor.description):
        positions.setdefault(column[0], idx)

    # Fail with an actionable message instead of a bare lookup error.
    missing = [name for name in ctor_params if name not in positions]
    if missing:
        raise ValueError(
            'cannot map query result to {}: no column for constructor '
            'parameter(s) {}; available columns: {}'.format(
                dto_type.__name__, missing, list(positions)))

    # Row positions, ordered like the constructor parameters.
    col_mapping = [positions[name] for name in ctor_params]
    return [dto_type(*[row[idx] for idx in col_mapping])
            for row in cursor.fetchall()]
def row_map(row, col_mapping, dto_type):
    """Instantiate ``dto_type`` from a single result row.

    Args:
        row: Indexable sequence holding the values of one row.
        col_mapping: Row positions to read, listed in the order the
            constructor of ``dto_type`` expects its arguments.
        dto_type: Callable invoked with the selected values as positional
            arguments.

    Returns:
        Whatever ``dto_type`` returns for the selected values.
    """
    # Pick the values in constructor order and unpack them into the call.
    return dto_type(*(row[position] for position in col_mapping))
class Dao:
    """Small data-access object for the table backing one DTO class.

    The table name is the lower-cased DTO class name followed by ``'s'``.
    Column names are taken from the DTO's instance attributes (``insert``)
    or from keyword-argument names (``find`` / ``delete``).

    Table and column names are interpolated into the SQL text, so they must
    come from trusted code; only the values are bound through ``?``
    placeholders. No method of this class calls ``commit()``.
    """

    def __init__(self, dto_type, conn):
        """Bind the DAO to a DTO class and a database connection.

        Args:
            dto_type: DTO class; its name determines the table name.
            conn: Connection object providing ``cursor()``.
        """
        self._conn = conn
        self._dto_type = dto_type
        # dto_type is a class; its __name__ holds the class name as a string.
        self._table_name = dto_type.__name__.lower() + 's'

    def insert(self, dto_instance):
        """Insert ``dto_instance`` as one row.

        Every instance attribute of ``dto_instance`` becomes a column of the
        INSERT statement, with the attribute value bound as its parameter.
        """
        fields = vars(dto_instance)
        column_names = ','.join(fields)
        qmarks = ','.join(['?'] * len(fields))
        stmt = 'INSERT INTO {} ({}) VALUES ({})' \
            .format(self._table_name, column_names, qmarks)
        self._execute(stmt, list(fields.values()))

    def find_all(self):
        """Return every row of the table as a list of DTO instances."""
        return self._query('SELECT * FROM {}'.format(self._table_name))

    def find(self, **keyvals):
        """Return the rows whose columns equal all the given values.

        Args:
            **keyvals: ``column=value`` pairs combined with AND. When none
                are given, all rows are returned.

        Returns:
            A list of DTO instances built by ``orm``.
        """
        stmt = 'SELECT * FROM {}'.format(self._table_name)
        params = []
        # Without criteria there is nothing to put after WHERE; emitting the
        # keyword alone would be a syntax error.
        if keyvals:
            where, params = self._where_clause(keyvals)
            stmt += ' WHERE ' + where
        return self._query(stmt, params)

    def delete(self, **keyvals):
        """Delete the rows whose columns equal all the given values.

        Args:
            **keyvals: ``column=value`` pairs combined with AND; at least
                one is required.

        Raises:
            ValueError: If no criterion is given. An unfiltered delete is
                refused rather than sent to the database.
        """
        if not keyvals:
            raise ValueError(
                'delete() requires at least one column=value criterion')
        where, params = self._where_clause(keyvals)
        stmt = 'DELETE FROM {} WHERE {}'.format(self._table_name, where)
        self._execute(stmt, params)

    @staticmethod
    def _where_clause(keyvals):
        """Return ``(sql, params)`` for an AND-ed equality filter."""
        clause = ' AND '.join(col + '=?' for col in keyvals)
        return clause, list(keyvals.values())

    def _execute(self, stmt, params=()):
        """Run a statement that yields no rows, then close its cursor."""
        cursor = self._conn.cursor()
        try:
            cursor.execute(stmt, params)
        finally:
            cursor.close()

    def _query(self, stmt, params=()):
        """Run a SELECT and map its rows to DTOs, then close the cursor."""
        cursor = self._conn.cursor()
        try:
            cursor.execute(stmt, params)
            # orm() fetches all rows, so the cursor can be closed afterwards.
            return orm(cursor, self._dto_type)
        finally:
            cursor.close()