This repository has been archived by the owner on Oct 6, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathclass.queue.php
231 lines (199 loc) · 6.48 KB
/
class.queue.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
<?php
/**
 * Open a mysqli connection to the queue database.
 *
 * The connection settings are hard-coded below (local server, `root` user
 * with an empty password, schema `dbqueue`); adjust them for the target
 * environment.
 *
 * @return mysqli Open connection with the `dbqueue` schema selected.
 * @throws RuntimeException When the server cannot be reached or the schema
 *                          cannot be selected, instead of handing callers a
 *                          `false`/schema-less link that fails later.
 */
function conectarDB(){
    $dbhost = 'localhost';
    $dbuser = 'root';
    $dbpass = '';
    $dbbase = 'dbqueue';

    $link = mysqli_connect($dbhost, $dbuser, $dbpass);
    if (!$link) {
        throw new RuntimeException('Queue DB connection failed: ' . mysqli_connect_error());
    }

    if (!mysqli_select_db($link, $dbbase)) {
        // Capture the message before closing: it is tied to the link.
        $reason = mysqli_error($link);
        mysqli_close($link);
        throw new RuntimeException("Queue DB '{$dbbase}' could not be selected: " . $reason);
    }

    return $link;
}
/**
 * Database-backed job queue.
 *
 * Jobs are rows of the `queue` table holding a shell command. A dispatcher
 * calls process_queue() to pick the next due job and spawn a background PHP
 * process (the agent script) for it; the agent is expected to call run_job()
 * with the job id, which executes the command and stores its output.
 */
class queue_admin {
    /*
    Values of the `status` column.
    */
    const STATUS_PENDING = 1;     // enqueued, waiting to be picked
    const STATUS_PROCESSING = 2;  // command is being executed by run_job()
    const STATUS_FINISHED = 3;    // command has been executed
    const STATUS_DISPATCHED = 4;  // agent spawned, run_job() not started yet

    /*
    Maximum processes in parallel
    */
    public $max_process = 10;

    /*
    PHP_CLI
    Example
    cPanel: '/usr/bin/php'
    xampp Windows: 'C:\xampp\php\php.exe'
    vestacp: 'php'
    */
    private $php_cli = 'php';

    /*
    File responsible for processing the command
    */
    private $process_job_file = __DIR__ . '/queue_agent.php';

    /*
    Link DB
    */
    private $linkdb;

    /**
     * Connect to the database and create the `queue` table if it is missing.
     */
    public function __construct(){
        $this->linkdb = conectarDB();
        mysqli_query($this->linkdb, 'CREATE TABLE IF NOT EXISTS `queue` (
            `id` int(11) AUTO_INCREMENT PRIMARY KEY,
            `command` text,
            `message` text,
            `priority` int(5),
            `time_add_queue` int(11),
            `time_start_process` int(11),
            `time_finish_process` int(11),
            `time_wake_up` int(11),
            `status` int(11)
            ) ENGINE=InnoDB DEFAULT CHARSET=latin1;');
    }

    /**
     * Drop the current connection and open a fresh one.
     */
    private function reconnectdb(){
        mysqli_close($this->linkdb);
        $this->linkdb = conectarDB();
    }

    /**
     * Return a live connection, reconnecting first when the ping fails.
     *
     * Called before every query because a job command may run long enough
     * for the server to drop an idle connection.
     * NOTE(review): mysqli_ping() is reported as deprecated in recent PHP
     * releases — confirm against the target PHP version.
     *
     * @return mysqli
     */
    private function db(){
        if (!mysqli_ping($this->linkdb)) {
            $this->reconnectdb();
        }
        return $this->linkdb;
    }

