Skip to content

Commit

Permalink
Bug fix: per-channel time-out in scraping; add message count
Browse files Browse the repository at this point in the history
  • Loading branch information
jmargutt committed Apr 8, 2024
1 parent 2c9e1d5 commit 7cdc5a5
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 42 deletions.
64 changes: 29 additions & 35 deletions sml/extract.py
Original file line number Diff line number Diff line change
Expand Up @@ -240,15 +240,14 @@ def get_data_telegram(self):
async def _scrape_messages(self, telegram_client, telegram_channels, start_date):

await telegram_client.connect()
# Set timer to avoid scraping for too long
time_out = time.time() + 60 * 45 # = 45 min
all_messages = []

for channel in telegram_channels:
if time.time() >= time_out:
logging.info(f"time_out channel")
break
logging.info(f"getting in telegram channel {channel}")
# Set timer per channel to avoid scraping for too long
time_out = time.time() + 60 * 10 # = 10 min
channel_count = 0

logging.info(f"starting telegram channel {channel} at {time.time()}")
try:
channel_entity = await telegram_client.get_entity(channel)
# scrape posts
Expand All @@ -259,43 +258,38 @@ async def _scrape_messages(self, telegram_client, telegram_channels, start_date)
reverse=True,
wait_time=5
):
message = self._from_telegram(raw_message, channel)
all_messages.append(message)
channel_count += 1
if time.time() >= time_out:
logging.info(f"time_out message")
logging.info(f"time_out channel {channel} at {time.time()}")
break
message = self._from_telegram(raw_message)
all_messages.append(message)

if channel_entity.broadcast and raw_message.post and raw_message.replies:
replied_post_id.append(raw_message.id)

# scrape replies
for post_id in replied_post_id:
try:
async for raw_reply in telegram_client.iter_messages(
channel_entity,
offset_date=start_date,
reverse=True,
reply_to=post_id,
wait_time=5
):
reply = Message.from_telegram(raw_reply)
all_messages.append(reply)
time.sleep(5)
except Exception as e:
logging.info(f"getting replies for {message.id} failed: {e}")

if time.time() >= time_out:
logging.info(f"time_out reply")
break
else:
time.sleep(10)
continue
# scrape replies
for post_id in replied_post_id:
try:
async for raw_reply in telegram_client.iter_messages(
channel_entity,
offset_date=start_date,
reverse=True,
reply_to=post_id,
wait_time=5
):
reply = self._from_telegram(raw_reply, channel)
all_messages.append(reply)
channel_count += 1
except Exception as e:
logging.info(f"getting replies for {message.id} failed: {e}")
except Exception as e:
logging.info(f"Unable to get in telegram channel {channel}: {e}")

logging.info(f"found {channel_count} messages in channel {channel}")

telegram_client.disconnect()
return all_messages

def _from_telegram(self, message_entity):
def _from_telegram(self, message_entity, channel_name):
if not message_entity.reply_to:
reply_ = False
reply_to_ = None
Expand All @@ -308,7 +302,7 @@ def _from_telegram(self, message_entity):
datetime_scraped_=datetime.today(),
country=self.country,
source="Telegram",
group=message_entity.peer_id.channel_id,
group=channel_name,
text=message_entity.message,
reply=reply_,
reply_to=reply_to_
Expand Down
68 changes: 61 additions & 7 deletions telegram_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,19 @@
import click
import logging
import sys
import asyncio
import pandas as pd
from telethon.sync import TelegramClient
from telethon.sessions import StringSession
from telethon.tl.functions.channels import GetFullChannelRequest
from azure.storage.blob import BlobServiceClient

logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
handler = logging.StreamHandler(sys.stdout)
handler.setLevel(logging.INFO)
formatter = logging.Formatter("%(asctime)s : %(levelname)s : %(message)s")
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.setLevel(logging.INFO)
logging.getLogger("requests").setLevel(logging.WARNING)
logging.getLogger("urllib3").setLevel(logging.WARNING)
logging.getLogger("azure").setLevel(logging.WARNING)
logging.getLogger("requests_oauthlib").setLevel(logging.WARNING)


