-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathwetransferdl.py
122 lines (103 loc) · 4.13 KB
/
wetransferdl.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
import logging
import os.path
import re
import urllib.parse
import zipfile
import zlib
from typing import List, Optional

import requests
# Base endpoint of the wetransfer.com transfers API.
WETRANSFER_API_URL = 'https://wetransfer.com/api/v4/transfers'
# Per-transfer download endpoint; `transfer_id` is filled in with str.format().
WETRANSFER_DOWNLOAD_URL = WETRANSFER_API_URL + '/{transfer_id}/download'
# 5 MiB. Not referenced by the code visible here -- presumably a chunk size
# in bytes for uploads; TODO confirm against the rest of the project.
WETRANSFER_DEFAULT_CHUNK_SIZE = 5 * 1024 * 1024
# 604800. Not referenced by the code visible here -- presumably a lifetime
# in seconds (7 days); TODO confirm.
WETRANSFER_EXPIRE_IN = 7 * 24 * 60 * 60

# Module-level logger shared by every function in this file.
logger = logging.getLogger(__name__)
def download_url(url: str) -> Optional[str]:
    """Given a wetransfer.com download URL return the downloadable URL.

    The following type of URLs are supported:

     - `https://we.tl/<short_url_id>`:
        short URL (received via link upload, via email to the sender);
        its redirects are followed in order to retrieve the corresponding
        `wetransfer.com/downloads/' URL
     - `https://wetransfer.com/downloads/<transfer_id>/<security_hash>`:
        the URL the short URLs redirect to
     - `https://wetransfer.com/downloads/<transfer_id>/<recipient_id>/<security_hash>`:
        received via email by recipients when the files are shared via
        email upload

    Only the layout of the URL path is inspected: the first path component
    is skipped and exactly 2 or 3 non-empty components must follow it.

    Return the download URL (AKA `direct_link') as a str, or None if the
    URL could not be parsed or the API response carries no `direct_link'.
    """
    logger.debug(f'Getting download URL of {url}')

    # Follow the redirect if we have a short URL
    if url.startswith('https://we.tl/'):
        r = requests.head(url, allow_redirects=True)
        logger.debug(f'Short URL {url} redirects to {r.url}')
        url = r.url

    # Skip the leading empty segment and the first path component
    # (`downloads'). Empty segments are dropped so that a trailing slash is
    # not mistaken for an (empty) security hash.
    path = urllib.parse.urlparse(url).path
    params = [segment for segment in path.split('/')[2:] if segment]

    recipient_id = None
    if len(params) == 2:
        transfer_id, security_hash = params
    elif len(params) == 3:
        transfer_id, recipient_id, security_hash = params
    else:
        return None

    logger.debug(f'Getting direct_link of {url}')
    payload = {
        "intent": "entire_transfer",
        "security_hash": security_hash,
    }
    # The recipient is only part of the request for per-recipient links.
    if recipient_id:
        payload["recipient_id"] = recipient_id

    s = _prepare_session()
    r = s.post(WETRANSFER_DOWNLOAD_URL.format(transfer_id=transfer_id),
               json=payload)

    return r.json().get('direct_link')
def _file_unquote(file: str) -> str:
"""Given a URL encoded file unquote it.
All occurences of `\', `/' and `../' will be ignored to avoid possible
directory traversals.
"""
return urllib.parse.unquote(file).replace('../', '').replace('/', '').replace('\\', '')
def download(url: str, directory: str, extract: bool, file: str = '') -> None:
    """Given a `we.tl/' or `wetransfer.com/downloads/' URL download it.

    First a direct link is retrieved (via download_url()). The file name
    can be provided via the optional `file' argument; if it is not provided
    it is taken from the last path component of the direct link (sanitized
    via _file_unquote()). The content is stored as `file' inside
    `directory'.

    If `extract' is true the downloaded file is treated as a zip archive:
    it is extracted into a sub-directory of `directory' named after the
    file without its extension, and the downloaded archive is then removed.

    Raise ValueError if no direct link could be obtained for `url' and
    requests.HTTPError if the download request is answered with an HTTP
    error status.
    """
    logger.debug(f'Downloading {url}')
    dl_url = download_url(url)
    if not dl_url:
        # Fail with a clear message instead of an obscure error further down.
        raise ValueError(f'Could not get a direct link for {url}')
    if not file:
        file = _file_unquote(urllib.parse.urlparse(dl_url).path.split('/')[-1])

    # Joining with '' appends a path separator only when `directory' is
    # non-empty and does not already end with one, so both 'out' and 'out/'
    # place the file inside the directory.
    base = os.path.join(directory, '')
    temp_output = base + file
    print('---> Downloading to: ' + temp_output)

    logger.debug(f'Fetching {dl_url}')
    # The context manager releases the streamed connection when done.
    with requests.get(dl_url, stream=True) as r:
        # Do not save the body of an HTTP error response as the download.
        r.raise_for_status()
        with open(temp_output, 'wb') as f:
            for chunk in r.iter_content(chunk_size=1024):
                f.write(chunk)

    if extract:
        unzip_path = base + os.path.splitext(file)[0]
        print('--> Extracting to: ' + unzip_path)
        with zipfile.ZipFile(temp_output, 'r') as zip_ref:
            zip_ref.extractall(unzip_path)
        # Only remove the archive once it has been closed.
        os.remove(temp_output)
def _prepare_session() -> requests.Session:
    """Prepare a wetransfer.com session.

    Fetch the wetransfer.com front page with a new requests session (so
    that any cookies it sets are stored on the session) and look for the
    `csrf-token' meta tag in the returned HTML.

    Return the session with the `x-requested-with' header always set and
    the `x-csrf-token' header set when a token was found in the page.
    """
    s = requests.Session()
    r = s.get('https://wetransfer.com/')
    m = re.search('name="csrf-token" content="([^"]+)"', r.text)

    headers = {'x-requested-with': 'XMLHttpRequest'}
    # re.search() returns None when the page carries no csrf-token meta tag;
    # in that case go on without the header instead of crashing here.
    if m:
        headers['x-csrf-token'] = m.group(1)
    else:
        logger.debug('No csrf-token found on https://wetransfer.com/')
    s.headers.update(headers)

    return s