    /**
     * Add a command to the queue in pending state.
     *
     * @param string $command      Shell command to execute later.
     * @param int    $time_wake_up Unix timestamp before which the job is not picked (0 = immediately).
     * @param int    $priority     Lower values are picked first.
     * @return int|string Auto-increment id of the inserted row, as returned by mysqli_insert_id().
     */
    public function enqueue($command, $time_wake_up = 0, $priority = 10){
        $db = $this->db();
        $command = mysqli_real_escape_string($db, $command);
        // Numeric columns: force integers so these values can never carry SQL.
        $time_wake_up = (int) $time_wake_up;
        $priority = (int) $priority;
        $now = time();
        $pending = self::STATUS_PENDING;

        $sql = "INSERT INTO queue (command, message, priority, time_add_queue, time_start_process, time_finish_process, time_wake_up, status) "
            . "VALUES ('{$command}', '', {$priority}, {$now}, 0, 0, {$time_wake_up}, {$pending})";
        mysqli_query($db, $sql);
        return mysqli_insert_id($db);
    }

    /**
     * Execute one job synchronously and record its output.
     *
     * Only jobs that are pending or dispatched are executed.
     *
     * @param int|string $id Job id.
     * @return bool True when the job was found and executed, false otherwise.
     */
    public function run_job($id){
        $id = (int) $id;
        $pending = self::STATUS_PENDING;
        $dispatched = self::STATUS_DISPATCHED;

        $sql = "SELECT * FROM queue WHERE id = {$id} AND (status = {$pending} OR status = {$dispatched}) LIMIT 1";
        $res = mysqli_query($this->db(), $sql);
        $job = $res ? mysqli_fetch_assoc($res) : null;
        if (empty($job)) {
            return false;
        }

        $this->mark_as_process($id);
        $output = $this->exec_command($job['command']);
        $this->set_message($id, $output);
        $this->mark_as_finish($id);
        return true;
    }

    /**
     * @return int Number of jobs currently being executed.
     */
    public function count_queue_processing(){
        return $this->count_rows(self::STATUS_PROCESSING);
    }

    /**
     * @return int Number of jobs waiting to be picked.
     */
    public function count_queue_pending(){
        return $this->count_rows(self::STATUS_PENDING);
    }

    /**
     * @return int Number of finished jobs.
     */
    public function count_queue_finish(){
        return $this->count_rows(self::STATUS_FINISHED);
    }

    /**
     * @return int Number of jobs in the table, whatever their status.
     */
    public function count_queue_total(){
        return $this->count_rows();
    }

    /**
     * Dispatch the next due job if a processing slot is free.
     */
    public function process_queue(){
        if ($this->can_process()) {
            $this->process_next();
        }
    }

    /**
     * Fetch every row of the queue table.
     *
     * @return array List of associative rows ordered by status (descending),
     *               then priority and id (ascending); empty when the query fails.
     */
    public function get_queue_db(){
        $res = mysqli_query($this->db(), "SELECT * FROM queue ORDER BY status DESC, priority ASC, id ASC");
        $rows = array();
        if (!$res) {
            return $rows;
        }
        while ($row = mysqli_fetch_assoc($res)) {
            $rows[] = $row;
        }
        return $rows;
    }

    /**
     * Delete all finished jobs.
     */
    public function delete_process_finish(){
        mysqli_query($this->db(), 'DELETE FROM queue WHERE status = ' . self::STATUS_FINISHED);
    }

    /**
     * Tell whether another process holds the dispatcher lock file.
     *
     * Tries a non-blocking exclusive lock on `dispatcher.lock` next to this
     * file; the lock is released again immediately when it could be taken.
     *
     * @return bool True when the lock is held elsewhere, or when the lock
     *              file cannot be opened (reported as "running" so that no
     *              second dispatcher is started on uncertain state).
     */
    public function is_dispatcher_run(){
        $handle = fopen(__DIR__.'/dispatcher.lock', 'w');
        if ($handle === false) {
            return true;
        }
        $acquired = flock($handle, LOCK_EX | LOCK_NB);
        if ($acquired) {
            flock($handle, LOCK_UN);
        }
        fclose($handle);
        return !$acquired;
    }

    /**
     * Spawn the agent script in the background for one job.
     *
     * @param int|string $id Job id.
     */
    private function process_job($id){
        $this->execInBackground($this->build_agent_command($id));
    }

    /**
     * Build the command line that starts the agent script for a job.
     *
     * The script path is shell-quoted so directories containing spaces work,
     * and the id is reduced to an integer so it cannot inject shell syntax.
     *
     * @param int|string $id Job id.
     * @return string
     */
    private function build_agent_command($id){
        return $this->php_cli . ' ' . escapeshellarg($this->process_job_file) . ' ' . (int) $id;
    }

