-
Notifications
You must be signed in to change notification settings - Fork 12
/
Copy pathetherscan.py
225 lines (185 loc) · 8.16 KB
/
etherscan.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
import requests
from requests.adapters import HTTPAdapter, Retry
import datetime
import sqlite3
import time
from ratelimit import limits, sleep_and_retry
def filter_transactions(transactions, start_date=None, end_date=None):
    """Keep the transactions whose calendar date lies in [start_date, end_date).

    Args:
        transactions: Iterable of objects exposing ``get_datetime()``.
        start_date: Inclusive lower bound, compared against
            ``tx.get_datetime().date()``; ``None`` means unbounded.
        end_date: Exclusive upper bound; ``None`` means unbounded.

    Returns:
        The input object itself when both bounds are ``None``; otherwise a
        new list containing the matching transactions in their original order.
    """
    unbounded = start_date is None and end_date is None
    if unbounded:
        # Nothing to filter: hand the caller's object back untouched.
        return transactions

    def in_window(day):
        # The start bound is inclusive, the end bound is exclusive.
        if start_date is not None and day < start_date:
            return False
        return end_date is None or not (day >= end_date)

    return [tx for tx in transactions if in_window(tx.get_datetime().date())]
class Transaction:
    """A single transaction record: hash, block number, timestamp and gas data."""

    def __init__(self, tx_hash, block_number, timestamp, gas_price, gas_used):
        """Store the five fields unchanged as same-named attributes."""
        self.tx_hash = tx_hash            # must support .hex() (see __repr__)
        self.block_number = block_number
        self.timestamp = timestamp        # handed to datetime.fromtimestamp()
        self.gas_price = gas_price
        self.gas_used = gas_used

    def __repr__(self):
        """Return the hash as a ``0x``-prefixed hex string."""
        return f'0x{self.tx_hash.hex()}'

    def get_fees(self):
        """Return the fee paid, i.e. gas price multiplied by gas used."""
        price, used = self.gas_price, self.gas_used
        return price * used

    def get_datetime(self):
        """Return the timestamp as a naive ``datetime`` in local time."""
        moment = datetime.datetime.fromtimestamp(self.timestamp)
        return moment
def sum_gas_used(transactions):
    """Return the total of ``gas_used`` over all transactions (0 if empty)."""
    # A generator expression avoids building a throwaway list.
    return sum(tx.gas_used for tx in transactions)
def sum_fees(transactions):
    """Return the total of ``get_fees()`` over all transactions (0 if empty)."""
    # A generator expression avoids building a throwaway list.
    return sum(tx.get_fees() for tx in transactions)
def wei_to_eth(wei):
    """Scale a wei amount by 1e-18 and return the result as a float."""
    eth_per_wei = 1e-18
    return wei * eth_per_wei
def hash0x_to_bytes(hash0x):
    """Decode a prefixed hex string into a ``bytearray``.

    The first two characters are dropped unconditionally (they are expected
    to be the ``0x`` prefix) and the remainder is parsed as hexadecimal.
    """
    hex_digits = hash0x[2:]
    return bytearray.fromhex(hex_digits)
def build_rows(transactions):
    """Lazily convert raw transaction dicts into database row tuples.

    Each input mapping must provide the keys ``hash``, ``blockNumber``,
    ``timeStamp``, ``gasPrice`` and ``gasUsed``. The hash is decoded with
    ``hash0x_to_bytes``; the other four values are converted with ``int``.

    Yields:
        ``(hash, block_number, timestamp, gas_price, gas_used)`` tuples.
    """
    for record in transactions:
        tx_hash = hash0x_to_bytes(record['hash'])
        block_number = int(record['blockNumber'])
        timestamp = int(record['timeStamp'])
        gas_price = int(record['gasPrice'])
        gas_used = int(record['gasUsed'])
        yield (tx_hash, block_number, timestamp, gas_price, gas_used)
def addr(address):
    """Return *address* as a double-quoted SQL identifier.

    The address is lower-cased and wrapped in double quotes so it can be
    spliced into a statement as a table name. Identifiers cannot be bound as
    SQL parameters, so any embedded double quote is doubled; this keeps a
    crafted address from terminating the identifier early.
    """
    escaped = address.lower().replace('"', '""')
    return f'"{escaped}"'
