-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathD4C.py
191 lines (176 loc) · 9.21 KB
/
D4C.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
import re
import aiohttp
import asyncio
import aiofiles
from pathlib import Path
import argparse
import logging
from concurrent.futures import ThreadPoolExecutor
class MangaDownloader:
def __init__(self, manga_name: str, uppercase: bool = False, edit: bool = False):
if edit:
self.manga_name = manga_name
else:
self.manga_name = manga_name.upper() if uppercase else manga_name.title()
self.formatted_manga_name = re.sub(r'\s+', '-', self.manga_name)
self.manga_folder = Path(self.formatted_manga_name)
self.executor = ThreadPoolExecutor()
self.history_file = Path("download_history.txt")
def format_chapter_number(self, chapter_number: str) -> str:
if '.' in chapter_number:
integer_part, decimal_part = chapter_number.split('.')
formatted_chapter_number = f"{int(integer_part):04d}.{decimal_part}"
else:
formatted_chapter_number = f"{int(chapter_number):04d}"
return formatted_chapter_number
async def generate_image_url(self, chapter_number: str, png_number: int, manga_address: str) -> str:
base_url = f"https://{manga_address}/manga/{{}}/{{}}-{{:03d}}.png"
url = base_url.format(self.formatted_manga_name, chapter_number, png_number)
return url
async def download_image(self, session: aiohttp.ClientSession, url: str, path: Path) -> bool:
try:
async with session.get(url) as response:
if response.status == 200:
path.parent.mkdir(parents=True, exist_ok=True)
async with aiofiles.open(path, 'wb') as file:
await file.write(await response.read())
logging.info(f"Downloaded: {url}")
return True
else:
logging.warning(f"Failed to download {url}: {response.status}")
return False
except aiohttp.ClientError as e:
logging.error(f"Error downloading {url}: {e}")
return False
def extract_text_from_html(self, html_content: str) -> str:
pattern = re.compile(r'vm\.CurPathName\s*=\s*"([^"]+)"')
matches = pattern.findall(html_content)
return matches[0] if matches else None
async def extract_text_from_url(self, session: aiohttp.ClientSession, chapter_number: str) -> str:
formatted_chapter_number = self.format_chapter_number(chapter_number)
url = f"https://manga4life.com/read-online/{self.formatted_manga_name}-chapter-{formatted_chapter_number}.html"
try:
async with session.get(url) as response:
if response.status == 200:
html_content = await response.text()
manga_address = self.extract_text_from_html(html_content)
if not manga_address:
logging.warning(f"Could not find 'vm.CurPathName' in the page for manga '{self.manga_name}' or chapter '{formatted_chapter_number}'. Trying alternative URL.")
url = f"https://manga4life.com/read-online/{self.formatted_manga_name}-chapter-{formatted_chapter_number}-index-2.html"
async with session.get(url) as alt_response:
if alt_response.status == 200:
html_content = await alt_response.text()
manga_address = self.extract_text_from_html(html_content)
if not manga_address:
logging.warning(f"Alternative URL also failed for manga '{self.manga_name}' and chapter '{formatted_chapter_number}'.")
else:
logging.error(f"Error accessing alternative URL: HTTP {alt_response.status}")
return manga_address
else:
logging.error(f"Error accessing {url}: HTTP {response.status}")
return None
except aiohttp.ClientError as e:
logging.error(f"Error accessing {url}: {e}")
return None
async def download_chapter_images(self, session: aiohttp.ClientSession, chapter_number: str) -> bool:
formatted_chapter_number = self.format_chapter_number(chapter_number)
manga_address = await self.extract_text_from_url(session, formatted_chapter_number)
if manga_address:
chapter_folder = self.manga_folder / f"Chapter-{formatted_chapter_number}"
png_number = 1
while True:
url = await self.generate_image_url(formatted_chapter_number, png_number, manga_address)
image_filename = f"{png_number:03d}.png"
image_path = chapter_folder / image_filename
if not await self.download_image(session, url, image_path):
break
png_number += 1
return png_number > 1
else:
return False
async def download_chapters(self, chapters_to_download: list):
conn = aiohttp.TCPConnector(limit=10)
async with aiohttp.ClientSession(connector=conn) as session:
tasks = []
for chapter_number in chapters_to_download:
tasks.append(self.download_chapter_images(session, chapter_number))
if len(tasks) >= 5: # Limit concurrent tasks to avoid overloading
await asyncio.gather(*tasks)
tasks = []
if tasks:
await asyncio.gather(*tasks)
await self.save_history(self.manga_name)
async def save_history(self, manga_name: str):
if not self.history_file.exists():
self.history_file.touch()
async with aiofiles.open(self.history_file, 'a+') as file:
await file.seek(0)
history = await file.read()
history_lines = history.splitlines()
if manga_name not in history_lines:
await file.write(manga_name + "\n")
logging.info(f"Saved {manga_name} to history.")
async def load_history(self):
if self.history_file.exists():
async with aiofiles.open(self.history_file, 'r') as file:
history = await file.read()
history_lines = history.splitlines()
logging.info("Download History:")
for manga in history_lines:
logging.info(manga)
else:
logging.info("No download history found.")
def parse_chapters(chapters_str):
chapters = []
for part in chapters_str.split(','):
if '-' in part:
start, end = map(int, part.split('-'))
chapters.extend(range(start, end + 1))
else:
chapters.append(int(part))
return [str(chapter) for chapter in chapters]
def parse_args():
parser = argparse.ArgumentParser(description="Manga Downloader")
parser.add_argument('-d', '--download', metavar='MANGA_NAME', type=str, help="Download manga chapters")
parser.add_argument('-c', '--chapters', metavar='CHAPTERS', type=str, help="Chapters to download, separated by commas")
parser.add_argument('-H', '--history', action='store_true', help="View download history")
parser.add_argument('-U', '--uppercase', action='store_true', help="Use uppercase for the manga name")
parser.add_argument('-e', '--edit', action='store_true', help="Edit manga name directly without formatting")
return parser.parse_args()
def main():
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
args = parse_args()
if args.download and args.chapters:
manga_name = args.download
chapters_to_download = parse_chapters(args.chapters)
downloader = MangaDownloader(manga_name, uppercase=args.uppercase, edit=args.edit)
asyncio.run(downloader.download_chapters(chapters_to_download))
elif args.download:
manga_name = args.download
input_chapters = input("Enter the chapter number(s) separated by commas or ranges: ")
chapters_to_download = parse_chapters(input_chapters)
downloader = MangaDownloader(manga_name, uppercase=args.uppercase, edit=args.edit)
asyncio.run(downloader.download_chapters(chapters_to_download))
elif args.history:
downloader = MangaDownloader("dummy")
asyncio.run(downloader.load_history())
else:
downloader = None
while True:
choice = input("Enter 'd' to download manga, 'h' to view history, 'q' to quit: ").strip().lower()
if choice == 'd':
manga_name = input("Enter the manga name: ")
input_chapters = input("Enter the chapter number(s) separated by commas or ranges: ")
chapters_to_download = parse_chapters(input_chapters)
downloader = MangaDownloader(manga_name)
asyncio.run(downloader.download_chapters(chapters_to_download))
elif choice == 'h':
if downloader is None:
downloader = MangaDownloader("dummy")
asyncio.run(downloader.load_history())
elif choice == 'q':
break
else:
logging.warning("Invalid choice, please try again.")
if __name__ == "__main__":
main()