    /**
     * Pick the next due pending job (lowest priority value, then lowest id),
     * flag it as dispatched and spawn its agent.
     */
    private function process_next(){
        $now = time();
        $pending = self::STATUS_PENDING;
        $sql = "SELECT * FROM queue WHERE status = {$pending} AND time_wake_up <= {$now} ORDER BY priority ASC, id ASC LIMIT 1";
        $res = mysqli_query($this->db(), $sql);
        $job = $res ? mysqli_fetch_assoc($res) : null;
        if (empty($job)) {
            return;
        }
        // Flag first so the next dispatcher pass does not pick the same row.
        $this->mark_as_pre_process($job['id']);
        $this->process_job($job['id']);
    }

    /**
     * @return bool True while fewer than $max_process jobs are being executed.
     */
    private function can_process(){
        // NOTE(review): jobs that are dispatched but not yet started are not
        // counted here, so a fast dispatcher loop can exceed $max_process.
        return $this->max_process - $this->count_queue_processing() > 0;
    }

    /**
     * Flag a job as processing and stamp its start time.
     */
    private function mark_as_process($id){
        $this->set_status($id, self::STATUS_PROCESSING);
        $this->set_start_time($id);
    }

    /**
     * Flag a job as dispatched (agent spawned, not started yet).
     */
    private function mark_as_pre_process($id){
        $this->set_status($id, self::STATUS_DISPATCHED);
    }

    /**
     * Flag a job as finished and stamp its finish time.
     */
    private function mark_as_finish($id){
        $this->set_status($id, self::STATUS_FINISHED);
        $this->set_finish_time($id);
    }

    /**
     * Store the current Unix time as the job's start time.
     */
    private function set_start_time($id){
        $id = (int) $id;
        $now = time();
        mysqli_query($this->db(), "UPDATE queue SET time_start_process = {$now} WHERE id = {$id} LIMIT 1");
    }

    /**
     * Store the current Unix time as the job's finish time.
     */
    private function set_finish_time($id){
        $id = (int) $id;
        $now = time();
        mysqli_query($this->db(), "UPDATE queue SET time_finish_process = {$now} WHERE id = {$id} LIMIT 1");
    }

    /**
     * Write a new status value for a job.
     */
    private function set_status($id, $status){
        $id = (int) $id;
        $status = (int) $status;
        mysqli_query($this->db(), "UPDATE queue SET status = {$status} WHERE id = {$id} LIMIT 1");
    }

    /**
     * Store the captured command output for a job.
     *
     * @param int|string  $id     Job id.
     * @param string|null $output Output of the command; null (no output or
     *                            failed execution) is stored as an empty string.
     */
    private function set_message($id, $output){
        $db = $this->db();
        $id = (int) $id;
        $output = mysqli_real_escape_string($db, (string) $output);
        mysqli_query($db, "UPDATE queue SET message = '{$output}' WHERE id = {$id} LIMIT 1");
    }

    /**
     * Count queue rows, optionally restricted to one status.
     *
     * @param int|null $status Status value to filter on, or null for all rows.
     * @return int Row count; 0 when the query fails.
     */
    private function count_rows($status = null){
        $sql = 'SELECT count(*) as total FROM queue';
        if ($status !== null) {
            $sql .= ' WHERE status = ' . (int) $status;
        }
        $res = mysqli_query($this->db(), $sql);
        $row = $res ? mysqli_fetch_assoc($res) : null;
        return $row ? (int) $row['total'] : 0;
    }

    /**
     * Start a command without waiting for it to finish.
     *
     * @param string $cmd Command line to launch.
     */
    private function execInBackground($cmd) {
        if (substr(php_uname(), 0, 7) === "Windows") {
            pclose(popen("start /B ". $cmd, "r"));
        } else {
            exec($cmd . " > /dev/null &");
        }
    }

    /**
     * Run a command through the shell and wait for it.
     *
     * @param string $cmd Command line to run.
     * @return string|null|false Whatever shell_exec() returns for the command.
     */
    private function exec_command($cmd){
        return shell_exec($cmd);
    }
}