class Etherscan():
    """Etherscan account-API client with a local SQLite transaction cache.

    Every address gets its own table, named after the lower-cased address,
    holding one row per transaction: hash, block number, timestamp, gas price
    and gas used.
    """

    def __init__(self, apikey=None, db_file='transactions.sqlite3', read_only=False):
        """Open the cache database and set up a retrying HTTPS session.

        Args:
            apikey: Etherscan API key. Without a key nothing can be fetched,
                so the cache is opened read-only.
            db_file: Path of the SQLite database file.
            read_only: Open the database with ``mode=ro``.

        Raises:
            sqlite3.OperationalError: If the database cannot be opened, e.g.
                a read-only open of a file that does not exist.
        """
        self.apikey = apikey
        # This assignment used to be misspelled `ready_only`, so keyless
        # instances never actually switched to read-only mode.
        if self.apikey is None:
            read_only = True
        self.read_only = read_only
        flags = '?mode=ro' if read_only else ''
        self.db = sqlite3.connect(f'file:{db_file}{flags}', uri=True)

        session = requests.Session()
        retries = Retry(total=5, backoff_factor=1, status_forcelist=[502, 503, 504])
        session.mount('https://', HTTPAdapter(max_retries=retries))
        self.session = session

    def __del__(self):
        """Close the database connection if one was opened."""
        # __init__ may have raised before self.db was assigned.
        db = getattr(self, 'db', None)
        if db is not None:
            db.close()

    def execute(self, query, params=()):
        """Run *query* on a fresh cursor and return that cursor.

        Args:
            query: SQL statement text.
            params: Optional values bound to ``?`` placeholders.
        """
        return self.db.cursor().execute(query, params)

    def _has_table(self, address):
        """Return True if a cache table for *address* already exists."""
        query = "SELECT 1 FROM sqlite_master WHERE type='table' AND name=?"
        return self.execute(query, (address.lower(),)).fetchone() is not None

    def list_contracts(self):
        """Yield the name of every table in the cache database."""
        # Single quotes: a double-quoted "table" is an identifier in standard
        # SQL and only works as a string through SQLite's legacy fallback.
        query = "SELECT name FROM sqlite_master WHERE type='table'"
        for row in self.execute(query):
            yield row[0]

    def insert_transactions(self, address, transactions):
        """Insert or replace raw API transaction dicts in the address table.

        Does not commit; the caller is responsible for ``self.db.commit()``.
        """
        query = f'INSERT OR REPLACE INTO {addr(address)} VALUES (?, ?, ?, ?, ?)'
        self.db.cursor().executemany(query, build_rows(transactions))

    def create_transactions_table(self, address):
        """Create the table for *address* if missing and return the cursor."""
        # NOTE(review): "INTEGER KEY" looks like it was meant to index
        # block_number; SQLite presumably treats it as a type name only.
        # Left unchanged to keep the schema stable - confirm.
        query = (
            f'CREATE TABLE IF NOT EXISTS {addr(address)} ('
            'hash BLOB PRIMARY KEY, '
            'block_number INTEGER KEY, '
            'timestamp INTEGER, '
            'gas_price INTEGER, '
            'gas_used INTEGER)'
        )
        return self.execute(query)

    def list_transactions(self, address):
        """Yield a ``Transaction`` for every cached row of *address*."""
        query = f'SELECT * FROM {addr(address)}'
        for row in self.execute(query):
            yield Transaction(*row)

    def count_transactions(self, address):
        """Return the number of cached rows for *address*."""
        query = f'SELECT COUNT(*) FROM {addr(address)}'
        return self.execute(query).fetchone()[0]

    def latest_transaction(self, address):
        """Return the cached ``Transaction`` with the greatest timestamp.

        Returns ``None`` when the table is empty.
        """
        query = f'SELECT * FROM {addr(address)} ORDER BY timestamp DESC LIMIT 1'
        row = self.execute(query).fetchone()
        if row is None:
            return None
        return Transaction(*row)

    def latest_datetime(self, address):
        """Return the newest cached timestamp as a naive local ``datetime``.

        Returns ``None`` when the table is empty.
        """
        query = f'SELECT MAX(timestamp) FROM {addr(address)}'
        timestamp = self.execute(query).fetchone()[0]
        if timestamp is None:
            return None
        return datetime.datetime.fromtimestamp(timestamp)

    def latest_block(self, address):
        """Return the highest cached block number, or ``None`` if empty."""
        query = f'SELECT MAX(block_number) FROM {addr(address)}'
        return self.execute(query).fetchone()[0]

    def load_transactions(self, address, update=True, update_active=None, verbose=False, **kwargs):
        """Return cached transactions for *address*, optionally refreshing first.

        Args:
            address: Address whose transactions are wanted.
            update: Fetch new transactions from the API before reading.
            update_active: If given, overrides *update*: refresh only when the
                newest cached transaction is younger than this many days.
                Has no effect while the cache is empty.
            verbose: Print progress information.
            **kwargs: Forwarded to ``fetch_transactions`` (e.g. ``endblock``).

        Returns:
            An iterator of ``Transaction`` objects. It is empty when the
            database is read-only and holds no table for *address*.
        """
        if verbose:
            print('load_transactions', address)

        if self.read_only:
            # A read-only connection cannot create the table, so a missing
            # table simply means nothing is cached for this address.
            if not self._has_table(address):
                return iter(())
        else:
            self.create_transactions_table(address)

        if update_active is not None:
            latest = self.latest_datetime(address)
            if latest is not None:
                seconds_per_day = 24 * 60 * 60
                age = datetime.datetime.now() - latest
                days = age.total_seconds() / seconds_per_day
                update = days < update_active
                if verbose:
                    print(f'latest transaction {days:.1f} day(s) ago')

        # Checked last so update_active cannot re-enable fetching when there
        # is no API key or the database cannot be written.
        if self.apikey is None or self.read_only:
            update = False

        if update:
            self.fetch_transactions(address, verbose=verbose, **kwargs)
        return self.list_transactions(address)

    def fetch_transactions_internal(self, address, **kwargs):
        """Download internal transactions (``txlistinternal``), bypassing the cache."""
        return self.fetch_transactions_uncached(address, 'txlistinternal', **kwargs)

    def fetch_transactions_uncached(self, address, action='txlist', endblock=None, verbose=False):
        """Download all transactions for *address* without touching the cache.

        Requests are repeated, each starting at the highest block number seen
        in the previous batch, until a batch is empty or brings no progress.

        Returns:
            List of raw transaction dicts, de-duplicated by ``hash``.
        """
        all_transactions = []
        startblock = None
        while True:
            if verbose:
                print('startblock', startblock)
            # download transactions between startblock and endblock
            transactions = self.fetch_transactions_in_range(address, startblock, endblock, action=action)
            all_transactions.extend(transactions)
            if verbose:
                print(f'loaded {len(transactions)} transactions')
            # save the last startblock
            last_startblock = startblock
            # and get a new one based on the most recent transactions;
            # default=None keeps an empty batch from raising ValueError
            startblock = max((int(e['blockNumber']) for e in transactions), default=None)
            # if we can't get a block, or haven't made progress, then quit
            if startblock is None or startblock == last_startblock:
                if verbose:
                    print('done')
                break
        # remove duplicates: consecutive batches overlap at the boundary block
        return list({e['hash']: e for e in all_transactions}.values())

    def fetch_transactions(self, address, endblock=None, verbose=False):
        """Download transactions newer than the cache and store them.

        Starts at the latest cached block and repeats until the latest cached
        block stops advancing, then commits.
        """
        # start by asking for the latest block on this address
        # if we do not have cached data, this will return None
        startblock = self.latest_block(address)
        while True:
            if verbose:
                print('startblock', startblock)
            # download transactions between startblock and endblock
            transactions = self.fetch_transactions_in_range(address, startblock, endblock)
            self.insert_transactions(address, transactions)
            if verbose:
                print(f'loaded {len(transactions)} transactions')
            # save the last startblock
            last_startblock = startblock
            # and get a new one based on the most recent transactions
            startblock = self.latest_block(address)
            # if we can't get a block, or haven't made progress, then quit
            if startblock is None or startblock == last_startblock:
                if verbose:
                    print('done')
                break
        self.db.commit()

    @sleep_and_retry
    @limits(calls=4, period=1)
    def fetch_transactions_in_range(self, address, startblock, endblock, ratelimit_sleep=0.5, action='txlist'):
        """Perform one account-API request and return its ``result`` list.

        Args:
            address: Address to query.
            startblock: First block to include, or ``None`` to omit the bound.
            endblock: Last block to include, or ``None`` to omit the bound.
            ratelimit_sleep: Seconds to wait if the server reports a rate
                limit; doubled on every consecutive retry.
            action: API action, e.g. ``txlist`` or ``txlistinternal``.

        Returns:
            List of raw transaction dicts, or ``[]`` when the response cannot
            be parsed or carries an error message instead of a list.
        """
        url = f'https://api.etherscan.io/api?module=account&apikey={self.apikey}&action={action}&address={address}'
        if startblock is not None:
            url += f'&startblock={startblock}'
        if endblock is not None:
            url += f'&endblock={endblock}'
        response = self.session.get(url)

        try:
            result = response.json()['result']
        except (ValueError, KeyError, TypeError):
            # Body is not JSON, or has no usable "result" field.
            print('error parsing transactions', url)
            return []

        if isinstance(result, str) and 'rate limit' in result:
            print('hit rate limit, sleeping', ratelimit_sleep, 'seconds')
            time.sleep(ratelimit_sleep)
            # Retry the same action; it used to fall back to 'txlist'.
            return self.fetch_transactions_in_range(
                address, startblock, endblock, ratelimit_sleep * 2, action=action)

        if not isinstance(result, list):
            # Any other error comes back as a message string; callers expect
            # a list of dicts, so report it and return nothing.
            print('error parsing transactions', url)
            return []
        return result