@click.command()
Expand Down Expand Up @@ -53,7 +58,7 @@ def run_sml_pipeline(country):
pipe.transform.set_classifier(type="setfit",
model="rodekruis/sml-ukr-message-classifier",
lang="en")
messages = pipe.transform.process_messages(messages, translate=True, classify=True)
messages = pipe.transform.process_messages(messages, translate=False, classify=False)
logging.info(f"processed {len(messages)} messages!")

pipe.load.set_storage("Azure SQL Database")
Expand All @@ -64,7 +69,56 @@ def run_sml_pipeline(country):
tags={"Country": country_code}
)
logging.info(f"saved {len(messages)} messages!")

# add member counts
telegram_client = TelegramClient(
StringSession(os.getenv('STRING_SESSION')),
os.getenv('API_ID'),
os.getenv('API_HASH')
)
telegram_client.loop.run_until_complete(
save_membercount(
telegram_client,
settings,
country,
end_date
)
)


async def save_membercount(telegram_client, settings, country, end_date):
    """Record the current member count of every tracked Telegram channel.

    Queries each channel in ``settings[country]['channels-to-track']`` for its
    participant count, then appends the new rows to the historical
    ``membercount.csv`` blob in the ``membercount`` container of Azure Blob
    Storage.

    Args:
        telegram_client: telethon client (connected inside this function).
        settings: per-country configuration mapping; must provide
            ``channels-to-track`` and ``country-code`` for ``country``.
        country: key into ``settings`` selecting the country configuration.
        end_date: datetime used to date-stamp each measurement.
    """
    df_member_counts = pd.DataFrame()

    await telegram_client.connect()
    try:
        for channel in settings[country]['channels-to-track']:
            # One unreachable channel must not abort the whole run
            # (same best-effort style as _scrape_messages).
            try:
                channel_entity = await telegram_client.get_entity(channel)
                channel_full_info = await telegram_client(GetFullChannelRequest(channel=channel_entity))
            except Exception as e:
                logging.info(f"Unable to get member count of telegram channel {channel}: {e}")
                continue
            idx = len(df_member_counts)
            df_member_counts.at[idx, 'source'] = channel
            member_count = channel_full_info.full_chat.participants_count
            df_member_counts.at[idx, 'member_count'] = member_count
            df_member_counts.at[idx, 'date'] = end_date.strftime("%m %d %Y")
            df_member_counts.at[idx, 'country'] = settings[country]['country-code'].lower()
            df_member_counts.at[idx, 'source_id'] = channel
    finally:
        # Bug fix: the client was previously left connected after the scrape.
        await telegram_client.disconnect()

    # Add helper index/id columns (dropped again after the concat below,
    # so they never reach the stored CSV).
    df_member_counts.reset_index(inplace=True)
    df_member_counts['id'] = df_member_counts.index

    # Download the historical CSV, append the new rows, and re-upload.
    blob_service_client = BlobServiceClient.from_connection_string(os.getenv('BLOBSTORAGE_CONNECTION_STRING'))
    blob_client = blob_service_client.get_blob_client(container='membercount', blob='membercount.csv')
    with open('membercount.csv', "wb") as download_file:
        download_file.write(blob_client.download_blob().readall())
    df_member_counts_old = pd.read_csv('membercount.csv')
    df_member_counts_all = pd.concat([df_member_counts_old, df_member_counts]).reset_index(drop=True)
    df_member_counts_all = df_member_counts_all.drop(columns=['index', 'id'])
    df_member_counts_all.to_csv('membercount.csv', index=False, encoding="utf-8")
    with open('membercount.csv', "rb") as upload_file:
        blob_client.upload_blob(upload_file, overwrite=True)
    # Clean up the local scratch file.
    if os.path.exists('membercount.csv'):
        os.remove('membercount.csv')


# Script entry point: run_sml_pipeline is a click command, so click parses
# the command-line options (e.g. --country) and invokes the pipeline.
if __name__ == '__main__':
    run_sml_pipeline()

0 comments on commit 7cdc5a5

Please sign in to comment.