forked from All-Hands-AI/OpenHands
-
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathsandbox_checker.py
98 lines (88 loc) · 2.42 KB
/
sandbox_checker.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
import json
from time import sleep
import requests
import os
# Port of the local sandbox server: taken from the SANDBOX_PORT environment
# variable when it is set (as a string), otherwise the integer 39180.
sandbox_port = os.getenv('SANDBOX_PORT', 39180)
def run_ipython(code: str) -> dict:
    """Build the payload for a ``run_ipython`` action.

    Args:
        code: Text placed in the action's ``code`` argument.

    Returns:
        ``{'action': {'action': 'run_ipython', 'args': {'code': code}}}``.
    """
    return {
        'action': {
            'action': 'run_ipython',
            'args': {
                'code': code,
            },
        }
    }
def run(command: str, timeout: int = 1) -> dict:
    """Build the payload for a ``run`` action.

    Commands starting with ``!`` or ``%`` are handed to ``run_ipython``
    unchanged instead of being wrapped as a ``run`` action.

    Args:
        command: Command text to place in the action's ``command`` argument.
        timeout: Value stored under the action's ``timeout`` key. Defaults to
            1, the value that was previously hard-coded. It is not applied
            to the ``run_ipython`` path, which carries no timeout key.

    Returns:
        The action payload as a nested dict.
    """
    # str.startswith accepts a tuple of prefixes, so one call covers both.
    if command.startswith(('!', '%')):
        return run_ipython(command)
    return {
        'action': {
            'action': 'run',
            'args': {
                'command': command,
            },
            'timeout': timeout,
        }
    }
def read_file(path: str) -> dict:
    """Build the payload for a ``read`` action.

    Args:
        path: Value placed in the action's ``path`` argument.

    Returns:
        ``{'action': {'action': 'read', 'args': {'path': path}}}``.
    """
    return {
        'action': {
            'action': 'read',
            'args': {'path': path},
        }
    }
def write_file(path: str, content: str) -> dict:
    """Build the payload for a ``write`` action.

    Args:
        path: Value placed in the action's ``path`` argument.
        content: Value placed in the action's ``content`` argument.

    Returns:
        ``{'action': {'action': 'write', 'args': {'path': path,
        'content': content}}}``.
    """
    return {
        'action': {
            'action': 'write',
            'args': {'path': path, 'content': content},
        }
    }
def file_edit(**kwargs) -> dict:
    """Build the payload for an ``edit`` action.

    Args:
        **kwargs: Keyword arguments forwarded verbatim as the action's
            ``args`` mapping (the script below passes ``path``, ``command``
            and ``file_text``).

    Returns:
        ``{'action': {'action': 'edit', 'args': kwargs}}``.
    """
    return {
        'action': {
            'action': 'edit',
            'args': kwargs,
        }
    }
def execute_action(data, timeout=5):
    """POST an action payload to the sandbox and print the reply.

    The payload is serialized with ``json.dumps`` and sent to
    ``http://localhost:{sandbox_port}/execute_action``. At most two attempts
    are made; the loop stops after the first reply that was parsed and
    printed. A timeout is reported only after the final attempt. Any other
    error is printed, together with whatever response was received, and the
    next attempt follows.

    Args:
        data: JSON-serializable payload, e.g. the dict returned by ``run``
            or ``read_file``.
        timeout: Forwarded to ``requests.post`` as its ``timeout`` argument.

    Returns:
        None. All output goes to stdout.
    """
    payload = json.dumps(data)
    url = f'http://localhost:{sandbox_port}/execute_action'
    max_attempts = 2
    for attempt in range(1, max_attempts + 1):
        # Reset on every attempt so the generic handler can tell whether this
        # attempt got far enough to produce a response object.
        response = None
        try:
            response = requests.post(url, data=payload, timeout=timeout)
            reply = response.json()
            print(reply['content'].replace('\r', ''))
            if reply.get('suffix'):
                print(reply['suffix'])
            break
        except requests.exceptions.Timeout:
            if attempt == max_attempts:
                print('Timeout')
        except Exception as e:
            print(e)
            if response is None:
                # The request itself failed (e.g. connection refused); there
                # is no body or status code to show.
                continue
            try:
                print(response.json())
            except ValueError:
                # The body is not JSON, which may be what raised above; show
                # the raw text instead of failing inside the handler.
                print(response.text)
            print(response.status_code)
# Script entry point: sends hand-picked actions to the sandbox via
# execute_action. Several alternative calls are kept commented out.
# NOTE(review): indentation was lost in this copy of the file, so the extent
# of the two `if 0:` bodies below cannot be determined from here; confirm
# against the upstream file before re-indenting.
if __name__ == '__main__':
# execute_action(run_ipython('%pwd'))
# execute_action(run_ipython('!which python'))
# `if 0:` is always false, so whatever is nested under it never runs.
if 0:
if 0:
# Sends IPython code that calls kernel.do_shutdown(True).
execute_action(
run_ipython(
'import IPython\nIPython.Application.instance().kernel.do_shutdown(True)'
)
)
sleep(3)
# Sends IPython code that star-imports the agentskills module.
execute_action(
run_ipython(
'from openhands.runtime.plugins.agent_skills.agentskills import *'
)
)
# execute_action(run_ipython('open_file'))
# execute_action(run('pwd'))
# execute_action(read_file('test2'))
# Sends an `edit` action with command='create' and file_text='hello' for
# /workspace/test10.txt. Presumably the only call that is live, i.e. outside
# both `if 0:` bodies — TODO confirm.
execute_action(file_edit(path='/workspace/test10.txt', command='create', file_text='hello'))
# execute_action(write_file('test6.txt', 'hello'))