import time
import numpy as np
import pandas as pd
import requests
import base64
import re
from typing import Optional, Any
import streamlit as st
from urllib.parse import urlparse, urljoin
from bs4 import BeautifulSoup
from sklearn.feature_extraction.text import TfidfVectorizer, CountVectorizer
from sklearn.cluster import AgglomerativeClustering
from sklearn.metrics.pairwise import cosine_similarity
from sentence_transformers import SentenceTransformer, util
import nltk
import spacy
import ssl
import string
import os
import torch
import difflib
from textwrap import wrap  # used by create_correlation_radar to wrap long radar-chart labels
import people_also_ask as paa
from streamlit_quill import st_quill
import altair as alt
from wordcloud import WordCloud
import matplotlib.pyplot as plt
import plotly.graph_objects as go
import textstat
from nltk.tokenize import sent_tokenize, word_tokenize
from supabase import create_client, Client
from postgrest.exceptions import APIError  # exception type raised by failed Supabase queries
from transformers import pipeline
torch.classes.__path__ = []  # workaround: keep Streamlit's file watcher from inspecting torch.classes
from dotenv import load_dotenv
load_dotenv()
# For st_login_form, you can specify them here OR rely on environment variables
SUPABASE_URL = os.getenv("SUPABASE_URL") # e.g. "https://<project>.supabase.co"
SUPABASE_ANON_KEY = os.getenv("SUPABASE_ANON_KEY")
SUPABASE_TABLE = "users"
supabase: Client = create_client(SUPABASE_URL, SUPABASE_ANON_KEY)
# Attempt to fix SSL issues for NLTK downloads
try:
_create_unverified_https_context = ssl._create_unverified_context
except AttributeError:
pass
else:
ssl._create_default_https_context = _create_unverified_https_context
# Download needed NLTK data
nltk.download('punkt')
nltk.download('punkt_tab')
nltk.download('stopwords')
nltk.download('wordnet')
nltk.download('averaged_perceptron_tagger')
max_contents = 10  # scrape the content of at most this many search results
@st.cache_resource
def load_spacy_model():
try:
return spacy.load('en_core_web_sm')
except OSError:
from spacy.cli import download
download('en_core_web_sm')
return spacy.load('en_core_web_sm')
nlp = load_spacy_model()
@st.cache_resource
def load_ner_pipeline():
return pipeline("ner", model="dslim/bert-base-NER", aggregation_strategy="simple")
ner_pipeline = load_ner_pipeline()
def compute_ner_count(text):
"""Compute the number of named entities in the text."""
if not text:
return 0
entities = ner_pipeline(text[:2000]) # Limit to first 2000 characters for speed
return len(entities)
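# Illustrative example for compute_ner_count (exact counts depend on the NER model's predictions):
#   compute_ner_count("Google acquired DeepMind in London in 2014.")
#   would typically return 3, one entity each for Google, DeepMind and London.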
def compute_pos_counts(text, normalize=True):
"""Count the number of adverbs, adjectives, and verbs in the given text.
Args:
text (str): The input text to analyze.
normalize (bool): Whether to return normalized counts. Defaults to True.
Returns:
dict: A dictionary with counts of adverbs, adjectives, and verbs.
"""
doc = nlp(text)
total_words = len([token for token in doc if token.is_alpha]) # Exclude punctuation
pos_counts = {
"adverbs": sum(1 for token in doc if token.pos_ == "ADV"),
"adjectives": sum(1 for token in doc if token.pos_ == "ADJ"),
"verbs": sum(1 for token in doc if token.pos_ == "VERB")
}
if normalize and total_words > 0:
for key in pos_counts:
pos_counts[key] /= total_words # Normalize each POS count
elif normalize:
for key in pos_counts:
pos_counts[key] = 0 # Avoid division by zero
return pos_counts
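# Example sketch for compute_pos_counts (tags depend on the spaCy model): for
# "The quick brown fox quickly jumps", spaCy usually tags two adjectives, one adverb
# and one verb among the 6 alphabetic tokens, so the normalized counts come out
# roughly as {"adverbs": 0.17, "adjectives": 0.33, "verbs": 0.17}.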
def compute_lexical_diversity(text):
"""Compute lexical diversity as the ratio of unique words to total words."""
doc = nlp(text)
words = [token.text.lower() for token in doc if token.is_alpha]
unique_words = set(words)
return len(unique_words) / len(words) if words else 0
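# Example for compute_lexical_diversity: "The cat sat on the mat" has 6 alphabetic
# tokens but only 5 unique lowercased words ("the" repeats), giving 5 / 6 ~ 0.83.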
def compute_readability_score(text: str) -> Optional[float]:
"""
Compute Flesch-Kincaid Grade using textstat.
For very short texts (like a title), the score may be meaningless.
"""
if not text or len(text.split()) < 3:
return None
return textstat.flesch_kincaid_grade(text)
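# Note on compute_readability_score: flesch_kincaid_grade approximates a U.S. school
# grade level, so short simple sentences score low (roughly grade 2-4) while dense
# technical prose can exceed grade 12; scores for tiny inputs such as titles are not
# meaningful, hence the length guard above.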
def compute_text_stats_from_html(html_content: str):
"""
Parse the HTML to extract paragraphs and compute various text statistics.
"""
soup = BeautifulSoup(html_content, "html.parser")
# Find paragraphs by <p> tags
paragraphs = soup.find_all("p")
# Fallback if no <p> tags found (optional):
if not paragraphs:
# You could treat the entire HTML as a single "paragraph".
paragraphs = [soup]
paragraph_count = len(paragraphs)
total_sentences = 0
total_words = 0
sentence_lengths = []
for p in paragraphs:
p_text = p.get_text(separator=" ", strip=True)
# Tokenize into sentences
sentences = sent_tokenize(p_text)
total_sentences += len(sentences)
# For each sentence, tokenize words to measure length
for sent in sentences:
words = word_tokenize(sent)
sentence_lengths.append(len(words))
total_words += len(words)
# Avoid division by zero if the text is empty
avg_sentences_per_paragraph = 0
if paragraph_count > 0:
avg_sentences_per_paragraph = total_sentences / paragraph_count
avg_words_per_sentence = 0
if total_sentences > 0:
avg_words_per_sentence = total_words / total_sentences
# Compute reading ease scores (requires `pip install textstat`)
# Extract plain text from the entire HTML for textstat
plain_text = soup.get_text(separator=" ", strip=True)
flesch_reading_ease = textstat.flesch_reading_ease(plain_text)
flesch_kincaid_grade = textstat.flesch_kincaid_grade(plain_text)
return {
"paragraph_count": paragraph_count,
"sentence_count": total_sentences,
"word_count": total_words,
"avg_sentences_per_paragraph": avg_sentences_per_paragraph,
"avg_words_per_sentence": avg_words_per_sentence,
"flesch_reading_ease": flesch_reading_ease,
"flesch_kincaid_grade": flesch_kincaid_grade,
"sentence_lengths": sentence_lengths,
}
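# Usage sketch for compute_text_stats_from_html, with hypothetical HTML:
#   stats = compute_text_stats_from_html("<p>First sentence. Second one.</p><p>Third.</p>")
#   -> paragraph_count = 2, sentence_count = 3, avg_sentences_per_paragraph = 1.5
# (word counts include punctuation tokens because word_tokenize keeps them).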
@st.cache_resource
def load_embedding_model():
return SentenceTransformer('all-MiniLM-L6-v2')
def google_custom_search(query, api_key, cse_id, num_results=10, delay=1):
"""Performs Google Custom Search with rate-limiting."""
all_results = []
start_index = 1
while len(all_results) < num_results:
remaining = num_results - len(all_results)
current_num = min(10, remaining)
url = "https://customsearch.googleapis.com/customsearch/v1"
params = {
'q': query,
'key': api_key,
'cx': cse_id,
'num': current_num,
'start': start_index,
'hl': 'en',
'cr': 'countryUS'
}
try:
response = requests.get(url, params=params)
response.raise_for_status()
data = response.json()
items = data.get('items', [])
if not items:
break
all_results.extend(items)
start_index += current_num
            time.sleep(delay * 10)  # pause between paginated requests to stay under rate limits
        except requests.exceptions.RequestException as e:
            if e.response is not None and e.response.status_code == 429:
                print("Rate limit exceeded. Retrying in 120 seconds...")
                time.sleep(120)
                continue
            else:
                print(f"Error: {e}")
                return []
return all_results
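# Usage sketch for google_custom_search (requires valid Custom Search credentials;
# the credential names below are placeholders):
#   items = google_custom_search("content marketing", API_KEY, CSE_ID, num_results=20)
#   Each item is a Custom Search result dict whose "link" field holds the result URL.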
def extract_content_from_url(url, extract_headings=False, retries=2, timeout=5):
"""Extract main textual content (paragraphs) from a webpage."""
headers = {
'User-Agent': (
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
'AppleWebKit/537.36 (KHTML, like Gecko) '
'Chrome/58.0.3029.110 Safari/537.3'
)
}
for _ in range(retries):
try:
resp = requests.get(url, headers=headers, timeout=timeout)
if resp.status_code == 200:
soup = BeautifulSoup(resp.text, 'html.parser')
title = soup.title.string.strip() if soup.title and soup.title.string else "No Title"
headings = []
if extract_headings:
for level in ['h2', 'h3', 'h4']:
for tag in soup.find_all(level):
headings.append({'level': level, 'text': tag.get_text(strip=True)})
icon_link = soup.find('link', rel=lambda x: x and ('icon' in x.lower()))
if icon_link and icon_link.get('href'):
favicon_url = urljoin(url, icon_link['href'])
else:
favicon_url = urljoin(url, '/favicon.ico')
paragraphs = soup.find_all('p')
content = ' '.join([p.get_text() for p in paragraphs])
return title, content.strip(), favicon_url, headings, soup
except requests.RequestException:
pass
time.sleep(1)
return None, "", "", [], None
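# Usage sketch for extract_content_from_url:
#   title, text, favicon, headings, soup = extract_content_from_url(url, extract_headings=True)
# On failure it returns (None, "", "", [], None), so callers should check `title is None`.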
def detailed_extraction(soup, url):
# Clone the soup to avoid modifying the original
soup_clone = BeautifulSoup(str(soup), 'html.parser')
# Remove the <footer> element and its contents so we only analyze the article content
footer = soup_clone.find("footer")
if footer:
footer.decompose()
# Extract Title
title = soup_clone.title.string.strip() if soup_clone.title and soup_clone.title.string else "No Title"
# Extract Meta Description
meta_description = None
meta_tag = soup_clone.find("meta", attrs={"name": "description"})
if meta_tag and meta_tag.get("content"):
meta_description = meta_tag.get("content").strip()
# Extract main Content (using <p> tags) and count paragraphs
paragraphs = soup_clone.find_all("p")
content = " ".join([p.get_text().strip() for p in paragraphs if p.get_text().strip()])
num_paragraphs = len([p for p in paragraphs if p.get_text().strip()])
# Count headings and bullet lists (only those above the removed footer)
num_h2 = len(soup_clone.find_all("h2"))
num_h3 = len(soup_clone.find_all("h3"))
num_bullet_lists = len(soup_clone.find_all("ul"))
return {
"url": url,
"title": title,
"meta_description": meta_description,
"content": content,
"num_paragraphs": num_paragraphs,
"num_h2": num_h2,
"num_h3": num_h3,
"num_bullet_lists": num_bullet_lists
}
@st.cache_resource
def lemmatize_text(text: str) -> str:
doc = nlp(text)
lemmatized_tokens = []
for token in doc:
# context-aware overrides
if token.text.lower() == "media" and token.lemma_.lower() == "medium":
lemmatized_tokens.append("media")
elif token.text.lower() == "data" and token.lemma_.lower() == "datum":
lemmatized_tokens.append("data")
elif token.text.lower() == "publishers" and token.lemma_.lower() == "publisher":
lemmatized_tokens.append("publisher")
else:
lemmatized_tokens.append(token.lemma_)
return ' '.join(lemmatized_tokens)
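# Example for lemmatize_text (lemmas vary slightly by spaCy model version):
#   lemmatize_text("publishers are analyzing data") -> "publisher be analyze data"
# The overrides above keep "data" and "media" instead of spaCy's "datum"/"medium" lemmas.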
def remove_duplicate_questions(questions, similarity_threshold=0.75):
# If 0 or 1 questions, there's nothing to deduplicate
if len(questions) < 2:
return questions
# Preprocess questions
def preprocess(text):
# Lowercase, remove punctuation
text = text.lower()
text = text.translate(str.maketrans('', '', string.punctuation))
return text
# Encode questions using SentenceTransformer
model = load_embedding_model()
preprocessed = [preprocess(q) for q in questions]
embeddings = model.encode(preprocessed)
# If embeddings is empty or only 1 row, again just return
if embeddings.shape[0] < 2:
return questions
# Compute cosine similarity matrix
similarity_matrix = cosine_similarity(embeddings)
# Cluster questions
clustering_model = AgglomerativeClustering(
n_clusters=None,
        affinity='precomputed',  # note: this parameter is renamed to `metric` in scikit-learn >= 1.2
linkage='complete',
distance_threshold=1 - similarity_threshold
)
clustering_model.fit(1 - similarity_matrix)
# Select a representative question from each cluster
cluster_labels = clustering_model.labels_
cluster_map = {}
for idx, label in enumerate(cluster_labels):
cluster_map.setdefault(label, []).append(questions[idx])
final_questions = []
for _, qs in cluster_map.items():
# pick the shortest question from the cluster
rep = min(qs, key=len)
final_questions.append(rep)
return final_questions
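# Illustrative behaviour of remove_duplicate_questions (clusters depend on the embedding model):
#   remove_duplicate_questions(["What is SEO?", "What does SEO mean?", "How do backlinks work?"])
#   would typically collapse the first two near-duplicates into one cluster and keep the
#   shortest question from each, e.g. ["What is SEO?", "How do backlinks work?"].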
def extract_brand_name(url, title):
parsed = urlparse(url)
parts = parsed.netloc.split('.')
if parts and parts[0] == 'www':
parts.pop(0)
domain_root = parts[0].capitalize() if parts else 'Unknown'
if title:
segs = title.split(' - ')
for seg in reversed(segs):
ratio = difflib.SequenceMatcher(None, domain_root.lower(), seg.lower()).ratio()
if ratio > 0.8:
return seg.strip()
return domain_root
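# Example for extract_brand_name:
#   extract_brand_name("https://www.example.com/blog/post", "SEO Guide - Example")
#   derives the domain root "Example", fuzzy-matches it against the last title segment,
#   and returns "Example".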
@st.cache_resource
def is_brand_mentioned(term, brand_name):
# direct substring
if brand_name.lower() in term.lower():
return True
# fuzzy match ratio
ratio = difflib.SequenceMatcher(
None,
term.lower().replace(' ', ''),
brand_name.lower().replace(' ', '')
).ratio()
if ratio > 0.8:
return True
# check for named entity
doc = nlp(term)
for ent in doc.ents:
if ent.label_ in ['ORG', 'PRODUCT', 'PERSON', 'GPE']:
ratio_ent = difflib.SequenceMatcher(
None,
ent.text.lower().replace(' ', ''),
brand_name.lower().replace(' ', '')
).ratio()
if ratio_ent > 0.8:
return True
return False
def is_not_branded(question):
"""Return True if question does NOT mention any brand in st.session_state['brands']"""
brands = st.session_state.get('brands', [])
for brand in brands:
if is_brand_mentioned(question, brand):
return False
return True
# Initialize the sentiment pipeline, cached like the other model loaders above so
# the model is not reloaded on every Streamlit rerun.
@st.cache_resource
def load_sentiment_pipeline():
    return pipeline(
        "sentiment-analysis",
        model="distilbert-base-uncased-finetuned-sst-2-english"
    )
sentiment_pipeline = load_sentiment_pipeline()
def compute_readability(text):
if not text or len(text.split()) < 3:
return None
try:
return textstat.flesch_kincaid_grade(text)
except Exception:
return None
def compute_sentiment(text):
text = text.strip()
if not text:
return None
try:
# If text is very long, limit to first 512 characters
trimmed_text = text if len(text) <= 512 else text[:512]
result = sentiment_pipeline(trimmed_text)
if result:
label = result[0]['label'].upper()
score = result[0]['score']
return score if label == "POSITIVE" else -score
else:
return None
except Exception:
return None
def compute_serp_features(details, position):
content = details.get("content", "")
pos_counts = compute_pos_counts(content) # Get POS counts
features = {
"position": position,
"url": details.get("url"),
"title": details.get("title"),
"title_readability": compute_readability(details.get("title")),
"title_sentiment": compute_sentiment(details.get("title")),
"meta_readability": compute_readability(details.get("meta_description") or ""),
"meta_sentiment": compute_sentiment(details.get("meta_description") or ""),
"content_readability": compute_readability(details.get("content")),
"content_sentiment": compute_sentiment(details.get("content")),
"word_count": len(details.get("content", "").split()),
"num_paragraphs": details.get("num_paragraphs"),
"num_h2": details.get("num_h2"),
"num_h3": details.get("num_h3"),
"num_bullet_lists": details.get("num_bullet_lists"),
"entity_count": compute_ner_count(content),
"lexical_diversity": compute_lexical_diversity(content),
"adverbs": pos_counts["adverbs"],
"adjectives": pos_counts["adjectives"],
"verbs": pos_counts["verbs"]
}
return features
def create_correlation_radar(target, pearson_corr, spearman_corr):
"""
Generate a radar chart comparing Pearson and Spearman correlations with the target variable.
"""
# Remove the target itself from the features list
features = [f for f in pearson_corr.columns if f != target]
if not features:
st.info("No features available for radar chart.")
return
# Get correlations for each feature (dropping the target)
pearson_vals = pearson_corr[target].drop(target).values
spearman_vals = spearman_corr[target].drop(target).values
N = len(features)
angles = np.linspace(0, 2 * np.pi, N, endpoint=False).tolist()
# Close the circle for plotting
angles += angles[:1]
pearson_vals = np.concatenate([pearson_vals, [pearson_vals[0]]])
spearman_vals = np.concatenate([spearman_vals, [spearman_vals[0]]])
fig, ax = plt.subplots(figsize=(8, 8), subplot_kw={'projection': 'polar'})
ax.plot(angles, pearson_vals, color="r", linewidth=2, label="Pearson")
ax.fill(angles, pearson_vals, color="r", alpha=0.25)
ax.plot(angles, spearman_vals, color="b", linewidth=2, label="Spearman")
ax.fill(angles, spearman_vals, color="b", alpha=0.25)
# Set the feature labels around the circle
ax.set_xticks(angles[:-1])
# Optionally wrap long feature names
wrapped_labels = ["\n".join(wrap(label, 10)) for label in features]
ax.set_xticklabels(wrapped_labels, fontsize=10)
ax.set_title(f"Radar Chart: Correlations with {target}", y=1.1, fontsize=14, weight="bold")
ax.legend(loc="upper right", bbox_to_anchor=(1.1, 1.1))
st.pyplot(fig)
def extract_headings_for_paa_list(top_urls):
"""Extract headings for PAA list from the given top URLs."""
headings_data = []
for idx, url in enumerate(top_urls):
print(f"\nProcessing URL {idx+1}/{len(top_urls)} for headings: {url}")
t, _, _, heads, _ = extract_content_from_url(url, extract_headings=True)
if t is None:
t = "No Title"
if heads:
for h in heads:
if 'text' in h:
headings_data.append({
'text': h['text'].strip(),
'url': url,
'title': t
})
time.sleep(0.5)
if headings_data:
question_words = ['how', 'why', 'what', 'who', 'which', 'is', 'are', 'can', 'does', 'will']
# headings that either end in ? or start with a question word
filtered_headings_data = [
h for h in headings_data
if (h['text'].endswith('?')
or (h['text'].split() and h['text'].split()[0].lower() in question_words))
]
# remove duplicates by (text, url, title)
unique_set = {(hd['text'], hd['url'], hd['title']) for hd in filtered_headings_data}
filtered_headings_data = [
{'text': t, 'url': u, 'title': ti}
for (t,u,ti) in unique_set
]
        # Fetch Google "People Also Ask" questions (currently a disabled placeholder,
        # so only the question-style headings collected above feed the PAA list)
        google_paa = []
# Combine them into a DataFrame
if len(filtered_headings_data) > 0:
df_hd = pd.DataFrame(filtered_headings_data, columns=['text','url','title'])
paa_rows = [{'Question': q, 'URL': 'No URL', 'Title': 'No Title'} for q in google_paa]
# Convert headings to same columns
heading_rows = df_hd.rename(columns={'text':'Question','url':'URL','title':'Title'})
# combine
paa_df = pd.concat([heading_rows, pd.DataFrame(paa_rows)], ignore_index=True)
st.session_state['paa_list'] = paa_df
# Keep a separate headings_df for detailed headings
else:
# fallback empty
st.session_state['paa_list'] = pd.DataFrame(columns=['Question','URL','Title'])
else:
st.session_state['paa_list'] = pd.DataFrame(columns=['Question','URL','Title'])
def display_serp_details():
st.header("SERP Details")
st.write("Analyze the SERP in more detail.")
print("Displaying SERP Details")
# Safety check: ensure the analysis was run
    if 'serp_contents' not in st.session_state or not st.session_state['serp_contents']:
        st.warning("No SERP content available. Please run the analysis first.")
        return
# Extract headings for PAA list
if 'top_urls' in st.session_state:
extract_headings_for_paa_list(st.session_state['top_urls'])
# 1) Process each SERP entry: extract detailed data and compute features
features_list = []
serp_data = st.session_state['serp_contents']
for row in serp_data:
# Use the soup object and URL stored in each entry.
details = detailed_extraction(row['soup'], row['url'])
# Compute features for this SERP entry using its position.
feat = compute_serp_features(details, row['position'])
features_list.append(feat)
# 2) Create a DataFrame for analysis and display
df = pd.DataFrame(features_list)
st.subheader("Computed Features for Each URL")
st.dataframe(df)
# 3) Visualize distributions for each numeric metric
st.subheader("Distributions of Numeric Metrics")
numeric_cols = df.select_dtypes(include=[np.number]).columns.tolist()
for col in numeric_cols:
chart = alt.Chart(df).mark_bar().encode(
alt.X(f"{col}:Q", bin=alt.Bin(maxbins=15), title=col),
alt.Y("count()", title="Frequency")
).properties(
width=300,
height=200,
title=f"Distribution of {col}"
)
st.altair_chart(chart, use_container_width=True)
# 4) Display a correlation heatmap for the numeric features
st.subheader("Correlation Matrix Heatmap")
if numeric_cols:
corr = df[numeric_cols].corr().reset_index().melt("index")
chart_corr = alt.Chart(corr).mark_rect().encode(
x=alt.X("variable:N", title="Feature"),
y=alt.Y("index:N", title="Feature"),
color=alt.Color("value:Q", scale=alt.Scale(scheme="redblue", domain=(-1, 1))),
tooltip=["index", "variable", alt.Tooltip("value:Q", format=".2f")]
).properties(width=500, height=500)
st.altair_chart(chart_corr, use_container_width=True)
else:
st.info("No numeric data available for correlation analysis.")
# 5) Example scatter plot: Word Count vs. Content Readability
st.subheader("Scatter Plot: Word Count vs. Content Readability")
if "word_count" in df.columns and "content_readability" in df.columns:
scatter = alt.Chart(df).mark_circle(size=100).encode(
x=alt.X("word_count:Q", title="Word Count"),
y=alt.Y("content_readability:Q", title="Content Readability (Flesch-Kincaid Grade)"),
color=alt.Color("position:O", title="SERP Position", scale=alt.Scale(scheme="purplered")),
tooltip=[
alt.Tooltip("url:N", title="URL"),
alt.Tooltip("word_count:Q", title="Word Count"),
alt.Tooltip("content_readability:Q", title="Content Readability"),
alt.Tooltip("position:O", title="SERP Position")
]
).properties(width=500, height=400)
st.altair_chart(scatter, use_container_width=True)
# POS Counts - Adverbs, Adjectives, Verbs
st.subheader("Parts of Speech (POS) Analysis")
pos_melted = df.melt(id_vars=["position"], value_vars=["adverbs", "adjectives", "verbs"],
var_name="POS", value_name="Count")
pos_chart = alt.Chart(pos_melted).mark_bar().encode(
x=alt.X("position:O", title="SERP Position"),
y=alt.Y("Count:Q", title="POS Count"),
color=alt.Color("POS:N", title="Part of Speech"),
tooltip=[alt.Tooltip("POS:N", title="Part of Speech"), alt.Tooltip("Count:Q", title="Count")]
).properties(width=600, height=400)
st.altair_chart(pos_chart, use_container_width=True)
st.subheader("Scatter Plots: POS Proportions vs. Readability")
# 3) Grouped Bar Chart: Average POS Counts Across Readability Levels
st.subheader("Average POS Usage Across Readability Levels")
readability_bins = pd.cut(df["content_readability"], bins=[0, 5, 10, 15, 20, 25], labels=["0-5", "5-10", "10-15", "15-20", "20+"])
df["readability_group"] = readability_bins
avg_pos_df = df.groupby("readability_group")[["adverbs", "adjectives", "verbs"]].mean().reset_index().melt(id_vars=["readability_group"], var_name="POS", value_name="Average Usage")
avg_pos_chart = alt.Chart(avg_pos_df).mark_bar().encode(
x=alt.X("readability_group:N", title="Readability Score Range"),
y=alt.Y("Average Usage:Q", title="Average POS Usage"),
color=alt.Color("POS:N", title="Part of Speech"),
tooltip=[alt.Tooltip("POS:N", title="Part of Speech"), alt.Tooltip("Average Usage:Q", title="Usage", format=".4f")]
).properties(width=600, height=400)
st.altair_chart(avg_pos_chart, use_container_width=True)
# Compute similarity of questions to the keyword
if 'paa_list' in st.session_state and 'keyword' in st.session_state:
st.subheader("PAA Questions Similarity to Keyword")
paa_df = st.session_state['paa_list']
keyword = st.session_state['keyword']
model = load_embedding_model()
keyword_embedding = model.encode([keyword])[0]
paa_embeddings = model.encode(paa_df['Question'].tolist())
similarities = cosine_similarity([keyword_embedding], paa_embeddings)[0]
paa_df['Similarity'] = similarities
# Filter out branded questions and questions with less than 4 words
paa_df = paa_df[paa_df['Question'].apply(is_not_branded)]
paa_df = paa_df[paa_df['Question'].apply(lambda x: len(x.split()) >= 4)]
# Compute NER count for each question
paa_df['NER Count'] = paa_df['Question'].apply(compute_ner_count)
# Filter out rows with similarity below 0.15 or NER count above 2
paa_df = paa_df[(paa_df['Similarity'] >= 0.15) & (paa_df['NER Count'] <= 2)]
st.dataframe(paa_df[['Question', 'Similarity', 'NER Count']])
# 6) Button to return to the Editor screen
if st.button("Return to Editor"):
st.session_state['step'] = 'editor'
st.rerun()
def filter_terms(terms):
"""Filter out numeric, stopword, or other low-value tokens."""
custom_stopwords = set([
"i","me","my","myself","we","our","ours","ourselves","you","your","way","yours",
"yourself","yourselves","he","him","his","himself","she","her","hers","herself",
"it","its","itself","they","them","their","theirs","themselves","what","which","who",
"whom","this","that","these","those","am","is","are","was","were","be","been","being",
"have","has","had","having","do","does","did","doing","a","an","the","and","but","if","or",
"because","as","until","while","of","at","by","for","with","about","against","between","into",
"through","during","before","after","above","below","to","from","up","down","in","out","on","off",
"over","under","again","further","then","once","here","there","when","where","why","how","all",
"any","both","each","few","more","most","other","some","such","no","nor","not","only","own","same",
"so","than","too","very","s","t","can","will","just","don","should","now","like","need"
])
filtered = []
seen = set()
for term in terms:
if any(ch.isdigit() for ch in term):
continue
doc = nlp(term)
# skip if it has undesired POS
if any(tok.pos_ in ['AUX','PRON','DET','ADP','CCONJ','NUM','SYM','PUNCT'] for tok in doc):
continue
# check for stopwords
lemmas = [t.lemma_.lower() for t in doc]
if any(lm in custom_stopwords or lm in nlp.Defaults.stop_words for lm in lemmas):
continue
final_lemma = ' '.join(lemmas)
if final_lemma not in seen:
filtered.append(final_lemma)
seen.add(final_lemma)
return filtered
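# Example for filter_terms (results depend on spaCy's tags and lemmas):
#   filter_terms(["best seo tools 2024", "the", "content strategy", "content strategies"])
#   drops the term containing digits and the stopword, and lemmatization collapses
#   "content strategies" into the already-seen "content strategy", leaving ["content strategy"].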
def perform_analysis(keyword):
"""Refactored function using logic consistent with the React+FastAPI version,
but preserving EXACT st.session_state keys and formats used in the original code.
"""
start_time = time.time()
    user_email = st.session_state.get("username") or "guest"  # fall back to "guest" when no user is logged in
status_placeholder = st.empty() # Create a placeholder for dynamic updates
status_placeholder.info('Retrieving top search results...')
st.session_state['keyword'] = keyword
st.session_state['serp_contents'] = [] # NEW: store structured data
api_key = os.getenv("API_KEY")
cse_id = os.getenv("CSE_ID")
# 1) Retrieve search items
results = google_custom_search(keyword, api_key, cse_id, num_results=35)
if not results:
status_placeholder.error('No results found.')
return
top_urls = [item['link'] for item in results if 'link' in item]
if not top_urls:
status_placeholder.error('No URLs found.')
return
st.session_state['top_urls'] = top_urls
# 2) Extract content from top URLs
titles, favicons, retrieved_content = [], [], []
headings_data = []
successful_urls = []
word_counts = []
brand_names = set()
progress = st.progress(0)
for idx, url in enumerate(top_urls):
if len(retrieved_content) >= max_contents:
break
print(f"\nProcessing URL {idx+1}/{len(top_urls)}: {url}")
progress.progress(idx / len(top_urls))
status_placeholder.info(f"Retrieving content from {url}...")
t, content, favicon_url, heads, soup = extract_content_from_url(url, extract_headings=True)
if t is None:
t = "No Title"
# brand
print("Filtering branded content")
brand_name = extract_brand_name(url, t)
brand_names.add(brand_name)
if heads:
for h in heads:
if 'text' in h:
headings_data.append({
'text': h['text'].strip(),
'url': url,
'title': t
})
if content:
wc = len(content.split())
retrieved_content.append(content)
successful_urls.append(url)
titles.append(t)
favicons.append(favicon_url)
# anchor word count at least 1000
word_counts.append(wc if wc > 1000 else 1000)
st.session_state['serp_contents'].append({
"position": idx + 1, # 1-based SERP rank
"url": url,
"title": t,
"content": content,
"favicon": favicon_url,
"word_counts": wc if wc > 1000 else 1000,
"soup": soup
})
time.sleep(0.5)
progress.empty()
status_placeholder.empty() # Remove the last message after completion
# store brand names
st.session_state['brands'] = list(brand_names)
if not retrieved_content:
st.error('Failed to retrieve sufficient content.')
return
if len(word_counts) > 0:
ideal_count = int(np.median(word_counts)) + 500
else:
ideal_count = 1000
st.session_state['ideal_word_count'] = ideal_count
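    # Illustrative arithmetic: if the scraped pages have a median of 1,230 words,
    # ideal_count becomes 1,730, which the UI below rounds down to a suggested
    # 1,500-2,000 word range.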
# 3) Clean and lemmatize
docs_lemmatized = [lemmatize_text(doc) for doc in retrieved_content]
# 5) Display top search results
st.subheader('Top Search Results')
for i in range(len(titles)):
fc = favicons[i]
t = titles[i]
link = successful_urls[i]
wc = word_counts[i]
st.markdown(
f"""
<div style="background-color: white; padding: 10px; border-radius: 5px; margin-bottom: 10px; color: black;">
<div style="display: flex; align-items: center;">
<img src="{fc}" width="32" style="margin-right: 10px;">
<div>
<strong>{t}</strong> ({wc} words)<br>
<a href="{link}" target="_blank">{link}</a>
</div>
</div>
</div>
""", unsafe_allow_html=True
)
lower_bound = (ideal_count // 500) * 500
upper_bound = lower_bound + 500
st.info(f"**Suggested Word Count:** Aim for approx. {lower_bound}–{upper_bound} words based on top content.")
print("Starting TF-IDF operations")
# 6) TF-IDF + CountVectorizer
model = load_embedding_model()
tfidf_vectorizer = TfidfVectorizer(ngram_range=(1,3))
tf_vectorizer = CountVectorizer(ngram_range=(1,3))
tfidf_matrix = tfidf_vectorizer.fit_transform(docs_lemmatized).toarray()
tf_matrix = tf_vectorizer.fit_transform(docs_lemmatized).toarray()
feature_names = tfidf_vectorizer.get_feature_names_out()
filtered_feats = filter_terms(feature_names)
# filter the matrices
idxs = [i for i, term in enumerate(feature_names) if term in filtered_feats]
tfidf_matrix_f = tfidf_matrix[:, idxs]
tf_matrix_f = tf_matrix[:, idxs]
filtered_feature_names = [feature_names[i] for i in idxs]
# compute average
avg_tfidf = np.mean(tfidf_matrix_f, axis=0)
avg_tf = np.mean(tf_matrix_f, axis=0)
# also track doc lengths
doc_word_counts = [len(d.split()) for d in docs_lemmatized]
avg_doc_len = float(sum(doc_word_counts)) / max(1, len(doc_word_counts))
# normalizing
avg_tfidf /= avg_doc_len
avg_tf /= avg_doc_len
print("Generating embeddings...")
# 7) Now compute similarity for each term to the user keyword
keyword_emb = model.encode([keyword])[0]
term_embeddings = model.encode(filtered_feature_names)
similarities = cosine_similarity([keyword_emb], term_embeddings)[0]
# "Combined Score" = average tf-idf * similarity
combined_scores = avg_tfidf * similarities
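    # Worked example: a term with normalized TF-IDF 0.004 and keyword similarity 0.62
    # scores 0.00248, outranking an off-topic term with TF-IDF 0.02 but similarity 0.10 (score 0.002).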
# get top 50
N = 50
top_idx = np.argsort(combined_scores)[-N:][::-1]
top_terms = [filtered_feature_names[i] for i in top_idx]
top_combined = [combined_scores[i] for i in top_idx]
top_tfidf = [avg_tfidf[i] for i in top_idx]
top_tf = [avg_tf[i] for i in top_idx]
top_sim = [similarities[i] for i in top_idx]
# 8) Store in session_state
# chart_data => DataFrame with columns: Terms, Combined Score, Average TF-IDF Score, Similarity to Keyword
st.session_state['chart_data'] = pd.DataFrame({
'Terms': top_terms,
'Combined Score': top_combined,
'Average TF-IDF Score': [x * 100 for x in top_tfidf],
'Similarity to Keyword': [x * 100 for x in top_sim]
})
print(top_tfidf)
# words_to_check => list of dicts with Term, Average TF Score, Average TF-IDF Score
st.session_state['words_to_check'] = [
{
'Term': top_terms[i],
'Average TF Score': top_tf[i],
'Average TF-IDF Score': top_tfidf[i]
}
for i in range(len(top_terms))
]
data_to_store = {
"keyword": keyword,
"top_urls": top_urls, # or successful_urls
"brand_names": list(brand_names),
"ideal_word_count": ideal_count,
# Combine your relevant data (like words_to_check, chart_data, etc.) in JSON
"analysis_data": {
"words_to_check": st.session_state['words_to_check'],
"chart_data": st.session_state['chart_data'].to_dict() if 'chart_data' in st.session_state else {},
# add more fields as needed...
},
}
try:
response = supabase.table("analysis_results").insert({
"user_email": user_email,
"keyword": keyword,
"top_urls": top_urls,
"brand_names": list(brand_names),
"ideal_word_count": ideal_count,
"analysis_data": data_to_store['analysis_data']
}).execute()
    except APIError as e:
        st.error(f"Supabase insert failed: {e}")
    else:
        inserted_rows = response.data
        if inserted_rows and len(inserted_rows) > 0:
            new_id = inserted_rows[0]["id"]
            st.session_state["analysis_id"] = new_id
            st.success(f"Saved analysis data with ID: {new_id}")
st.session_state['analysis_completed'] = True
elapsed_time = time.time() - start_time
print(f"Time taken for analysis: {elapsed_time:.2f} seconds")
def display_editor():
# Add a button to start a new analysis
if st.button('Start a New Analysis'):
st.session_state.clear()
st.session_state["authenticated"] = False
st.rerun()
# Retrieve the ideal word count from session state
ideal_word_count = st.session_state.get('ideal_word_count', None)
# ---- NEW BUTTON to see SERP details ----
if st.button("Analyze SERP in Detail", key='serp_details',type="primary", icon=":material/zoom_in:"):
st.session_state['step'] = 'serp_details'
st.rerun()
# Display the ideal word count suggestion
if ideal_word_count:
lower_bound = (ideal_word_count // 500) * 500
upper_bound = lower_bound + 500
st.info(f"**Suggested Word Count:** Aim for approximately {lower_bound} to {upper_bound} words based on top-performing content.")
# Update sidebar label
st.sidebar.subheader('Optimize Your Content with These Words')
# Grab words_to_check from session_state
words_to_check = st.session_state['words_to_check']
# Create a Quill editor for inputting and editing text
text_input = st_quill(placeholder='Start typing your content here...', key='quill')
# Adjust Quill editor height
st.markdown("""
<style>
.stQuill {
height: 400px;
}