-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsqlite_manager.py
executable file
·131 lines (104 loc) · 4.15 KB
/
sqlite_manager.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
import sqlite3
from contextlib import contextmanager
class ManageDb:
    """Thin convenience wrapper around a single SQLite database file.

    Every public method opens its own connection, runs its statement(s) in a
    single transaction and closes the connection again.

    Values are always bound through ``?`` placeholders.  Table names, column
    names and raw clauses (``where``, ``order_by``, column definitions) are
    interpolated into the SQL text because SQLite cannot bind identifiers, so
    they must come from trusted code and never from user input.
    """

    def __init__(self, db_name: str = "test"):
        """Remember the database file; ``".db"`` is appended to *db_name*."""
        self.db_name = db_name + ".db"

    @contextmanager
    def _transaction(self):
        """Yield an open connection, commit or roll back, then close it."""
        db = sqlite3.connect(self.db_name)
        try:
            # The connection's own context manager only ends the transaction
            # (commit on success, rollback on error); it does not close the
            # connection, hence the explicit close() below.
            with db:
                yield db
        finally:
            db.close()

    @staticmethod
    def _bind(value):
        """Return *value* in a form sqlite3 can bind to a ``?`` placeholder.

        Natively supported types pass through unchanged; anything else is
        stored as its ``str()`` form, as the previous string-built SQL did.
        """
        if value is None or isinstance(value, (int, float, str, bytes)):
            return value
        return str(value)

    @staticmethod
    def init_name(name):
        """Return *name* without single or double quotes if it is a ``str``.

        Non-string values are returned unchanged.  The class no longer needs
        this for its own queries (values are bound as parameters); it is kept
        for callers that use it directly.
        """
        if isinstance(name, str):
            return name.replace("'", "").replace('"', "")
        return name

    def create_table(self, table: dict):
        """Create every table described in *table* unless it already exists.

        Args:
            table: Mapping of table name to a mapping of column name to
                column definition, e.g.
                ``{"student": {"id": "integer primary key", "name": "TEXT"}}``.
        """
        with self._transaction() as db:
            cursor = db.cursor()
            for name, columns in table.items():
                column_defs = ", ".join(
                    f"{column} {definition}"
                    for column, definition in columns.items()
                )
                cursor.execute(
                    f"CREATE TABLE IF NOT EXISTS {name} ({column_defs})"
                )

    def select(self, column: str = "*", table: str = "sqlite_master",
               where: str = None, distinct: bool = False, order_by: str = None,
               limit: int = None, params=()):
        """Run a ``SELECT`` and return all rows as a list of tuples.

        Args:
            column: Column list / expression placed after ``SELECT``.
            table: Table to read from.
            where: Raw condition placed after ``WHERE``; may contain ``?``
                placeholders that are filled from *params*.
            distinct: Add ``DISTINCT`` when true.
            order_by: Raw expression placed after ``ORDER BY``.
            limit: Maximum number of rows; ``None`` means no limit and ``0``
                returns no rows.
            params: Values bound to the ``?`` placeholders in *where*.

        Returns:
            The result of ``cursor.fetchall()``.
        """
        parts = [f"SELECT {'DISTINCT ' if distinct else ''}{column} FROM {table}"]
        if where:
            parts.append(f"WHERE {where}")
        if order_by:
            parts.append(f"ORDER BY {order_by}")
        # Compare against None so that an explicit limit of 0 is honoured.
        if limit is not None:
            parts.append(f"LIMIT {limit}")
        with self._transaction() as db:
            return db.execute(" ".join(parts), params).fetchall()

    def insert(self, table: str, rows: dict):
        """Insert one row into *table* and return its row id.

        Args:
            table: Target table name.
            rows: Mapping of column name to value for the new row.  An empty
                mapping inserts a row made of the column defaults.

        Returns:
            ``cursor.lastrowid`` of the inserted row.
        """
        if rows:
            columns = ", ".join(rows)
            placeholders = ", ".join("?" * len(rows))
            sql = f"INSERT INTO {table} ({columns}) VALUES ({placeholders})"
        else:
            sql = f"INSERT INTO {table} DEFAULT VALUES"
        values = [self._bind(value) for value in rows.values()]
        with self._transaction() as db:
            rowid = db.execute(sql, values).lastrowid
        return rowid

    def delete(self, table: dict):
        """Delete rows matching a single ``column = value`` test per table.

        Args:
            table: Mapping of table name to a ``(column, value)`` pair, e.g.
                ``{"student": ["name", "amir"]}``.
        """
        with self._transaction() as db:
            cursor = db.cursor()
            for name, condition in table.items():
                cursor.execute(
                    f"DELETE FROM {name} WHERE {condition[0]}=?",
                    (self._bind(condition[1]),),
                )

    def advanced_delete(self, table):
        """Delete rows matching several ``column = value`` tests joined by AND.

        Args:
            table: Mapping of table name to an iterable of ``(column, value)``
                pairs, e.g. ``{"student": [("name", "amir"), ("age", 21)]}``.
        """
        with self._transaction() as db:
            cursor = db.cursor()
            for name, conditions in table.items():
                conditions = list(conditions)
                where = " AND ".join(f"{cond[0]} = ?" for cond in conditions)
                # Binding the values (instead of wrapping them in double
                # quotes) keeps SQLite from reading a value as a column name.
                values = [self._bind(cond[1]) for cond in conditions]
                cursor.execute(f"DELETE FROM {name} WHERE {where}", values)

    def drop_table(self, table: str):
        """Drop *table* if it exists."""
        with self._transaction() as db:
            db.execute(f"DROP TABLE IF EXISTS {table}")

    def update(self, table, where=None, params=()):
        """Update columns of the rows selected by *where*.

        Args:
            table: Mapping of table name to a mapping of column name to new
                value, e.g. ``{"student": {"age": 22}}``.
            where: Raw condition placed after ``WHERE``; may contain ``?``
                placeholders filled from *params*.  When empty or ``None``
                every row of the table is updated.
            params: Sequence of values bound to the placeholders in *where*.

        Returns:
            ``cursor.lastrowid`` as reported by sqlite3 after the last
            statement.  It is not a count of updated rows.
        """
        where_clause = f" WHERE {where}" if where else ""
        with self._transaction() as db:
            cursor = db.cursor()
            for name, changes in table.items():
                if not changes:
                    continue
                # All columns go into one statement so that a WHERE clause
                # referring to an updated column still matches for every
                # assignment.
                assignments = ", ".join(f"{column} = ?" for column in changes)
                values = [self._bind(value) for value in changes.values()]
                cursor.execute(
                    f"UPDATE {name} SET {assignments}{where_clause}",
                    [*values, *params],
                )
            rowid = cursor.lastrowid
        return rowid

    def custom(self, order: str, return_fetcall=True, params=()):
        """Execute one arbitrary SQL statement.

        Args:
            order: The SQL statement to run.
            return_fetcall: When true, return ``cursor.fetchall()``;
                otherwise return ``None``.
            params: Values bound to the placeholders in *order*.
        """
        with self._transaction() as db:
            cursor = db.execute(order, params)
            rows = cursor.fetchall() if return_fetcall else None
        return rows

    def custom_multi(self, *order):
        """Execute several arbitrary SQL statements in one transaction."""
        with self._transaction() as db:
            cursor = db.cursor()
            for statement in order:
                cursor.execute(statement)
# t = {
# "student": {
# "id": "integer primary key",
# "name": "TEXT",
# "family": "TEXT",
# "age": "INTEGER"
# },
# "teacher": {
# "name": "TEXT",
# "family": "TEXT",
# "age": "INTEGER"
# }
# }
# a = ManageDb()
# a.create_table(t)
# print(a.custom("SELECT name from sqlite_master where type='table'"))
# a.insert(table='student', rows={'name': 'amir', 'family': 'najafi', 'age': 21})
# a.insert(table='student', rows={'name': 'fsd', 'family': 'sfd', 'age': 34})
# a.delete({'student': ['name', 'amir']})
# a.drop_table('teacher')
# print(a.select(table='student', order_by='name'))
# print(a.select(table='student'))