# megacursos1 / app-autoclient-enhanced-with-19-more.py
# Uploaded by luigi12345 using huggingface_hub (#9), revision 29daf4e (verified).
# Size at upload: 118 kB.
import streamlit as st
import pandas as pd
import asyncio
from datetime import datetime
from fake_useragent import UserAgent
from bs4 import BeautifulSoup
from googlesearch import search
import boto3
from botocore.exceptions import NoCredentialsError, PartialCredentialsError, ClientError
import openai
import logging
import json
import re
import os
import sqlite3
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from concurrent.futures import ThreadPoolExecutor
import time
import random
import plotly.express as px
# SECURITY: the AWS key pair used to be hard-coded on these lines. Credentials
# are now read from the environment. When a variable is unset the value is
# None, which makes boto3 fall back to its default credential chain (shared
# config files, instance/role credentials). Any key that was ever committed to
# this file must be considered leaked and rotated in the AWS console.
AWS_ACCESS_KEY_ID = os.getenv("AWS_ACCESS_KEY_ID")
AWS_SECRET_ACCESS_KEY = os.getenv("AWS_SECRET_ACCESS_KEY")
REGION_NAME = os.getenv("AWS_DEFAULT_REGION", "us-east-1")
# OpenAI settings: environment first, then the previous built-in defaults.
openai.api_key = os.getenv("OPENAI_API_KEY", "sk-1234")
openai.api_base = os.getenv("OPENAI_API_BASE", "https://openai-proxy-kl3l.onrender.com")
openai_model = "gpt-3.5-turbo"
# Location of the SQLite database used by every helper in this module.
sqlite_db_path = "autoclient.db"
# Create an empty file up front when the database does not exist yet;
# a failure is logged and re-raised so the app stops early.
try:
    if os.path.exists(sqlite_db_path) is False:
        with open(sqlite_db_path, 'w'):
            pass
except IOError as exc:
    logging.error(f"Failed to create database file: {exc}")
    raise
# Module-wide AWS SES client used for sending e-mail.
# Credential problems reported by botocore are logged and re-raised.
try:
    ses_client = boto3.client(
        'ses',
        region_name=REGION_NAME,
        aws_access_key_id=AWS_ACCESS_KEY_ID,
        aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
    )
except (NoCredentialsError, PartialCredentialsError) as exc:
    logging.error(f"AWS SES client initialization failed: {exc}")
    raise
# SQLite connection factory
def get_db_connection():
    """Open and return a new connection to the database at ``sqlite_db_path``.

    The connection is created with ``check_same_thread=False`` and uses
    ``sqlite3.Row`` as row factory, so columns can be read by name or index.
    The caller is responsible for closing it.
    """
    connection = sqlite3.connect(sqlite_db_path, check_same_thread=False)
    connection.row_factory = sqlite3.Row
    return connection
# Shared HTTP session. Requests to https:// URLs are retried up to five times
# (backoff factor 1) when the server answers 502, 503 or 504.
session = requests.Session()
retries = Retry(
    total=5,
    backoff_factor=1,
    status_forcelist=[502, 503, 504],
)
session.mount('https://', HTTPAdapter(max_retries=retries))
# Send INFO-and-above log records to app.log (append mode).
# If the log file cannot be set up, report it on stdout and re-raise.
try:
    logging.basicConfig(
        filename='app.log',
        filemode='a',
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
    )
except IOError as exc:
    print(f"Error setting up logging: {exc}")
    raise
# Input validation functions
def validate_name(name):
    """Return ``name`` with surrounding whitespace removed.

    Raises:
        ValueError: if ``name`` is falsy or only whitespace, or if the
            unstripped value is longer than 100 characters.
    """
    stripped = name.strip() if name else ""
    if not stripped:
        raise ValueError("Name cannot be empty or just whitespace")
    # The length limit is applied to the value as passed in, not the stripped one.
    if len(name) > 100:
        raise ValueError("Name is too long (max 100 characters)")
    return stripped
def validate_email(email):
    """Return ``email`` unchanged when it looks like ``something@host.tld``.

    The pattern is matched from the start of the string only, so display-name
    forms such as ``"Name <user@host.tld>"`` are accepted as well.

    Raises:
        ValueError: if the pattern does not match.
    """
    looks_valid = re.match(r"[^@]+@[^@]+\.[^@]+", email) is not None
    if looks_valid:
        return email
    raise ValueError("Invalid email address")
def validate_campaign_type(campaign_type):
    """Return ``campaign_type`` when it is ``"Email"`` or ``"SMS"``.

    Raises:
        ValueError: for any other value.
    """
    valid_types = ["Email", "SMS"]
    if campaign_type in valid_types:
        return campaign_type
    raise ValueError(f"Invalid campaign type. Must be one of {valid_types}")
def validate_id(id_value, id_type):
    """Convert ``id_value`` to a positive integer ID.

    Accepts plain integers, numeric strings, and the ``"<id>: <label>"``
    strings produced by the ``fetch_*`` helpers (only the part before the
    first colon is used).

    Args:
        id_value: The value to validate.
        id_type: Human-readable entity name used in the error message.

    Returns:
        The ID as an ``int`` greater than zero.

    Raises:
        ValueError: if the value cannot be converted or is not positive.
    """
    try:
        raw = id_value.split(':')[0] if ':' in str(id_value) else id_value
        id_int = int(raw)
    except (ValueError, TypeError, AttributeError):
        # TypeError covers None and other non-numeric objects, which previously
        # escaped as an unexpected exception type.
        raise ValueError(f"Invalid {id_type} ID") from None
    if id_int <= 0:
        raise ValueError(f"Invalid {id_type} ID")
    return id_int
def validate_status(status, valid_statuses):
    """Return ``status`` when it is a member of ``valid_statuses``.

    Raises:
        ValueError: if ``status`` is not one of the allowed values.
    """
    if status in valid_statuses:
        return status
    raise ValueError(f"Invalid status. Must be one of {valid_statuses}")
def validate_num_results(num_results):
    """Return ``num_results`` when it is a non-negative ``int``.

    Raises:
        ValueError: if the value is not an ``int`` or is negative.
    """
    if isinstance(num_results, int) and num_results >= 0:
        return num_results
    raise ValueError("Invalid number of results")
# Initialize database
def init_db():
    """Create every table the application uses, if it does not exist yet.

    Compared with the earlier schema this also:

    * gives ``search_terms`` the ``campaign_id``, ``status``,
      ``processed_leads`` and ``last_processed_at`` columns that
      ``add_search_term``, ``fetch_search_terms``,
      ``update_processed_leads_count`` and ``update_search_term_status``
      read and write;
    * creates ``search_term_effectiveness``, the table
      ``log_search_term_effectiveness`` inserts into;
    * adds the missing ``search_terms`` columns to databases that were created
      with the earlier schema (``CREATE TABLE IF NOT EXISTS`` alone would
      leave those untouched);
    * always closes the connection, even when a statement fails.
    """
    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        cursor.executescript('''
            CREATE TABLE IF NOT EXISTS projects (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                project_name TEXT NOT NULL,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            );
            CREATE TABLE IF NOT EXISTS campaigns (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                campaign_name TEXT NOT NULL,
                project_id INTEGER,
                campaign_type TEXT NOT NULL,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                FOREIGN KEY (project_id) REFERENCES projects (id)
            );
            CREATE TABLE IF NOT EXISTS message_templates (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                template_name TEXT NOT NULL,
                subject TEXT,
                body_content TEXT NOT NULL,
                campaign_id INTEGER,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                FOREIGN KEY (campaign_id) REFERENCES campaigns (id)
            );
            CREATE TABLE IF NOT EXISTS leads (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                email TEXT,
                phone TEXT,
                first_name TEXT,
                last_name TEXT,
                company TEXT,
                job_title TEXT,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            );
            CREATE TABLE IF NOT EXISTS lead_sources (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                lead_id INTEGER,
                search_term_id INTEGER,
                url TEXT,
                page_title TEXT,
                meta_description TEXT,
                http_status INTEGER,
                scrape_duration TEXT,
                meta_tags TEXT,
                phone_numbers TEXT,
                content TEXT,
                tags TEXT,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                FOREIGN KEY (lead_id) REFERENCES leads (id),
                FOREIGN KEY (search_term_id) REFERENCES search_terms (id)
            );
            CREATE TABLE IF NOT EXISTS campaign_leads (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                campaign_id INTEGER,
                lead_id INTEGER,
                status TEXT DEFAULT 'active',
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                FOREIGN KEY (campaign_id) REFERENCES campaigns (id),
                FOREIGN KEY (lead_id) REFERENCES leads (id)
            );
            CREATE TABLE IF NOT EXISTS messages (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                campaign_id INTEGER,
                lead_id INTEGER,
                template_id INTEGER,
                customized_subject TEXT,
                customized_content TEXT,
                sent_at TIMESTAMP,
                status TEXT DEFAULT 'pending',
                engagement_data TEXT,
                message_id TEXT,
                FOREIGN KEY (campaign_id) REFERENCES campaigns (id),
                FOREIGN KEY (lead_id) REFERENCES leads (id),
                FOREIGN KEY (template_id) REFERENCES message_templates (id)
            );
            CREATE TABLE IF NOT EXISTS search_term_groups (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                name TEXT NOT NULL,
                email_template TEXT,
                description TEXT,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            );
            CREATE TABLE IF NOT EXISTS search_terms (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                term TEXT NOT NULL,
                group_id INTEGER,
                category TEXT,
                campaign_id INTEGER,
                status TEXT DEFAULT 'pending',
                processed_leads INTEGER DEFAULT 0,
                last_processed_at TIMESTAMP,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                FOREIGN KEY (group_id) REFERENCES search_term_groups (id),
                FOREIGN KEY (campaign_id) REFERENCES campaigns (id)
            );
            CREATE TABLE IF NOT EXISTS search_term_effectiveness (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                term TEXT NOT NULL,
                total_results INTEGER,
                valid_leads INTEGER,
                irrelevant_leads INTEGER,
                blogs_found INTEGER,
                directories_found INTEGER,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            );
            CREATE TABLE IF NOT EXISTS knowledge_base (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                kb_name TEXT,
                kb_bio TEXT,
                kb_values TEXT,
                contact_name TEXT,
                contact_role TEXT,
                contact_email TEXT,
                company_description TEXT,
                company_mission TEXT,
                company_target_market TEXT,
                company_other TEXT,
                product_name TEXT,
                product_description TEXT,
                product_target_customer TEXT,
                product_other TEXT,
                other_context TEXT,
                example_email TEXT,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            );
            CREATE TABLE IF NOT EXISTS ai_request_logs (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                function_name TEXT NOT NULL,
                prompt TEXT NOT NULL,
                response TEXT NOT NULL,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            );
            CREATE TABLE IF NOT EXISTS optimized_search_terms (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                term TEXT NOT NULL,
                original_term_id INTEGER,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                FOREIGN KEY (original_term_id) REFERENCES search_terms (id)
            );
        ''')

        # Databases created with the earlier schema already have a
        # search_terms table without these columns; add whichever are missing.
        required_columns = {
            "campaign_id": "INTEGER",
            "status": "TEXT DEFAULT 'pending'",
            "processed_leads": "INTEGER DEFAULT 0",
            "last_processed_at": "TIMESTAMP",
        }
        cursor.execute("PRAGMA table_info(search_terms)")
        existing_columns = {row[1] for row in cursor.fetchall()}
        for column, definition in required_columns.items():
            if column not in existing_columns:
                # Column names/definitions are the constants above, not user input.
                cursor.execute(f"ALTER TABLE search_terms ADD COLUMN {column} {definition}")

        conn.commit()
    finally:
        conn.close()
    logging.info("Database initialized successfully!")


# Create/upgrade the schema as soon as the module is loaded.
init_db()
def alter_messages_table():
    """Add the ``message_id`` column to ``messages`` when it is missing.

    SQLite errors are logged, not raised; the connection is always closed.
    """
    conn = get_db_connection()
    cursor = conn.cursor()
    try:
        # PRAGMA table_info yields one row per column; index 1 is the column name.
        cursor.execute("PRAGMA table_info(messages)")
        existing_columns = [info[1] for info in cursor.fetchall()]
        if 'message_id' in existing_columns:
            logging.info("message_id column already exists in messages table")
        else:
            cursor.execute("ALTER TABLE messages ADD COLUMN message_id TEXT")
            conn.commit()
            logging.info("Added message_id column to messages table")
    except sqlite3.Error as err:
        logging.error(f"Error altering messages table: {err}")
    finally:
        conn.close()


alter_messages_table()
# Function to create a new project
def create_project(project_name):
    """Insert a project row and return its new ID.

    Args:
        project_name: Display name; validated with ``validate_name``.

    Returns:
        The ``id`` of the inserted ``projects`` row.

    Raises:
        ValueError: if the name is invalid.
        sqlite3.Error: if the insert fails (the connection is still closed).
    """
    project_name = validate_name(project_name)
    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        cursor.execute("INSERT INTO projects (project_name) VALUES (?)", (project_name,))
        project_id = cursor.lastrowid
        conn.commit()
        return project_id
    finally:
        # Close even when the insert raises, so the handle is not leaked.
        conn.close()
# Function to create a new campaign
def create_campaign(campaign_name, project_id, campaign_type):
    """Insert a campaign row and return its new ID.

    Args:
        campaign_name: Display name; validated with ``validate_name``.
        project_id: Owning project; an int or an ``"<id>: <name>"`` string.
        campaign_type: ``"Email"`` or ``"SMS"``.

    Returns:
        The ``id`` of the inserted ``campaigns`` row.

    Raises:
        ValueError: if any argument fails validation.
        sqlite3.Error: if the insert fails (the connection is still closed).
    """
    campaign_name = validate_name(campaign_name)
    project_id = validate_id(project_id, "project")
    campaign_type = validate_campaign_type(campaign_type)
    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        cursor.execute(
            "INSERT INTO campaigns (campaign_name, project_id, campaign_type) VALUES (?, ?, ?)",
            (campaign_name, project_id, campaign_type),
        )
        campaign_id = cursor.lastrowid
        conn.commit()
        return campaign_id
    finally:
        conn.close()
# Function to create a new message template
def create_message_template(template_name, subject, body_content, campaign_id):
    """Insert a message template and return its new ID.

    Args:
        template_name: Template name; validated with ``validate_name``.
        subject: Subject line; validated with ``validate_name``.
        body_content: Message body, stored as given.
        campaign_id: Owning campaign; an int or an ``"<id>: <name>"`` string.

    Returns:
        The ``id`` of the inserted ``message_templates`` row.

    Raises:
        ValueError: if a validated argument is invalid.
        sqlite3.Error: if the insert fails (the connection is still closed).
    """
    template_name = validate_name(template_name)
    subject = validate_name(subject)
    campaign_id = validate_id(campaign_id, "campaign")
    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        cursor.execute(
            """
            INSERT INTO message_templates (template_name, subject, body_content, campaign_id)
            VALUES (?, ?, ?, ?)
            """,
            (template_name, subject, body_content, campaign_id),
        )
        template_id = cursor.lastrowid
        conn.commit()
        return template_id
    finally:
        conn.close()
def create_session():
    """Return a new ``requests`` session that retries failed requests.

    Both ``http://`` and ``https://`` get an adapter that retries up to five
    times (backoff factor 0.1) on HTTP 500, 502, 503 and 504.
    """
    retry_policy = Retry(total=5, backoff_factor=0.1, status_forcelist=[500, 502, 503, 504])
    http_session = requests.Session()
    for scheme in ('http://', 'https://'):
        http_session.mount(scheme, HTTPAdapter(max_retries=retry_policy))
    return http_session
# Real-time logging helper
def update_logs(logs, log_message, log_container):
    """Append ``log_message`` to ``logs`` and re-render the whole log.

    Args:
        logs: List of messages; mutated in place.
        log_message: The new line to add.
        log_container: Object exposing ``text_area(label, value, height=...)``.
    """
    logs.append(log_message)
    rendered = "\n".join(logs)
    log_container.text_area("Search Logs", rendered, height=300)
def log_search_term_effectiveness(term, total_results, valid_leads, irrelevant_leads, blogs_found, directories_found):
    """Record how productive a search term was.

    Inserts one row into ``search_term_effectiveness`` with the given counts.

    Raises:
        sqlite3.Error: if the insert fails (the connection is still closed).
    """
    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        cursor.execute(
            """
            INSERT INTO search_term_effectiveness
            (term, total_results, valid_leads, irrelevant_leads, blogs_found, directories_found)
            VALUES (?, ?, ?, ?, ?, ?)
            """,
            (term, total_results, valid_leads, irrelevant_leads, blogs_found, directories_found),
        )
        conn.commit()
    finally:
        conn.close()
def should_scrape_page(meta_description, title):
    """Return ``True`` for every page.

    Both arguments are ignored: nothing is filtered out before scraping;
    blog/directory pages are meant to be categorized, not skipped.
    """
    return True
def categorize_page_content(soup, url):
    """Classify a parsed page as ``'blog'``, ``'directory'`` or ``'company'``.

    The decision is based on a case-insensitive keyword match in the page
    title and the ``<meta name="description">`` content; ``'blog'`` takes
    precedence over ``'directory'``.

    Args:
        soup: Parsed document exposing ``.title`` and ``.find(...)``.
        url: Page URL (currently not used for the decision).

    Returns:
        One of ``'blog'``, ``'directory'``, ``'company'``.
    """
    title_tag = soup.title
    # ``.string`` is None for an empty <title> or one with nested markup.
    title = (title_tag.string if title_tag is not None else None) or ''
    # Look the tag up once; a description tag without ``content`` yields ''.
    meta_tag = soup.find('meta', attrs={'name': 'description'})
    meta_description = (meta_tag.get('content') if meta_tag is not None else None) or ''

    title = title.lower()
    meta_description = meta_description.lower()
    if 'blog' in title or 'blog' in meta_description:
        return 'blog'
    if 'directory' in title or 'directory' in meta_description:
        return 'directory'
    return 'company'
def manual_search_wrapper(term, num_results, campaign_id, search_type="All Leads", log_container=None):
    """Search the web for ``term``, scrape each hit for e-mails and store them as leads.

    NOTE(review): a second ``manual_search_wrapper`` (without the
    ``log_container`` parameter) is defined further down in this file; at
    import time that later definition replaces this one.

    Args:
        term: Query string handed to ``search``.
        num_results: Number of URLs requested, and the cap on returned rows.
        campaign_id: Campaign under which the search term is registered.
        search_type: Accepted but not read anywhere in this function.
        log_container: Optional object passed to ``update_logs``; when falsy,
            progress messages are built but not shown.

    Returns:
        A list of ``[email, url, term]`` rows, at most ``num_results`` long.
        Errors are reported through the log container, not raised.
    """
    results = []
    logs = []
    ua = UserAgent()
    session = create_session()
    try:
        log_message = f"Starting search for term: {term}"
        if log_container:
            update_logs(logs, log_message, log_container)
        search_urls = list(search(term, num=num_results, stop=num_results, pause=2))
        log_message = f"Found {len(search_urls)} URLs"
        if log_container:
            update_logs(logs, log_message, log_container)
        # Register the term (or look up its existing ID) so lead sources can reference it.
        term_id = add_search_term(term, campaign_id)
        if term_id is None:
            raise ValueError(f"Failed to add or retrieve search term ID for '{term}'")
        for url in search_urls:
            try:
                log_message = f"Processing URL: {url}"
                if log_container:
                    update_logs(logs, log_message, log_container)
                # Browser-like headers with a random User-Agent per request.
                headers = {
                    'User-Agent': ua.random,
                    'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
                    'Accept-Language': 'en-US,en;q=0.5',
                    'Referer': 'https://www.google.com/',
                    'DNT': '1',
                    'Connection': 'keep-alive',
                    'Upgrade-Insecure-Requests': '1',
                }
                response = session.get(url, headers=headers, timeout=10)
                response.raise_for_status()
                response.encoding = 'utf-8'
                soup = BeautifulSoup(response.text, 'html.parser')
                emails = find_emails(response.text)
                phone_numbers = extract_phone_numbers(response.text)
                log_message = f"Found {len(emails)} emails and {len(phone_numbers)} phone numbers on {url}"
                if log_container:
                    update_logs(logs, log_message, log_container)
                for email in emails:
                    if is_valid_email(email):
                        lead_id = save_lead(email, None, None, None, None, None)
                        # NOTE(review): soup.find_all('meta') is passed as meta_tags and
                        # save_lead_source feeds it to json.dumps — confirm Tag objects
                        # serialize as intended.
                        save_lead_source(lead_id, term_id, url, soup.title.string if soup.title else None,
                                         soup.find('meta', attrs={'name': 'description'})['content'] if soup.find('meta', attrs={'name': 'description'}) else None,
                                         response.status_code, str(response.elapsed), soup.find_all('meta'), extract_phone_numbers(response.text), soup.get_text(), [])
                        log_message = f"Valid email found: {email}"
                        if log_container:
                            update_logs(logs, log_message, log_container)
                        results.append([email, url, term])
                time.sleep(random.uniform(1, 3))  # Random delay between requests
            except requests.exceptions.RequestException as e:
                log_message = f"Error processing {url}: {e}"
                if log_container:
                    update_logs(logs, log_message, log_container)
            except Exception as e:
                log_message = f"Unexpected error processing {url}: {e}"
                if log_container:
                    update_logs(logs, log_message, log_container)
            # Stop visiting further URLs once enough rows were collected.
            if len(results) >= num_results:
                break
        # Update the processed_leads count for this search term
        update_processed_leads_count(term_id, len(results))
        time.sleep(random.uniform(30, 60))  # Longer delay between search terms
    except Exception as e:
        log_message = f"Error in manual search: {e}"
        if log_container:
            update_logs(logs, log_message, log_container)
    log_message = f"Search completed. Found {len(results)} results."
    if log_container:
        update_logs(logs, log_message, log_container)
    return results[:num_results]
# Function to add a new search term
def add_search_term(term, campaign_id):
    """Return the ID of ``term`` within ``campaign_id``, inserting it if new.

    Returns:
        The ``search_terms.id`` of the existing or newly inserted row, or
        ``None`` when a SQLite error occurs (the error is logged).
    """
    conn = None
    try:
        conn = get_db_connection()
        cursor = conn.cursor()
        # Reuse the row when this campaign already has the term.
        cursor.execute("SELECT id FROM search_terms WHERE term = ? AND campaign_id = ?", (term, campaign_id))
        found = cursor.fetchone()
        if not found:
            cursor.execute('''
INSERT INTO search_terms (term, campaign_id, processed_leads)
VALUES (?, ?, 0)
''', (term, campaign_id))
            term_id = cursor.lastrowid
            logging.info(f"New search term '{term}' added for campaign {campaign_id} with ID: {term_id}")
        else:
            term_id = found['id']
            logging.info(f"Search term '{term}' already exists for campaign {campaign_id} with ID: {term_id}")
        conn.commit()
        return term_id
    except sqlite3.Error as err:
        logging.error(f"Database error in add_search_term: {err}")
        return None
    finally:
        if conn is not None:
            conn.close()
def manual_search_wrapper(term, num_results, campaign_id, search_type="All Leads", log_container=None):
    """Search the web for ``term``, scrape each hit for e-mails and store them as leads.

    This is the definition that is in effect at runtime (it replaces the
    earlier one of the same name). It accepts the optional ``log_container``
    of that earlier definition, so callers written against either signature
    keep working.

    Args:
        term: Query string handed to ``search``.
        num_results: Number of URLs requested, and the cap on returned rows.
        campaign_id: Campaign under which the search term is registered.
        search_type: Accepted for interface compatibility; not used here.
        log_container: Optional UI object passed to ``update_logs``. Progress
            is always printed; it is mirrored to the container when given.

    Returns:
        A list of ``[email, url, term]`` rows, at most ``num_results`` long.
        Errors are reported through the log, not raised.
    """
    results = []
    logs = []
    ua = UserAgent()
    session = create_session()

    def _log(message):
        # Console output is kept; the UI container is refreshed only if provided.
        print(message)
        if log_container:
            update_logs(logs, message, log_container)

    try:
        _log(f"Starting search for term: {term}")
        search_urls = list(search(term, num=num_results, stop=num_results, pause=2))
        _log(f"Found {len(search_urls)} URLs")
        # Add the search term to the database and get its ID
        term_id = add_search_term(term, campaign_id)
        if term_id is None:
            raise ValueError(f"Failed to add or retrieve search term ID for '{term}'")
        for url in search_urls:
            try:
                _log(f"Processing URL: {url}")
                # Browser-like headers with a random User-Agent per request.
                headers = {
                    'User-Agent': ua.random,
                    'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
                    'Accept-Language': 'en-US,en;q=0.5',
                    'Referer': 'https://www.google.com/',
                    'DNT': '1',
                    'Connection': 'keep-alive',
                    'Upgrade-Insecure-Requests': '1',
                }
                response = session.get(url, headers=headers, timeout=10)
                response.raise_for_status()
                response.encoding = 'utf-8'
                soup = BeautifulSoup(response.text, 'html.parser')
                emails = [email for email in find_emails(response.text) if is_valid_email(email)]
                if emails:
                    # Page-level metadata is the same for every e-mail on the page.
                    title_text = soup.title.string if soup.title else None
                    page_title = str(title_text) if title_text is not None else None
                    description_tag = soup.find('meta', attrs={'name': 'description'})
                    meta_description = description_tag.get('content') if description_tag else None
                    # Tag objects are not JSON-serializable; store their HTML text,
                    # otherwise save_lead_source fails and the lead is never reported.
                    meta_tags = [str(tag) for tag in soup.find_all('meta')]
                    phone_numbers = extract_phone_numbers(response.text)
                    page_text = soup.get_text()
                    for email in emails:
                        lead_id = save_lead(email, None, None, None, None, None)
                        save_lead_source(lead_id, term_id, url, page_title, meta_description,
                                         response.status_code, str(response.elapsed),
                                         meta_tags, phone_numbers, page_text, [])
                        results.append([email, url, term])
                time.sleep(random.uniform(1, 3))  # Random delay between requests
            except requests.exceptions.RequestException as e:
                _log(f"Error processing {url}: {e}")
            except Exception as e:
                _log(f"Unexpected error processing {url}: {e}")
            # Stop visiting further URLs once enough rows were collected.
            if len(results) >= num_results:
                break
        # Update the processed_leads count for this search term
        update_processed_leads_count(term_id, len(results))
        time.sleep(random.uniform(30, 60))  # Longer delay between search terms
    except Exception as e:
        _log(f"Error in manual search: {e}")
    _log(f"Search completed. Found {len(results)} results.")
    return results[:num_results]
def update_processed_leads_count(term_id, count):
    """Add ``count`` to ``processed_leads`` of the search term ``term_id``.

    NOTE: an identical definition follows directly below in this file and is
    the one bound to the name after import.

    SQLite errors are logged, not raised; the connection is always closed.
    """
    statement = '''
UPDATE search_terms
SET processed_leads = processed_leads + ?
WHERE id = ?
'''
    conn = None
    try:
        conn = get_db_connection()
        conn.cursor().execute(statement, (count, term_id))
        conn.commit()
    except sqlite3.Error as err:
        logging.error(f"Database error in update_processed_leads_count: {err}")
    finally:
        if conn is not None:
            conn.close()
def update_processed_leads_count(term_id, count):
    """Increment the ``processed_leads`` counter of one search term.

    Args:
        term_id: ``search_terms.id`` of the row to update.
        count: Amount added to the current ``processed_leads`` value.

    SQLite errors are logged and swallowed; the connection is always closed.
    """
    conn = None
    try:
        conn = get_db_connection()
        db_cursor = conn.cursor()
        increment_sql = '''
UPDATE search_terms
SET processed_leads = processed_leads + ?
WHERE id = ?
'''
        db_cursor.execute(increment_sql, (count, term_id))
        conn.commit()
    except sqlite3.Error as db_error:
        logging.error(f"Database error in update_processed_leads_count: {db_error}")
    finally:
        if conn:
            conn.close()
# Function to fetch search terms
def fetch_search_terms(campaign_id=None):
    """Return search terms with the number of distinct leads each produced.

    Args:
        campaign_id: Optional campaign filter (int or ``"<id>: <name>"``).
            When falsy, terms of all campaigns are returned.

    Returns:
        A ``pandas.DataFrame`` with the columns ``ID``, ``Search Term``,
        ``Leads Fetched`` and ``Status``.

    Raises:
        ValueError: if ``campaign_id`` is given but invalid.
        sqlite3.Error: if the query fails (the connection is still closed).
    """
    # Validate before opening the connection so a bad ID cannot leak a handle.
    where_clause = ""
    params = ()
    if campaign_id:
        params = (validate_id(campaign_id, "campaign"),)
        where_clause = "WHERE st.campaign_id = ?"
    query = f'''
        SELECT st.id, st.term, COUNT(DISTINCT ls.lead_id) as processed_leads, st.status
        FROM search_terms st
        LEFT JOIN lead_sources ls ON st.id = ls.search_term_id
        {where_clause}
        GROUP BY st.id
    '''
    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        cursor.execute(query, params)
        # Plain tuples give pandas an unambiguous row shape.
        rows = [tuple(row) for row in cursor.fetchall()]
    finally:
        conn.close()
    return pd.DataFrame(rows, columns=["ID", "Search Term", "Leads Fetched", "Status"])
# Function to update search term status
def update_search_term_status(search_term_id, new_status, processed_leads=None):
    """Set the status of a search term and stamp ``last_processed_at``.

    Args:
        search_term_id: ``search_terms.id`` (int or ``"<id>: <label>"``).
        new_status: ``"pending"`` or ``"completed"``.
        processed_leads: Optional non-negative int; when given it overwrites
            the stored ``processed_leads`` value.

    Raises:
        ValueError: if an argument fails validation.
        sqlite3.Error: if the update fails (the connection is still closed).
    """
    search_term_id = validate_id(search_term_id, "search term")
    new_status = validate_status(new_status, ["pending", "completed"])
    if processed_leads is not None:
        processed_leads = validate_num_results(processed_leads)
        statement = """
            UPDATE search_terms
            SET status = ?, processed_leads = ?, last_processed_at = CURRENT_TIMESTAMP
            WHERE id = ?
        """
        params = (new_status, processed_leads, search_term_id)
    else:
        statement = """
            UPDATE search_terms
            SET status = ?, last_processed_at = CURRENT_TIMESTAMP
            WHERE id = ?
        """
        params = (new_status, search_term_id)
    conn = get_db_connection()
    try:
        conn.cursor().execute(statement, params)
        conn.commit()
    finally:
        conn.close()
# Create a lead, or enrich the existing lead with the same e-mail address
def save_lead(email, phone, first_name, last_name, company, job_title):
    """Insert a lead or update the existing one with the same e-mail.

    For an existing lead only the fields passed as non-``None`` are
    overwritten (``COALESCE`` keeps the stored value otherwise).

    Args:
        email: Address identifying the lead; validated with ``validate_email``.
        phone, first_name, last_name, company, job_title: Optional details.

    Returns:
        The ``leads.id`` of the existing or newly created row.

    Raises:
        ValueError: if ``email`` is invalid.
        sqlite3.Error: if a statement fails (the connection is still closed).
    """
    email = validate_email(email)
    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        # Check if the email already exists
        cursor.execute("SELECT id FROM leads WHERE email = ?", (email,))
        existing_lead = cursor.fetchone()
        if existing_lead:
            lead_id = existing_lead['id']
            logging.info(f"Existing lead found for email {email} with ID {lead_id}")
            # Update existing lead information if provided
            if any([phone, first_name, last_name, company, job_title]):
                cursor.execute(
                    """
                    UPDATE leads
                    SET phone = COALESCE(?, phone),
                        first_name = COALESCE(?, first_name),
                        last_name = COALESCE(?, last_name),
                        company = COALESCE(?, company),
                        job_title = COALESCE(?, job_title)
                    WHERE id = ?
                    """,
                    (phone, first_name, last_name, company, job_title, lead_id),
                )
                logging.info(f"Updated existing lead with ID {lead_id}")
        else:
            cursor.execute(
                """
                INSERT INTO leads (email, phone, first_name, last_name, company, job_title)
                VALUES (?, ?, ?, ?, ?, ?)
                """,
                (email, phone, first_name, last_name, company, job_title),
            )
            lead_id = cursor.lastrowid
            logging.info(f"New lead created for email {email} with ID {lead_id}")
        conn.commit()
        return lead_id
    finally:
        conn.close()
def save_lead_source(lead_id, term_id, url, page_title, meta_description, http_status, scrape_duration, meta_tags, phone_numbers, content, tags):
    """Store where a lead was found, together with page metadata.

    ``meta_tags``, ``phone_numbers`` and ``tags`` are stored as JSON text.
    The callers in this file pass the result of ``soup.find_all('meta')``,
    whose elements are not JSON-serializable; such objects (and any other
    non-JSON value, e.g. a set) are stored as their ``str()`` form instead of
    aborting the save with ``TypeError``.

    SQLite errors are logged, not raised; the connection is always closed.
    """
    def _as_json(value):
        # default=str is deliberate: parsed HTML tags become their markup text.
        return json.dumps(value, default=str)

    # A parsed-HTML string subclass is stored as a plain str.
    if page_title is not None:
        page_title = str(page_title)

    conn = None
    try:
        conn = get_db_connection()
        cursor = conn.cursor()
        cursor.execute(
            '''
            INSERT INTO lead_sources (lead_id, search_term_id, url, page_title, meta_description, http_status, scrape_duration, meta_tags, phone_numbers, content, tags)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ''',
            (lead_id, term_id, url, page_title, meta_description, http_status, scrape_duration,
             _as_json(meta_tags), _as_json(phone_numbers), content, _as_json(tags)),
        )
        conn.commit()
    except sqlite3.Error as e:
        logging.error(f"Database error in save_lead_source: {e}")
    finally:
        if conn:
            conn.close()
# Function to add a lead to a campaign
def add_lead_to_campaign(campaign_id, lead_id):
    """Link a lead to a campaign unless the link already exists.

    ``campaign_leads`` has no UNIQUE constraint on ``(campaign_id, lead_id)``,
    so ``INSERT OR IGNORE`` never ignored anything and repeated calls created
    duplicate rows. The insert is now guarded by an explicit ``NOT EXISTS``.

    Raises:
        ValueError: if an ID is invalid.
        sqlite3.Error: if the insert fails (the connection is still closed).
    """
    campaign_id = validate_id(campaign_id, "campaign")
    lead_id = validate_id(lead_id, "lead")
    conn = get_db_connection()
    try:
        conn.cursor().execute(
            """
            INSERT INTO campaign_leads (campaign_id, lead_id)
            SELECT ?, ?
            WHERE NOT EXISTS (
                SELECT 1 FROM campaign_leads WHERE campaign_id = ? AND lead_id = ?
            )
            """,
            (campaign_id, lead_id, campaign_id, lead_id),
        )
        conn.commit()
    finally:
        conn.close()
# Function to create a new message
def create_message(campaign_id, lead_id, template_id, customized_subject, customized_content):
    """Insert a message row (default status ``'pending'``) and return its ID.

    Args:
        campaign_id, lead_id, template_id: Related row IDs (int or
            ``"<id>: <label>"``).
        customized_subject: Subject line; validated with ``validate_name``.
        customized_content: Message body, stored as given.

    Returns:
        The ``id`` of the inserted ``messages`` row.

    Raises:
        ValueError: if a validated argument is invalid.
        sqlite3.Error: if the insert fails (the connection is still closed).
    """
    campaign_id = validate_id(campaign_id, "campaign")
    lead_id = validate_id(lead_id, "lead")
    template_id = validate_id(template_id, "template")
    customized_subject = validate_name(customized_subject)
    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        cursor.execute(
            """
            INSERT INTO messages (campaign_id, lead_id, template_id, customized_subject, customized_content)
            VALUES (?, ?, ?, ?, ?)
            """,
            (campaign_id, lead_id, template_id, customized_subject, customized_content),
        )
        message_id = cursor.lastrowid
        conn.commit()
        return message_id
    finally:
        conn.close()
# Function to update message status
def update_message_status(message_id, status, sent_at=None):
    """Set the status of a message and, optionally, its send time.

    Args:
        message_id: ``messages.id`` (int or ``"<id>: <label>"``).
        status: ``"pending"``, ``"sent"`` or ``"failed"``.
        sent_at: When truthy, also stored in ``sent_at``; otherwise the stored
            value is left untouched.

    Raises:
        ValueError: if an argument fails validation.
        sqlite3.Error: if the update fails (the connection is still closed).
    """
    message_id = validate_id(message_id, "message")
    status = validate_status(status, ["pending", "sent", "failed"])
    if sent_at:
        statement = "UPDATE messages SET status = ?, sent_at = ? WHERE id = ?"
        params = (status, sent_at, message_id)
    else:
        statement = "UPDATE messages SET status = ? WHERE id = ?"
        params = (status, message_id)
    conn = get_db_connection()
    try:
        conn.cursor().execute(statement, params)
        conn.commit()
    finally:
        conn.close()
# Function to fetch message templates
def fetch_message_templates(campaign_id=None):
    """Return templates as ``"<id>: <template_name>"`` strings.

    Args:
        campaign_id: Optional campaign filter (int or ``"<id>: <name>"``).
            When falsy, templates of all campaigns are returned.

    Raises:
        ValueError: if ``campaign_id`` is given but invalid.
        sqlite3.Error: if the query fails (the connection is still closed).
    """
    # Validate first so an invalid ID cannot leave a connection open.
    if campaign_id:
        statement = 'SELECT id, template_name FROM message_templates WHERE campaign_id = ?'
        params = (validate_id(campaign_id, "campaign"),)
    else:
        statement = 'SELECT id, template_name FROM message_templates'
        params = ()
    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        cursor.execute(statement, params)
        rows = cursor.fetchall()
    finally:
        conn.close()
    return [f"{row['id']}: {row['template_name']}" for row in rows]
# Function to fetch projects
def fetch_projects():
    """Return all projects as ``"<id>: <project_name>"`` strings.

    Raises:
        sqlite3.Error: if the query fails (the connection is still closed).
    """
    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        cursor.execute('SELECT id, project_name FROM projects')
        rows = cursor.fetchall()
    finally:
        conn.close()
    return [f"{row['id']}: {row['project_name']}" for row in rows]
# Function to fetch campaigns
def fetch_campaigns():
    """Return all campaigns, ordered by name, as ``"<id>: <campaign_name>"``.

    Returns an empty list when a SQLite error occurs (the error is logged).
    """
    conn = None
    try:
        conn = get_db_connection()
        db_cursor = conn.cursor()
        db_cursor.execute('SELECT id, campaign_name FROM campaigns ORDER BY campaign_name')
        labels = []
        for row in db_cursor.fetchall():
            labels.append(f"{row['id']}: {row['campaign_name']}")
        return labels
    except sqlite3.Error as err:
        logging.error(f"Database error in fetch_campaigns: {err}")
        return []
    finally:
        if conn is not None:
            conn.close()
# Updated bulk search function
async def bulk_search(num_results):
    """Run a web search for every stored search term and save the leads found.

    For each term up to ``num_results`` URLs are fetched and scraped for
    e-mail addresses; at most ``num_results`` leads are recorded per term.
    Every term is marked ``'completed'`` afterwards, also when its search
    failed.

    Fixes over the earlier version:

    * rows were unpacked into ``(term_id, term)`` and then indexed as
      ``term['term']``, which raises ``TypeError`` on a string (and raised
      again inside the error handler);
    * the list of ``<meta>`` tags is converted to text before it is handed to
      ``save_lead_source``, which serializes it as JSON;
    * the connection is closed even if reading the terms fails.

    Args:
        num_results: Number of URLs to request and leads to keep, per term.

    Returns:
        ``(logs, all_results)`` where ``logs`` is a list of progress strings
        and ``all_results`` a list of ``[email, url, term]`` rows.
    """
    total_leads = 0
    all_results = []
    logs = []

    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        cursor.execute('SELECT id, term FROM search_terms')
        search_terms = cursor.fetchall()
    finally:
        conn.close()

    for term_id, term in search_terms:
        leads_found = 0
        try:
            search_urls = list(search(term, num=num_results, stop=num_results, pause=2))
            for url in search_urls:
                if leads_found >= num_results:
                    break
                try:
                    response = session.get(url, timeout=10)
                    response.encoding = 'utf-8'
                    soup = BeautifulSoup(response.text, 'html.parser')
                    emails = [email for email in find_emails(response.text) if is_valid_email(email)]
                    if not emails:
                        continue
                    # Page-level metadata is the same for every e-mail on the page.
                    title_text = soup.title.string if soup.title else None
                    page_title = str(title_text) if title_text is not None else None
                    description_tag = soup.find('meta', attrs={'name': 'description'})
                    meta_description = description_tag.get('content') if description_tag else None
                    meta_tags = [str(tag) for tag in soup.find_all('meta')]
                    phone_numbers = extract_phone_numbers(response.text)
                    page_text = soup.get_text()
                    for email in emails:
                        lead_id = save_lead(email, None, None, None, None, None)
                        save_lead_source(lead_id, term_id, url, page_title, meta_description,
                                         response.status_code, str(response.elapsed),
                                         meta_tags, phone_numbers, page_text, [])
                        all_results.append([email, url, term])
                        leads_found += 1
                        total_leads += 1
                        if leads_found >= num_results:
                            break
                except Exception as e:
                    # Best effort: one bad page must not stop the whole run.
                    logging.error(f"Error processing {url}: {e}")
            logs.append(f"Processed {leads_found} leads for term '{term}'")
        except Exception as e:
            logging.error(f"Error performing search for term '{term}': {e}")
        update_search_term_status(term_id, 'completed', leads_found)
    logs.append(f"Bulk search completed. Total new leads found: {total_leads}")
    return logs, all_results
# Function to get email preview
def get_email_preview(template_id, from_email, reply_to):
    """Build an HTML preview of a message template.

    Subject and addresses are HTML-escaped so that values such as
    ``"Name <user@host>"`` are shown literally instead of being parsed as
    markup. The template body is rendered inside an ``<iframe srcdoc=...>``;
    it is escaped as an attribute value (``&``, ``<``, ``>`` and quotes), which
    the browser decodes back to the original HTML.

    Args:
        template_id: ``message_templates.id`` (int or ``"<id>: <name>"``).
        from_email: Sender address; validated with ``validate_email``.
        reply_to: Reply-To address; validated with ``validate_email``.

    Returns:
        An HTML string, or ``"<p>Template not found</p>"`` if the ID is unknown.

    Raises:
        ValueError: if an argument fails validation.
    """
    import html  # local import: only this function needs it

    template_id = validate_id(template_id, "template")
    from_email = validate_email(from_email)
    reply_to = validate_email(reply_to)
    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        cursor.execute('SELECT subject, body_content FROM message_templates WHERE id = ?', (template_id,))
        template = cursor.fetchone()
    finally:
        conn.close()
    if not template:
        return "<p>Template not found</p>"

    subject, body_content = template
    preview = f"""
    <h3>Email Preview</h3>
    <strong>Subject:</strong> {html.escape(subject or '')}<br>
    <strong>From:</strong> {html.escape(from_email)}<br>
    <strong>Reply-To:</strong> {html.escape(reply_to)}<br>
    <hr>
    <h4>Body:</h4>
    <iframe srcdoc="{html.escape(body_content or '', quote=True)}" width="100%" height="600" style="border: 1px solid #ccc;"></iframe>
    """
    return preview
# Fetch every stored message together with its lead and template
def fetch_sent_messages():
    """Return all messages joined with their lead e-mail and template name.

    Rows are ordered by ``sent_at`` descending.

    Returns:
        A ``pandas.DataFrame`` with the columns ``ID``, ``Sent At``,
        ``Email``, ``Template``, ``Subject``, ``Content``, ``Status`` and
        ``Message ID``.

    Raises:
        sqlite3.Error: if the query fails (the connection is still closed).
    """
    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        cursor.execute('''
            SELECT m.id, m.sent_at, l.email, mt.template_name, m.customized_subject, m.customized_content, m.status, m.message_id
            FROM messages m
            JOIN leads l ON m.lead_id = l.id
            JOIN message_templates mt ON m.template_id = mt.id
            ORDER BY m.sent_at DESC
        ''')
        # Plain tuples give pandas an unambiguous row shape.
        messages = [tuple(row) for row in cursor.fetchall()]
    finally:
        conn.close()
    return pd.DataFrame(messages, columns=['ID', 'Sent At', 'Email', 'Template', 'Subject', 'Content', 'Status', 'Message ID'])
# Streamlit page: list stored messages and show summary counts
def view_sent_messages():
    """Render the "View Sent Messages" page.

    Messages are cached in ``st.session_state.sent_messages`` and reloaded
    when the refresh button is pressed or nothing is cached yet. Each message
    is shown in an expander, followed by total/sent/failed counters.
    """
    st.header("View Sent Messages")
    # The button is evaluated first; the cache check only matters when it was not pressed.
    if st.button("Refresh Sent Messages") or 'sent_messages' not in st.session_state:
        st.session_state.sent_messages = fetch_sent_messages()

    # One expander per message
    for _, message in st.session_state.sent_messages.iterrows():
        with st.expander(f"Message to {message['Email']} - {message['Sent At']}"):
            st.write(f"**Subject:** {message['Subject']}")
            st.write(f"**Template:** {message['Template']}")
            st.write(f"**Status:** {message['Status']}")
            st.write(f"**Message ID:** {message['Message ID']}")
            st.write("**Content:**")
            st.markdown(message['Content'], unsafe_allow_html=True)

    # Summary statistics
    st.subheader("Summary Statistics")
    all_messages = st.session_state.sent_messages
    total_messages = len(all_messages)
    sent_messages = len(all_messages[all_messages['Status'] == 'sent'])
    failed_messages = len(all_messages[all_messages['Status'] == 'failed'])
    total_col, sent_col, failed_col = st.columns(3)
    total_col.metric("Total Messages", total_messages)
    sent_col.metric("Sent Messages", sent_messages)
    failed_col.metric("Failed Messages", failed_messages)
# Coroutine that sends one template to many leads through AWS SES
async def bulk_send(template_id, from_email, reply_to, leads):
    """Send a message template to every lead and record the outcome.

    Each attempt is stored with ``save_message`` as ``'sent'`` or
    ``'failed'``. A failure for one lead does not stop the run.

    Fixes over the earlier version: the database connection is always closed
    (it used to stay open on the normal path), and an unknown template yields
    a one-element log list instead of a bare string, so callers that iterate
    over the result print one line rather than one character per line.

    Args:
        template_id: ``message_templates.id`` (int or ``"<id>: <name>"``).
        from_email: Sender; validated with ``validate_email``.
        reply_to: Reply-To address; validated with ``validate_email``.
        leads: Iterable of ``(lead_id, email)`` pairs with a length.

    Returns:
        A list of human-readable log lines.

    Raises:
        ValueError: if an argument fails validation.
    """
    template_id = validate_id(template_id, "template")
    from_email = validate_email(from_email)
    reply_to = validate_email(reply_to)

    # Fetch the template, then release the connection before the slow send loop.
    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        cursor.execute('SELECT template_name, subject, body_content FROM message_templates WHERE id = ?', (template_id,))
        template = cursor.fetchone()
    finally:
        conn.close()
    if not template:
        return ["Template not found"]

    template_name, subject, body_content = template
    total_leads = len(leads)
    logs = [
        f"Preparing to send emails to {total_leads} leads",
        f"Template Name: {template_name}",
        f"Template ID: {template_id}",
        f"Subject: {subject}",
        f"From: {from_email}",
        f"Reply-To: {reply_to}",
        "---",
    ]
    if total_leads == 0:
        logs.append("No leads found to send emails to. Please check if there are leads in the database and if they have already been sent this template.")
        return logs

    total_sent = 0
    for index, (lead_id, email) in enumerate(leads, 1):
        try:
            response = ses_client.send_email(
                Source=from_email,
                Destination={'ToAddresses': [email]},
                Message={
                    'Subject': {'Data': subject, 'Charset': 'UTF-8'},
                    'Body': {'Html': {'Data': body_content, 'Charset': 'UTF-8'}},
                },
                ReplyToAddresses=[reply_to],
            )
            message_id = response['MessageId']
            save_message(lead_id, template_id, 'sent', datetime.now(), subject, message_id, body_content)
            total_sent += 1
            logs.append(f"[{index}/{total_leads}] Sent email to {email} - Subject: {subject} - MessageId: {message_id}")
        except ClientError as e:
            error_code = e.response['Error']['Code']
            error_message = e.response['Error']['Message']
            logging.error(f"Failed to send email to {email}: {error_code} - {error_message}")
            save_message(lead_id, template_id, 'failed', None, subject)
            logs.append(f"[{index}/{total_leads}] Failed to send email to {email}: {error_code} - {error_message}")
        except Exception as e:
            logging.error(f"Unexpected error sending email to {email}: {str(e)}")
            save_message(lead_id, template_id, 'failed', None, subject)
            logs.append(f"[{index}/{total_leads}] Unexpected error sending email to {email}: {str(e)}")
        await asyncio.sleep(0.1)  # Small delay to allow UI updates
    logs.append("---")
    logs.append(f"Bulk send completed. Total emails sent: {total_sent}/{total_leads}")
    return logs
# Streamlit page: preview a template or send it to a selection of leads
def bulk_send_page():
    """Render the "Bulk Send" page.

    Shows a form to pick a template, sender, reply-to address, recipient
    selection and filter. "Preview Email" renders the template; "Start Bulk
    Send" fetches the matching leads, runs ``bulk_send`` and prints its log.
    """
    st.header("Bulk Send")
    templates = fetch_message_templates()
    if not templates:
        st.warning("No message templates found. Please create a template first.")
        return

    send_choices = [
        "All Leads",
        "All Not Contacted with this Template",
        "All Not Contacted with Templates from this Campaign",
    ]
    filter_choices = [
        "Not Filter Out Leads",
        "Filter Out blog-directory",
    ]
    with st.form(key="bulk_send_form"):
        template_id = st.selectbox("Select Message Template", options=templates)
        from_email = st.text_input("From Email", value="Sami Halawa <[email protected]>")
        reply_to = st.text_input("Reply To", value="[email protected]")
        send_option = st.radio("Send to:", send_choices)
        filter_option = st.radio("Filter:", filter_choices)
        left_col, right_col = st.columns(2)
        with left_col:
            preview_button = st.form_submit_button(label="Preview Email")
        with right_col:
            send_button = st.form_submit_button(label="Start Bulk Send")

    if preview_button:
        # Options look like "<id>: <name>"; only the ID part is passed on.
        preview = get_email_preview(template_id.split(":")[0], from_email, reply_to)
        st.components.v1.html(preview, height=600, scrolling=True)

    if send_button:
        st.session_state.bulk_send_started = True
        st.session_state.bulk_send_logs = []
        st.session_state.bulk_send_progress = 0
        # Fetch leads based on send_option and filter_option
        leads_to_send = fetch_leads_for_bulk_send(template_id.split(":")[0], send_option, filter_option)
        st.write(f"Preparing to send emails to {len(leads_to_send)} leads")
        # Perform bulk send
        logs = asyncio.run(bulk_send(template_id.split(":")[0], from_email, reply_to, leads_to_send))
        # Display logs and statistics
        for line in logs:
            st.write(line)
        st.success(f"Bulk send completed. Sent {len(leads_to_send)} emails.")
def fetch_leads_for_bulk_send(template_id, send_option, filter_option):
    """Select the leads a bulk send should go to.

    Only leads that have at least one ``lead_sources`` row are considered.

    Fixes over the earlier version:

    * the template ID is bound as a parameter instead of being formatted into
      the SQL text;
    * conditions are joined with ``WHERE``/``AND`` explicitly — combining
      "All Leads" with the blog-directory filter used to produce
      ``... JOIN ... AND NOT ...`` without a ``WHERE``, which is a syntax
      error;
    * "not contacted by this campaign" is expressed with ``NOT EXISTS``; the
      earlier join also returned leads that had been contacted by the
      campaign as long as they had any message from another campaign.

    Args:
        template_id: ``message_templates.id`` (int or ``"<id>: <name>"``).
        send_option: ``"All Leads"``, ``"All Not Contacted with this
            Template"`` or ``"All Not Contacted with Templates from this
            Campaign"``.
        filter_option: ``"Filter Out blog-directory"`` excludes sources whose
            tags contain ``blog-directory``; any other value applies no filter.

    Returns:
        A list of rows, each unpackable as ``(lead_id, email)``.

    Raises:
        ValueError: if ``template_id`` is invalid.
        sqlite3.Error: if the query fails (the connection is still closed).
    """
    template_id = validate_id(template_id, "template")

    conditions = []
    params = []
    if send_option == "All Not Contacted with this Template":
        conditions.append("""
            NOT EXISTS (
                SELECT 1 FROM messages m
                WHERE m.lead_id = l.id AND m.template_id = ?
            )""")
        params.append(template_id)
    elif send_option == "All Not Contacted with Templates from this Campaign":
        conditions.append("""
            NOT EXISTS (
                SELECT 1 FROM messages m
                JOIN message_templates mt ON m.template_id = mt.id
                WHERE m.lead_id = l.id
                  AND mt.campaign_id = (SELECT campaign_id FROM message_templates WHERE id = ?)
            )""")
        params.append(template_id)
    if filter_option == "Filter Out blog-directory":
        conditions.append("NOT ls.tags LIKE '%blog-directory%'")

    query = """
        SELECT DISTINCT l.id, l.email
        FROM leads l
        JOIN lead_sources ls ON l.id = ls.lead_id
    """
    if conditions:
        query += " WHERE " + " AND ".join(conditions)

    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        cursor.execute(query, params)
        return cursor.fetchall()
    finally:
        conn.close()
# Update the save_message function to include the subject
def save_message(lead_id, template_id, status, sent_at=None, subject=None, message_id=None, customized_content=None):
    """Insert one row into ``messages`` for a lead/template pair.

    ``sent_at`` is written only when it is truthy; otherwise the column is
    left out of the INSERT. ``subject`` is stored in ``customized_subject``.
    """
    if sent_at:
        statement = (
            "\n"
            "INSERT INTO messages (lead_id, template_id, status, sent_at, customized_subject, message_id, customized_content)\n"
            "VALUES (?, ?, ?, ?, ?, ?, ?)\n"
        )
        values = (lead_id, template_id, status, sent_at, subject, message_id, customized_content)
    else:
        statement = (
            "\n"
            "INSERT INTO messages (lead_id, template_id, status, customized_subject, message_id, customized_content)\n"
            "VALUES (?, ?, ?, ?, ?, ?)\n"
        )
        values = (lead_id, template_id, status, subject, message_id, customized_content)

    db = get_db_connection()
    cur = db.cursor()
    cur.execute(statement, values)
    db.commit()
    db.close()
# Function to sanitize HTML content
def sanitize_html(content):
    """Return ``content`` with every ``<...>`` tag-like span removed."""
    tag_pattern = re.compile('<[^<]+?>')
    return tag_pattern.sub('', content)
# Function to find valid emails in HTML text
def find_emails(html_text):
    """Extract valid email addresses from ``html_text``, one per domain.

    Candidates are matched with a regex, filtered through ``is_valid_email``
    and then reduced to a single address per domain.

    Args:
        html_text: Raw page text/HTML to scan.

    Returns:
        A set of email address strings.
    """
    # The domain part is limited to hostname characters. The previous
    # ``[^@]+`` was greedy across spaces and markup, so "a@b.com ... x.org"
    # matched as one bogus address that validation then rejected, losing the
    # real one. ``[A-Z|a-z]`` also wrongly accepted "|" in the TLD.
    email_regex = re.compile(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,7}\b')
    candidates = set(email_regex.findall(html_text))

    unique_emails = {}
    # Sorted iteration makes the address kept for each domain deterministic.
    for email in sorted(candidates):
        if not is_valid_email(email):
            continue
        domain = email.split('@')[1]
        unique_emails.setdefault(domain, email)
    return set(unique_emails.values())
# Function to validate email address
def is_valid_email(email):
    """Return True when ``email`` looks like a usable contact address.

    Rejects non-strings, addresses shorter than 6 or longer than 254
    characters, addresses matching an image-extension / ``no-reply`` /
    ``prueba`` / digit-leading pattern, addresses without exactly one ``@``,
    known typo domains, and domains that are not ``name.tld`` shaped.
    """
    invalid_patterns = [
        r'\.png', r'\.jpg', r'\.jpeg', r'\.gif', r'\.bmp', r'^no-reply@',
        r'^prueba@', r'^\d+[a-z]*@'
    ]
    typo_domains = ["gmil.com", "gmal.com", "gmaill.com", "gnail.com"]

    if not isinstance(email, str):
        return False
    if len(email) < 6 or len(email) > 254:
        return False
    if any(re.search(pattern, email, re.IGNORECASE) for pattern in invalid_patterns):
        return False
    # Without this guard ``email.split('@')[1]`` raises IndexError for input
    # that has no "@" at all.
    if email.count('@') != 1:
        return False
    domain = email.split('@')[1]
    if domain.lower() in typo_domains:
        return False
    return re.match(r"^[A-Za-z0-9.-]+\.[A-Za-z]{2,}$", domain) is not None
# Function to refresh search terms
def refresh_search_terms(campaign_id):
    """Return ``fetch_search_terms(campaign_id)`` converted by ``df_to_list``."""
    terms_frame = fetch_search_terms(campaign_id)
    return df_to_list(terms_frame)
# Function to convert DataFrame to list of lists
def df_to_list(df):
    """Return the rows of ``df`` as a list of lists (one inner list per row)."""
    row_matrix = df.values
    return row_matrix.tolist()
# Helpers for extracting phone numbers and visible text from scraped pages
import re
from urllib.parse import urlparse
def extract_phone_numbers(text):
    """Return every phone-number-like substring found in ``text``.

    A match is an optional ``34`` / ``+34`` prefix followed by nine digits
    that start with 6-9, written in 3-3-3 groups optionally separated by
    whitespace, a dot or a hyphen.
    """
    phone_pattern = r'\b(?:\+?34)?[\s.-]?[6789]\d{2}[\s.-]?\d{3}[\s.-]?\d{3}\b'
    return [hit.group(0) for hit in re.finditer(phone_pattern, text)]
def is_probable_blog_or_directory(soup, email):
    """Heuristically decide whether a page is a blog/directory listing.

    Returns True when any of these hold:
      * ``email`` appears in the text of the first ``<article>`` or ``<main>``,
      * ``find_emails`` finds more than three addresses in the page text,
      * the ``og:url`` meta content or the page title contains one of the
        indicator words (blog, article, news, directory, list, index).

    Args:
        soup: Parsed page (BeautifulSoup-style object).
        email: The address being classified.
    """
    article_content = soup.find('article')
    main_content = soup.find('main')
    for container in (article_content, main_content):
        if container and email in container.get_text():
            return True

    # More than three distinct addresses on one page; the threshold is arbitrary.
    if len(find_emails(soup.get_text())) > 3:
        return True

    og_url = soup.find('meta', property='og:url')
    # ``.get`` avoids a KeyError when the meta tag carries no ``content``.
    url = (og_url.get('content') or '') if og_url else ''
    # ``title.string`` is None for an empty or nested <title>; fall back to ''.
    title = (soup.title.string or '') if soup.title else ''

    url_lower = url.lower()
    title_lower = title.lower()
    indicators = ['blog', 'article', 'news', 'directory', 'list', 'index']
    return any(word in url_lower or word in title_lower for word in indicators)
def extract_visible_text(soup):
    """Return the page text with ``<script>``/``<style>`` content removed.

    The matching elements are removed from ``soup`` itself (in place). The
    remaining text is split on newlines and single spaces; non-empty pieces
    are joined with newlines.
    """
    for hidden in soup(["script", "style"]):
        hidden.decompose()

    fragments = []
    for raw_line in soup.get_text().splitlines():
        for piece in raw_line.strip().split(" "):
            piece = piece.strip()
            if piece:
                fragments.append(piece)
    return '\n'.join(fragments)
def manual_search_wrapper(term, num_results, campaign_id, search_type="All Leads"):
    """Search for ``term``, scrape each result URL for emails and store leads.

    NOTE: a second ``manual_search_wrapper`` with the same signature is
    defined later in this module; that later definition rebinds the name when
    the module is loaded, so this one is not the one callers reach.

    Returns:
        A list of ``[email, url, title, meta_description, tags]`` rows,
        at most ``num_results`` long.
    """
    # NOTE(review): block nesting was reconstructed from a flattened listing;
    # confirm the intended placement of the ``break`` below.
    results = []
    try:
        print(f"Starting search for term: {term}")
        search_urls = list(search(term, num=num_results, stop=num_results, pause=2))
        print(f"Found {len(search_urls)} URLs")
        for url in search_urls:
            try:
                print(f"Processing URL: {url}")
                response = session.get(url, timeout=10)
                response.encoding = 'utf-8'
                soup = BeautifulSoup(response.text, 'html.parser')
                title = soup.title.string if soup.title else ""
                meta_description = soup.find('meta', attrs={'name': 'description'})
                meta_description = meta_description['content'] if meta_description else ""
                emails = find_emails(response.text)
                phone_numbers = extract_phone_numbers(response.text)
                meta_tags = [str(tag) for tag in soup.find_all('meta')]
                # extract_visible_text removes <script>/<style> nodes from soup in place.
                content = extract_visible_text(soup)
                print(f"Found {len(emails)} emails and {len(phone_numbers)} phone numbers on {url}")
                for email in emails:
                    if is_valid_email(email):
                        tags = []
                        if is_probable_blog_or_directory(soup, email):
                            tags.append("blog-directory")
                        else:
                            tags.append("company")
                        if search_type == "All Leads" or (search_type == "Exclude Probable Blogs/Directories" and "blog-directory" not in tags):
                            # The first phone number found on the page is attached to every lead from it.
                            lead_id = save_lead(email, phone_numbers[0] if phone_numbers else None, None, None, None, None)
                            # campaign_id is passed in save_lead_source's search_term_id position.
                            save_lead_source(lead_id, campaign_id, url, title, meta_description, response.status_code,
                                             str(response.elapsed), meta_tags, phone_numbers, content, tags)
                            results.append([email, url, title, meta_description, tags])
                            print(f"Valid email found: {email}")
                if len(results) >= num_results:
                    break
            except Exception as e:
                # A failure on one URL is logged and the loop moves on to the next URL.
                logging.error(f"Error processing {url}: {e}")
                print(f"Error processing {url}: {e}")
    except Exception as e:
        logging.error(f"Error in manual search: {e}")
        print(f"Error in manual search: {e}")
    print(f"Search completed. Found {len(results)} results.")
    return results[:num_results]
# Update save_lead function to be more resilient
def save_lead(email, phone, first_name, last_name, company, job_title):
    """Insert a lead, or update the existing lead with the same email.

    The email is first passed through ``validate_email``. When a lead with
    that email already exists, only the non-NULL arguments overwrite the
    stored columns (``COALESCE``).

    Returns:
        The lead id, or ``None`` if anything failed (the error is printed).
    """
    conn = None
    try:
        email = validate_email(email)
        conn = get_db_connection()
        cursor = conn.cursor()
        cursor.execute("SELECT id FROM leads WHERE email = ?", (email,))
        existing_lead = cursor.fetchone()
        if existing_lead:
            lead_id = existing_lead['id']
            print(f"Existing lead found for email {email} with ID {lead_id}")
            cursor.execute(
                "\n"
                "UPDATE leads\n"
                "SET phone = COALESCE(?, phone),\n"
                "first_name = COALESCE(?, first_name),\n"
                "last_name = COALESCE(?, last_name),\n"
                "company = COALESCE(?, company),\n"
                "job_title = COALESCE(?, job_title)\n"
                "WHERE id = ?\n",
                (phone, first_name, last_name, company, job_title, lead_id),
            )
            print(f"Updated existing lead with ID {lead_id}")
        else:
            cursor.execute(
                "\n"
                "INSERT INTO leads (email, phone, first_name, last_name, company, job_title)\n"
                "VALUES (?, ?, ?, ?, ?, ?)\n",
                (email, phone, first_name, last_name, company, job_title),
            )
            lead_id = cursor.lastrowid
            print(f"New lead created for email {email} with ID {lead_id}")
        conn.commit()
        return lead_id
    except Exception as e:
        print(f"Error saving lead: {e}")
        return None
    finally:
        # Errors are swallowed above, so without this every failed call would
        # leave its connection open.
        if conn is not None:
            conn.close()
# Update save_lead_source function to be more resilient
def save_lead_source(lead_id, search_term_id, url, page_title, meta_description, http_status, scrape_duration, meta_tags, phone_numbers, content, tags):
    """Insert one ``lead_sources`` row describing where a lead was found.

    ``meta_tags``, ``phone_numbers`` and ``tags`` are stored JSON-encoded.
    Nothing is written when ``lead_id`` is ``None``. Any error is printed and
    swallowed; the function always returns ``None``.
    """
    if lead_id is None:
        print("Cannot save lead source: lead_id is None")
        return
    conn = None
    try:
        lead_id = validate_id(lead_id, "lead")
        search_term_id = validate_id(search_term_id, "search term")
        conn = get_db_connection()
        cursor = conn.cursor()
        cursor.execute(
            "\n"
            "INSERT INTO lead_sources (lead_id, search_term_id, url, page_title, meta_description, http_status, scrape_duration, meta_tags, phone_numbers, content, tags)\n"
            "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)\n",
            (lead_id, search_term_id, url, page_title, meta_description, http_status, scrape_duration,
             json.dumps(meta_tags), json.dumps(phone_numbers), content, json.dumps(tags)),
        )
        conn.commit()
        print(f"Lead source saved for lead ID {lead_id}")
    except Exception as e:
        print(f"Error saving lead source: {e}")
    finally:
        # Close on the error path too; failures are swallowed, so a leaked
        # connection would otherwise go unnoticed.
        if conn is not None:
            conn.close()
# Update the update_search_term_status function to handle term as string
def update_search_term_status(search_term_id, new_status, processed_leads=None):
    """Set the status of a search term and stamp ``last_processed_at``.

    ``new_status`` goes through ``validate_status`` with the allowed values
    "pending" and "completed". When ``processed_leads`` is given it goes
    through ``validate_num_results`` and is stored as well.
    """
    new_status = validate_status(new_status, ["pending", "completed"])
    db = get_db_connection()
    cur = db.cursor()
    if processed_leads is None:
        statement = (
            "\n"
            "UPDATE search_terms\n"
            "SET status = ?, last_processed_at = CURRENT_TIMESTAMP\n"
            "WHERE id = ?\n"
        )
        values = (new_status, search_term_id)
    else:
        processed_leads = validate_num_results(processed_leads)
        statement = (
            "\n"
            "UPDATE search_terms\n"
            "SET status = ?, processed_leads = ?, last_processed_at = CURRENT_TIMESTAMP\n"
            "WHERE id = ?\n"
        )
        values = (new_status, processed_leads, search_term_id)
    cur.execute(statement, values)
    db.commit()
    db.close()
# Update the manual_search_wrapper function
def manual_search_wrapper(term, num_results, campaign_id, search_type="All Leads"):
    """Search for ``term``, scrape each result URL for emails and store leads.

    For every URL returned by ``search`` the page is fetched and parsed;
    each valid email is tagged "blog-directory" or "company", saved with
    ``save_lead`` / ``save_lead_source`` and added to the result list.

    Args:
        term: Search query.
        num_results: Number of URLs requested and maximum rows returned.
        campaign_id: Passed through to ``save_lead_source``.
        search_type: "All Leads" keeps every valid email;
            "Exclude Probable Blogs/Directories" drops those tagged
            "blog-directory". Any other value keeps nothing.

    Returns:
        A list of ``[email, url, title, meta_description, tags]`` rows,
        at most ``num_results`` long. Errors are logged and printed, never
        raised.
    """
    # NOTE(review): block nesting was reconstructed from a flattened listing;
    # confirm the intended placement of the ``break`` below.
    results = []
    try:
        print(f"Starting search for term: {term}")
        search_urls = list(search(term, num=num_results, stop=num_results, pause=2))
        print(f"Found {len(search_urls)} URLs")
        for url in search_urls:
            try:
                print(f"Processing URL: {url}")
                response = session.get(url, timeout=10)
                response.encoding = 'utf-8'
                soup = BeautifulSoup(response.text, 'html.parser')
                # ``title.string`` is None for an empty/nested <title>.
                title = (soup.title.string or "") if soup.title else ""
                meta_tag = soup.find('meta', attrs={'name': 'description'})
                # ``.get`` instead of ['content']: a description tag without a
                # content attribute used to raise KeyError, and the broad
                # except below then discarded every lead on the page.
                meta_description = (meta_tag.get('content') or "") if meta_tag else ""
                emails = find_emails(response.text)
                phone_numbers = extract_phone_numbers(response.text)
                meta_tags = [str(tag) for tag in soup.find_all('meta')]
                # extract_visible_text removes <script>/<style> nodes from soup in place.
                content = extract_visible_text(soup)
                print(f"Found {len(emails)} emails and {len(phone_numbers)} phone numbers on {url}")
                for email in emails:
                    if not is_valid_email(email):
                        continue
                    if is_probable_blog_or_directory(soup, email):
                        tags = ["blog-directory"]
                    else:
                        tags = ["company"]
                    if search_type == "All Leads" or (search_type == "Exclude Probable Blogs/Directories" and "blog-directory" not in tags):
                        # The first phone number found on the page is attached to every lead from it.
                        lead_id = save_lead(email, phone_numbers[0] if phone_numbers else None, None, None, None, None)
                        save_lead_source(lead_id, campaign_id, url, title, meta_description, response.status_code,
                                         str(response.elapsed), meta_tags, phone_numbers, content, tags)
                        results.append([email, url, title, meta_description, tags])
                        print(f"Valid email found: {email}")
                if len(results) >= num_results:
                    break
            except Exception as e:
                # A failure on one URL is logged and the loop moves on to the next URL.
                logging.error(f"Error processing {url}: {e}")
                print(f"Error processing {url}: {e}")
    except Exception as e:
        logging.error(f"Error in manual search: {e}")
        print(f"Error in manual search: {e}")
    print(f"Search completed. Found {len(results)} results.")
    return results[:num_results]
# Function to fetch leads
def fetch_leads():
    """Return every lead joined with its sources and search term as a DataFrame.

    Rows are ordered by lead id, newest first; there is one row per
    lead/source pair.
    """
    column_labels = [
        "ID", "Email", "Phone", "First Name", "Last Name", "Company", "Job Title",
        "Search Term", "Source URL", "Page Title", "Meta Description",
        "Phone Numbers", "Content", "Tags",
    ]
    statement = (
        "\n"
        "SELECT l.id, l.email, l.phone, l.first_name, l.last_name, l.company, l.job_title,\n"
        "st.term as search_term, ls.url as source_url, ls.page_title, ls.meta_description,\n"
        "ls.phone_numbers, ls.content, ls.tags\n"
        "FROM leads l\n"
        "JOIN lead_sources ls ON l.id = ls.lead_id\n"
        "JOIN search_terms st ON ls.search_term_id = st.id\n"
        "ORDER BY l.id DESC\n"
    )
    db = get_db_connection()
    cur = db.cursor()
    cur.execute(statement)
    records = cur.fetchall()
    db.close()
    return pd.DataFrame(records, columns=column_labels)
# Add this function to fetch search terms for a specific campaign
def fetch_search_terms_for_campaign(campaign_id):
    """Return the campaign's search terms as ``{"name", "value"}`` option dicts.

    ``name`` is ``"<id>: <term>"`` and ``value`` is the id as a string.
    """
    db = get_db_connection()
    cur = db.cursor()
    cur.execute('SELECT id, term FROM search_terms WHERE campaign_id = ?', (campaign_id,))
    rows = cur.fetchall()
    db.close()

    options = []
    for row in rows:
        options.append({"name": f"{row['id']}: {row['term']}", "value": str(row['id'])})
    return options
# Update the manual_search_multiple function
def manual_search_multiple(terms, num_results, campaign_id):
    """Run ``manual_search_wrapper`` for each non-blank term and concatenate the results."""
    collected = []
    for raw_term in terms:
        cleaned = raw_term.strip()
        if not cleaned:
            # Blank or whitespace-only entries are skipped.
            continue
        collected.extend(manual_search_wrapper(cleaned, num_results, campaign_id))
    return collected
# New function to get least searched terms
def get_least_searched_terms(n):
    """Return up to ``n`` search terms with the lowest ``processed_leads`` count."""
    statement = (
        "\n"
        "SELECT term\n"
        "FROM search_terms\n"
        "ORDER BY processed_leads ASC\n"
        "LIMIT ?\n"
    )
    db = get_db_connection()
    cur = db.cursor()
    cur.execute(statement, (n,))
    least_searched = []
    for record in cur.fetchall():
        least_searched.append(record[0])
    db.close()
    return least_searched
# Knowledge base storage and Streamlit view
def get_knowledge_base(project_id):
    """Return the first ``knowledge_base`` row for ``project_id``, or ``None`` when there is none."""
    db = get_db_connection()
    cur = db.cursor()
    cur.execute("SELECT * FROM knowledge_base WHERE project_id = ?", (project_id,))
    row = cur.fetchone()
    db.close()
    return row
def update_knowledge_base(project_id, data):
    """Insert or replace the knowledge-base row of a project.

    Args:
        project_id: Project the row belongs to.
        data: Sequence of 20 values, in the column order listed in
            ``columns`` below after ``project_id`` (name ... temporal_version).

    ``updated_at`` is set to ``CURRENT_TIMESTAMP`` by the statement.
    """
    columns = (
        "project_id", "name", "bio", "values", "contact_name", "contact_role", "contact_email",
        "company_description", "company_mission", "company_target_market", "company_other",
        "product_name", "product_description", "product_target_customer", "product_other",
        "other_context", "example_email", "complete_version", "medium_version", "small_version",
        "temporal_version",
    )
    # Identifiers are double-quoted because ``values`` is a reserved SQLite
    # keyword; unquoted it makes the statement fail with a syntax error.
    # NOTE(review): get_knowledge_base_info reads kb_name/kb_bio/kb_values
    # from the same table; confirm which column names the schema really uses.
    column_sql = ", ".join(f'"{column}"' for column in columns)
    # Placeholders are derived from the column list so the two cannot drift apart.
    placeholders = ", ".join("?" for _ in columns)
    statement = (
        f"INSERT OR REPLACE INTO knowledge_base ({column_sql}, updated_at) "
        f"VALUES ({placeholders}, CURRENT_TIMESTAMP)"
    )
    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        cursor.execute(statement, (project_id, *data))
        conn.commit()
    finally:
        conn.close()
def knowledge_base_view():
    """Render the Knowledge Base page.

    Lets the user pick a project, edit its knowledge-base fields in a form
    and save them through ``update_knowledge_base``. When a row already
    exists, its stored AI context versions are shown read-only below the form.
    """
    st.header("Knowledge Base")
    projects = fetch_projects()
    if projects is None or len(projects) == 0:
        # With no options st.selectbox returns None and the ``.split`` below
        # would raise AttributeError.
        st.warning("No projects found.")
        return
    selected_project = st.selectbox("Select Project", options=projects)
    project_id = int(selected_project.split(":")[0])
    knowledge = get_knowledge_base(project_id)

    def stored(index):
        # Column ``index`` of the existing row, or "" when the project has no row yet.
        return knowledge[index] if knowledge else ""

    with st.form("knowledge_base_form"):
        col1, col2 = st.columns(2)
        with col1:
            name = st.text_input("Name", value=stored(2))
            bio = st.text_area("Bio", value=stored(3))
            values = st.text_area("Values", value=stored(4))
        with col2:
            contact_name = st.text_input("Contact Name", value=stored(5))
            contact_role = st.text_input("Contact Role", value=stored(6))
            contact_email = st.text_input("Contact Email", value=stored(7))
        st.subheader("Company Information")
        company_description = st.text_area("Company Description", value=stored(8))
        company_mission = st.text_area("Company Mission/Vision", value=stored(9))
        company_target_market = st.text_area("Company Target Market", value=stored(10))
        company_other = st.text_area("Company Other", value=stored(11))
        st.subheader("Product Information")
        product_name = st.text_input("Product Name", value=stored(12))
        product_description = st.text_area("Product Description", value=stored(13))
        product_target_customer = st.text_area("Product Target Customer", value=stored(14))
        product_other = st.text_area("Product Other", value=stored(15))
        other_context = st.text_area("Other Context", value=stored(16))
        example_email = st.text_area("Example Reference Email", value=stored(17))
        submit_button = st.form_submit_button("Save Knowledge Base")

    if submit_button:
        # NOTE(review): the four AI context versions are saved as empty
        # strings, which overwrites any stored versions; confirm that is intended.
        data = (
            name, bio, values, contact_name, contact_role, contact_email,
            company_description, company_mission, company_target_market, company_other,
            product_name, product_description, product_target_customer, product_other,
            other_context, example_email, "", "", "", ""
        )
        update_knowledge_base(project_id, data)
        st.success("Knowledge base updated successfully!")

    if knowledge:
        st.subheader("AI Context Versions")
        st.text_area("Complete Version", value=knowledge[18], height=200, disabled=True)
        st.text_area("Medium Version", value=knowledge[19], height=150, disabled=True)
        st.text_area("Small Version", value=knowledge[20], height=100, disabled=True)
        st.text_area("Temporal Version", value=knowledge[21], height=100, disabled=True)
# OpenAI client and shared state used by the AutoclientAI helpers below.
import os

import numpy as np
from openai import OpenAI
from sklearn.cluster import KMeans
from sklearn.feature_extraction.text import TfidfVectorizer

# Built once. The key and endpoint come from the same environment variables
# (with the same fallbacks) as the ``openai.api_key`` / ``openai.api_base``
# settings at the top of this file, so both configurations stay in step.
client = OpenAI(
    api_key=os.getenv("OPENAI_API_KEY", "sk-1234"),
    base_url=os.getenv("OPENAI_API_BASE", "https://openai-proxy-kl3l.onrender.com"),
)
# The model to use
openai_model = "gpt-3.5-turbo"
# In-memory list of AI request log entries (appended to by log_ai_request).
ai_request_logs = []
def autoclient_ai_view():
    """Render the AutoclientAI page.

    Shows the knowledge-base summary as JSON, one tab per AI tool, and the
    entries collected in ``ai_request_logs``.
    """
    st.header("AutoclientAI")
    # Display condensed knowledge base
    summary = get_knowledge_base_info()
    st.subheader("Knowledge Base Summary")
    st.json(summary)

    # One tab per tool, rendered in the same order as the labels.
    tab_labels = ["Optimize Existing Groups", "Create New Groups", "Adjust Email Templates", "Optimize Search Terms"]
    tab_renderers = (optimize_existing_groups, create_new_groups, adjust_email_template, optimize_search_terms)
    for tab, render in zip(st.tabs(tab_labels), tab_renderers):
        with tab:
            render()

    # Display AI request logs
    if not ai_request_logs:
        st.info("No AI request logs available.")
        return
    st.subheader("AI Request Logs")
    for entry in ai_request_logs:
        st.text(entry)
# Records each AI request/response pair in ai_request_logs and the log file.
def log_ai_request(request_type, prompt, response):
    """Append a timestamped request/response entry to ``ai_request_logs`` and log it at INFO level."""
    timestamp = datetime.now()
    entry = f"{timestamp}: {request_type}\nPrompt: {prompt}\nResponse: {response}"
    # The list is mutated in place, so no ``global`` declaration is needed.
    ai_request_logs.append(entry)
    logging.info(entry)
def optimize_existing_groups():
    """Render the "Optimize Existing Groups" tab.

    Classifies the terms of the selected group with ``classify_search_terms``,
    lets the user edit the terms per category, and saves them with
    ``save_optimized_group``.
    """
    groups = fetch_search_term_groups()
    if not groups:
        st.warning("No search term groups found.")
        return
    selected_group = st.selectbox("Select a group to optimize", options=groups)

    if st.button("Optimize Selected Group"):
        with st.spinner("Optimizing group..."):
            group_data = get_group_data(selected_group)
            kb_info = get_knowledge_base_info()
            # Kept in session_state: st.button is True only on the run caused
            # by its own click, so a save button nested inside this branch
            # could never be reached on the rerun its click triggers.
            st.session_state.optimized_group_result = {
                "group": selected_group,
                "data": classify_search_terms(group_data['search_terms'], kb_info),
            }

    pending = st.session_state.get("optimized_group_result")
    # Only show a result that belongs to the group currently selected.
    if not pending or pending["group"] != selected_group:
        return

    st.subheader("Optimized Search Terms")
    edited_data = {}
    for category, terms in pending["data"].items():
        st.write(f"**{category}:**")
        new_terms = st.text_area(f"Edit terms for {category}:", value="\n".join(terms))
        edited_data[category] = new_terms.split("\n")
    if st.button("Save Optimized Group"):
        save_optimized_group(selected_group, edited_data)
        st.success("Group optimized and saved successfully!")
def create_new_groups():
    """Render the "Create New Groups" tab.

    Classifies all stored search terms into categories, generates an email
    template for every category except ``low_quality_search_terms``, and
    offers one save button per proposed group.
    """
    all_terms = fetch_all_search_terms()
    if st.button("Create New Groups"):
        with st.spinner("Creating new groups..."):
            kb_info = get_knowledge_base_info()
            classified_terms = classify_search_terms(all_terms, kb_info)
            proposals = []
            for category, terms in classified_terms.items():
                template = None
                if category != "low_quality_search_terms":
                    template = generate_email_template(terms, kb_info)
                proposals.append({"category": category, "terms": terms, "email_template": template})
            # Kept in session_state: st.button is True only on the run caused
            # by its own click, so per-group save buttons nested in this
            # branch could never fire.
            st.session_state.new_group_proposals = proposals

    for proposal in st.session_state.get("new_group_proposals", []):
        category = proposal["category"]
        terms = proposal["terms"]
        if category != "low_quality_search_terms":
            st.subheader(f"New Group: {category}")
            st.write("Search Terms:", ", ".join(terms))
            # Explicit key: several text areas share the label "Email Template".
            email_template = st.text_area("Email Template", value=proposal["email_template"],
                                          height=200, key=f"new_group_template_{category}")
            if st.button(f"Save Group {category}"):
                # Saves the text-area content, so manual edits are kept.
                save_new_group(category, terms, email_template)
                st.success(f"Group {category} saved successfully!")
        else:
            st.subheader("Low Quality Search Terms")
            st.write(", ".join(terms))
def adjust_email_template():
    """Render the "Adjust Email Templates" tab.

    Three steps: fetch the selected group's current template, have
    ``adjust_email_template_api`` rewrite it from the user's instructions,
    then save the result with ``save_adjusted_template``.
    """
    groups = fetch_search_term_groups()
    if not groups:
        st.warning("No search term groups found.")
        return
    selected_group = st.selectbox("Select a group to adjust email template", options=groups)

    # Each step's result is kept in session_state: st.button is True only on
    # the run caused by its own click, so buttons nested inside one another
    # can never all be True and the later steps were unreachable.
    if st.button("Fetch Current Template"):
        group_data = get_group_data(selected_group)
        st.session_state.template_adjustment = {
            "group": selected_group,
            "current": group_data['email_template'],
            "adjusted": None,
        }

    draft = st.session_state.get("template_adjustment")
    # Only continue with a draft that belongs to the group currently selected.
    if not draft or draft["group"] != selected_group:
        return

    st.text_area("Current Template", value=draft["current"], height=200)
    adjustment_prompt = st.text_area("Enter adjustment instructions:")
    if st.button("Adjust Template"):
        kb_info = get_knowledge_base_info()
        draft["adjusted"] = adjust_email_template_api(draft["current"], adjustment_prompt, kb_info)
        st.session_state.template_adjustment = draft

    if draft["adjusted"] is not None:
        st.text_area("Adjusted Template", value=draft["adjusted"], height=200)
        if st.button("Save Adjusted Template"):
            save_adjusted_template(selected_group, draft["adjusted"])
            st.success("Adjusted template saved successfully!")
def optimize_search_terms():
    """Render the "Optimize Search Terms" tab.

    Sends the entered terms (one per line) to
    ``generate_optimized_search_terms``, shows the result and lets the user
    store it with ``save_optimized_search_terms``.
    """
    st.subheader("Optimize Search Terms")
    current_terms = st.text_area("Enter current search terms (one per line):")
    if st.button("Optimize Search Terms"):
        with st.spinner("Optimizing search terms..."):
            kb_info = get_knowledge_base_info()
            # Kept in session_state: st.button is True only on the run caused
            # by its own click, so a save button nested in this branch could
            # never be reached.
            st.session_state.optimized_search_terms_result = generate_optimized_search_terms(
                current_terms.split('\n'), kb_info)

    optimized_terms = st.session_state.get("optimized_search_terms_result")
    if optimized_terms is None:
        return
    st.subheader("Optimized Search Terms")
    st.write("\n".join(optimized_terms))
    if st.button("Save Optimized Terms"):
        save_optimized_search_terms(optimized_terms)
        st.success("Optimized search terms saved successfully!")
def classify_search_terms(search_terms, kb_info):
    """Ask the chat model to group ``search_terms`` into named categories.

    Args:
        search_terms: Iterable of search-term strings.
        kb_info: Knowledge-base information interpolated into the prompt.

    Returns:
        A dict mapping category name to a list of search terms, parsed from
        the model's reply.

    Raises:
        ValueError: If the reply is not a mapping (or not a literal at all).
        SyntaxError: If the reply is neither JSON nor a Python literal.
    """
    import ast

    prompt = f"""
As an expert in lead generation and email marketing, classify the following search terms into strategic groups:
Search Terms: {', '.join(search_terms)}
Knowledge Base Info:
{kb_info}
Create groups that allow for tailored, personalized email content. Consider the product/service features, target audience, and potential customer pain points. Groups should be specific enough for customization but broad enough to be efficient. Always include a 'low_quality_search_terms' category for irrelevant or overly broad terms.
Respond with category names as keys and lists of search terms as values.
"""
    response = client.chat.completions.create(
        model="gpt-4",
        messages=[
            {"role": "system", "content": "You are an AI assistant specializing in strategic search term classification for targeted email marketing campaigns."},
            {"role": "user", "content": prompt}
        ]
    )
    content = response.choices[0].message.content
    log_ai_request("classify_search_terms", prompt, content)

    # SECURITY: this used to be ``eval(content)``, which executes whatever the
    # model returns. The reply is now parsed as data only: JSON first, then a
    # Python literal (covers single-quoted dicts).
    try:
        parsed = json.loads(content)
    except (TypeError, ValueError):
        parsed = ast.literal_eval(content)
    if not isinstance(parsed, dict):
        raise ValueError("classify_search_terms: model reply is not a category -> terms mapping")
    return parsed
def generate_email_template(terms, kb_info):
    """Ask the chat model for an HTML email body targeting ``terms``.

    The prompt and the reply are recorded with ``log_ai_request``; the reply
    text is returned unchanged.
    """
    prompt = f"""
Create an email template for the following search terms:
Search Terms: {', '.join(terms)}
Knowledge Base Info:
{kb_info}
Guidelines:
1. Focus on benefits to the reader
2. Address potential customer doubts and fears
3. Include clear CTAs at the beginning and end
4. Use a natural, conversational tone
5. Be concise but impactful
6. Use minimal formatting - remember this is an email, not a landing page
Provide only the email body content in HTML format, excluding <body> tags. Use <p>, <strong>, <em>, and <a> tags as needed.
"""
    system_message = {
        "role": "system",
        "content": "You are an AI assistant specializing in creating high-converting email templates for targeted marketing campaigns.",
    }
    user_message = {"role": "user", "content": prompt}
    completion = client.chat.completions.create(model="gpt-4", messages=[system_message, user_message])
    email_body = completion.choices[0].message.content
    log_ai_request("generate_email_template", prompt, email_body)
    return email_body
def adjust_email_template_api(current_template, adjustment_prompt, kb_info):
    """Ask the chat model to rewrite ``current_template`` per ``adjustment_prompt``.

    The prompt and the reply are recorded with ``log_ai_request``; the reply
    text is returned unchanged.
    """
    prompt = f"""
Adjust the following email template based on the given instructions:
Current Template:
{current_template}
Adjustment Instructions:
{adjustment_prompt}
Knowledge Base Info:
{kb_info}
Guidelines:
1. Maintain focus on conversion and avoiding spam filters
2. Preserve the natural, conversational tone
3. Ensure benefits to the reader remain highlighted
4. Continue addressing potential customer doubts and fears
5. Keep clear CTAs at the beginning and end
6. Remain concise and impactful
7. Maintain minimal formatting suitable for an email
Provide only the adjusted email body content in HTML format, excluding <body> tags.
"""
    system_message = {
        "role": "system",
        "content": "You are an AI assistant specializing in refining high-converting email templates for targeted marketing campaigns.",
    }
    user_message = {"role": "user", "content": prompt}
    completion = client.chat.completions.create(model="gpt-4", messages=[system_message, user_message])
    adjusted_body = completion.choices[0].message.content
    log_ai_request("adjust_email_template_api", prompt, adjusted_body)
    return adjusted_body
def generate_optimized_search_terms(current_terms, kb_info):
    """Ask the chat model for an improved list of search terms.

    Args:
        current_terms: Iterable of the current search-term strings.
        kb_info: Knowledge-base information interpolated into the prompt.

    Returns:
        The reply split into lines, each stripped of surrounding whitespace,
        with blank lines removed.
    """
    prompt = f"""
Optimize the following search terms for targeted email campaigns:
Current Terms: {', '.join(current_terms)}
Knowledge Base Info:
{kb_info}
Guidelines:
1. Focus on terms likely to attract high-quality leads
2. Consider product/service features, target audience, and customer pain points
3. Optimize for specificity and relevance
4. Think about how each term could lead to a compelling email strategy
5. Remove or improve low-quality or overly broad terms
6. Add new, highly relevant terms based on the knowledge base information
Provide a list of optimized search terms, aiming for quality over quantity.
"""
    response = client.chat.completions.create(
        model="gpt-4",
        messages=[
            {"role": "system", "content": "You are an AI assistant specializing in optimizing search terms for targeted email marketing campaigns."},
            {"role": "user", "content": prompt}
        ]
    )
    content = response.choices[0].message.content
    log_ai_request("generate_optimized_search_terms", prompt, content)
    # Blank lines in the reply would otherwise be returned, and later stored,
    # as empty search terms.
    return [line.strip() for line in content.split('\n') if line.strip()]
def fetch_search_term_groups():
    """Return every search-term group as an ``"<id>:<name>"`` string."""
    db = get_db_connection()
    cur = db.cursor()
    cur.execute("SELECT id, name FROM search_term_groups")
    labels = []
    for record in cur.fetchall():
        labels.append(f"{record['id']}:{record['name']}")
    db.close()
    return labels
def get_group_data(group_id):
    """Load one search-term group together with its terms.

    Args:
        group_id: An ``"<id>:<name>"`` label (only the part before the first
            ``:`` is used).

    Returns:
        A dict with ``name``, ``description``, ``email_template`` and
        ``search_terms`` (list of term strings), or ``None`` when ``group_id``
        is falsy or no such group exists.
    """
    if not group_id:
        return None
    group_id = group_id.split(':')[0]
    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        cursor.execute("SELECT name, description, email_template FROM search_term_groups WHERE id = ?", (group_id,))
        row = cursor.fetchone()
        if row is None:
            # ``dict(None)`` used to raise TypeError for an unknown group id.
            return None
        group_data = dict(row)
        cursor.execute("SELECT term FROM search_terms WHERE group_id = ?", (group_id,))
        group_data['search_terms'] = [term_row['term'] for term_row in cursor.fetchall()]
        return group_data
    finally:
        conn.close()
def save_optimized_group(group_id, optimized_data):
    """Replace all search terms of a group with the optimized ones.

    ``group_id`` is an ``"<id>:<name>"`` label; ``optimized_data`` maps a
    category name to its list of terms. Existing terms of the group are
    deleted before the new ones are inserted.
    """
    db = get_db_connection()
    cur = db.cursor()
    numeric_id = group_id.split(':')[0]
    # Delete existing terms for this group
    cur.execute("DELETE FROM search_terms WHERE group_id = ?", (numeric_id,))
    # Insert new terms
    new_rows = [
        (term, numeric_id, category)
        for category, terms in optimized_data.items()
        for term in terms
    ]
    cur.executemany("INSERT INTO search_terms (term, group_id, category) VALUES (?, ?, ?)", new_rows)
    db.commit()
    db.close()
def fetch_all_search_terms():
    """Return the ``term`` column of every ``search_terms`` row as a list."""
    db = get_db_connection()
    cur = db.cursor()
    cur.execute("SELECT term FROM search_terms")
    all_terms = []
    for record in cur.fetchall():
        all_terms.append(record['term'])
    db.close()
    return all_terms
def save_new_group(category, terms, email_template):
    """Create a search-term group named ``category`` and attach ``terms`` to it."""
    db = get_db_connection()
    cur = db.cursor()
    cur.execute("INSERT INTO search_term_groups (name, email_template) VALUES (?, ?)", (category, email_template))
    # Id of the group row just inserted.
    new_group_id = cur.lastrowid
    term_rows = [(term, new_group_id, category) for term in terms]
    cur.executemany("INSERT INTO search_terms (term, group_id, category) VALUES (?, ?, ?)", term_rows)
    db.commit()
    db.close()
def save_adjusted_template(group_id, adjusted_template):
    """Store ``adjusted_template`` as the email template of the group.

    ``group_id`` is an ``"<id>:<name>"`` label; only the id part is used.
    """
    numeric_id = group_id.split(':')[0]
    db = get_db_connection()
    cur = db.cursor()
    cur.execute(
        "UPDATE search_term_groups SET email_template = ? WHERE id = ?",
        (adjusted_template, numeric_id),
    )
    db.commit()
    db.close()
def get_knowledge_base_info():
    """Return the newest ``knowledge_base`` row (highest id) as a dict.

    Keys are the selected column names; an empty dict is returned when the
    query yields no row.
    """
    field_names = (
        "kb_name", "kb_bio", "kb_values", "contact_name", "contact_role", "contact_email",
        "company_description", "company_mission", "company_target_market", "company_other",
        "product_name", "product_description", "product_target_customer", "product_other",
        "other_context", "example_email",
    )
    statement = (
        "\n"
        "SELECT kb_name, kb_bio, kb_values, contact_name, contact_role, contact_email,\n"
        "company_description, company_mission, company_target_market, company_other,\n"
        "product_name, product_description, product_target_customer, product_other,\n"
        "other_context, example_email\n"
        "FROM knowledge_base\n"
        "ORDER BY id DESC LIMIT 1\n"
    )
    db = get_db_connection()
    cur = db.cursor()
    cur.execute(statement)
    latest = cur.fetchone()
    db.close()
    if not latest:
        return {}
    # The column order of the SELECT matches ``field_names`` one to one.
    return {name: latest[position] for position, name in enumerate(field_names)}
def save_optimized_search_terms(optimized_terms):
    """Insert each entry of ``optimized_terms`` as a new ``search_terms`` row."""
    db = get_db_connection()
    cur = db.cursor()
    cur.executemany(
        "INSERT INTO search_terms (term) VALUES (?)",
        [(term,) for term in optimized_terms],
    )
    db.commit()
    db.close()
def get_last_n_search_terms(n):
    """Return the ``n`` most recently inserted search terms (highest ids first).

    Args:
        n: Maximum number of terms; converted with ``int()``.

    Raises:
        ValueError / TypeError: If ``n`` cannot be converted to an integer.
    """
    limit = int(n)
    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        # LIMIT is bound as a parameter instead of being formatted into the SQL.
        cursor.execute("SELECT term FROM search_terms ORDER BY id DESC LIMIT ?", (limit,))
        return [row[0] for row in cursor.fetchall()]
    finally:
        conn.close()
def get_random_leads(n, from_last):
    """Return up to ``n`` random ``(email, term)`` rows from the newest leads.

    Args:
        n: Number of rows to sample; values below zero are treated as zero.
        from_last: Number of newest lead/source rows (by lead id) to sample
            from; converted with ``int()``.

    Returns:
        A list of rows, at most ``n`` long and never longer than the pool.
    """
    pool_size = int(from_last)
    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        # LIMIT is bound as a parameter instead of being formatted into the SQL.
        cursor.execute(
            "\n"
            "SELECT l.email, st.term\n"
            "FROM leads l\n"
            "JOIN lead_sources ls ON l.id = ls.lead_id\n"
            "JOIN search_terms st ON ls.search_term_id = st.id\n"
            "ORDER BY l.id DESC LIMIT ?\n",
            (pool_size,),
        )
        leads = cursor.fetchall()
    finally:
        conn.close()
    # random.sample raises ValueError for a negative sample size.
    sample_size = max(0, min(n, len(leads)))
    return random.sample(leads, sample_size)
def format_leads_for_prompt(leads):
    """Render each lead as ``"<item 0> - <item 1>"``, one per line."""
    formatted = []
    for lead in leads:
        formatted.append(f"{lead[0]} - {lead[1]}")
    return "\n".join(formatted)
def save_new_search_terms(terms):
    """Insert each entry of ``terms`` as a new ``search_terms`` row."""
    db = get_db_connection()
    cur = db.cursor()
    term_rows = [(term,) for term in terms]
    cur.executemany("INSERT INTO search_terms (term) VALUES (?)", term_rows)
    db.commit()
    db.close()
def save_new_email_template(template):
    """Store ``template`` as a new message template.

    The row always gets the fixed name "AI Generated Template" and the fixed
    subject "AI Subject"; ``template`` becomes the body content.
    """
    statement = (
        "\n"
        "INSERT INTO message_templates (template_name, subject, body_content)\n"
        "VALUES (?, ?, ?)\n"
    )
    values = ("AI Generated Template", "AI Subject", template)
    db = get_db_connection()
    cur = db.cursor()
    cur.execute(statement, values)
    db.commit()
    db.close()
def main():
    """Render the app shell: title, sidebar navigation and the selected page.

    The chosen page name is kept in ``st.session_state.current_page`` and
    defaults to "Manual Search".
    """
    st.title("AUTOCLIENT")
    st.sidebar.title("Navigation")
    pages = [
        "Manual Search",
        "Bulk Search",
        "Bulk Send",
        "View Leads",
        "Search Terms",
        "Message Templates",
        "View Sent Messages",
        "Projects & Campaigns",
        "Knowledge Base",
        "AutoclientAI",
        "Automation Control"
    ]
    # Control page selection
    if 'current_page' not in st.session_state:
        st.session_state.current_page = "Manual Search"
    preselected = pages.index(st.session_state.current_page)
    st.session_state.current_page = st.sidebar.radio("Go to", pages, index=preselected)

    # Call appropriate function based on the selected page
    page = st.session_state.current_page
    if page == "Manual Search":
        manual_search_page()
    elif page == "Bulk Search":
        bulk_search_page()
    elif page == "Bulk Send":
        bulk_send_page()
    elif page == "View Leads":
        view_leads()
    elif page == "Search Terms":
        search_terms()
    elif page == "Message Templates":
        message_templates()
    elif page == "View Sent Messages":
        view_sent_messages()
    elif page == "Projects & Campaigns":
        projects_and_campaigns()
    elif page == "Knowledge Base":
        knowledge_base_view()
    elif page == "AutoclientAI":
        autoclient_ai_view()
    elif page == "Automation Control":
        automation_view()
def display_result_card(result, index):
    """Render one search result as a bordered HTML card.

    Args:
        result: Sequence where item 0 is the email, item 1 the source URL and
            item 4 an iterable of tag strings.
        index: Zero-based position; shown to the user as ``index + 1``.
    """
    import html

    # These values come from scraped pages and are emitted with
    # unsafe_allow_html=True, so they are escaped before entering the markup.
    email = html.escape(str(result[0]))
    source_url = str(result[1])
    link_target = html.escape(source_url, quote=True)
    link_label = html.escape(source_url[:50])
    tags = html.escape(', '.join(result[4]))

    with st.container():
        # Lines are kept unindented: Markdown treats indented text as a code block.
        st.markdown(
            '\n'
            '<div style="border:1px solid #e0e0e0; border-radius:10px; padding:15px; margin-bottom:10px;">\n'
            f'<h3 style="color:#1E90FF;">Result {index + 1}</h3>\n'
            f'<p><strong>Email:</strong> {email}</p>\n'
            f'<p><strong>Source:</strong> <a href="{link_target}" target="_blank">{link_label}...</a></p>\n'
            f'<p><strong>Tags:</strong> {tags}</p>\n'
            '</div>\n',
            unsafe_allow_html=True,
        )
def _campaign_option_id(option):
    """Return the text before the first ':' of a campaign option, or None if nothing is selected."""
    if not option:
        return None
    return str(option).split(":")[0]


def _count_unique_email_domains(results):
    """Count distinct domains among results whose first element contains an '@'."""
    return len({str(res[0]).split('@')[1] for res in results if '@' in str(res[0])})


def _clamp_progress(value):
    """Clamp a progress fraction into the 0.0-1.0 range."""
    return max(0.0, min(1.0, float(value)))


def manual_search_page():
    """Render the "Manual Search" page with single-term and multi-term tabs.

    Both tabs collect their inputs in a form, run ``manual_search_wrapper``
    and show progress, a log text area, result cards and summary metrics.
    Search state is kept in ``st.session_state`` under the
    ``single_search_*`` / ``multi_search_*`` keys.

    Fixes compared with the previous version:
      * the multi-term "started" flag is cleared when the search finishes, so
        the whole search no longer re-runs on every later rerun;
      * a single-term search that finds nothing new stops instead of being
        retried on every rerun;
      * progress values are clamped to 0.0-1.0 before reaching the progress bar;
      * a missing campaign selection is reported instead of crashing on
        ``None.split``;
      * addresses without '@' are ignored when counting domains.
    """
    st.header("Manual Search")
    # Fetch the last 5 search terms; they pre-fill the multi-term text area.
    last_5_terms = get_last_5_search_terms()
    # Separate UI for single and multiple term searches using tabs
    tab1, tab2 = st.tabs(["Single Term Search", "Multiple Terms Search"])

    # ---- Single Term Search Tab ----
    with tab1:
        with st.form(key="single_search_form"):
            search_term = st.text_input("Search Term")
            num_results = st.slider("Number of Results", min_value=10, max_value=200, value=30, step=10)
            campaign_id = st.selectbox("Select Campaign", options=fetch_campaigns())
            search_type = st.selectbox("Search Type", options=["All Leads", "Exclude Probable Blogs/Directories"])
            submit_button = st.form_submit_button(label="Search")

        if submit_button:
            st.session_state.single_search_started = True
            st.session_state.single_search_results = []
            st.session_state.single_search_logs = []
            st.session_state.single_search_progress = 0

        if st.session_state.get('single_search_started'):
            # Containers to display search progress, logs, and results.
            results_container = st.empty()
            progress_bar = st.progress(_clamp_progress(st.session_state.single_search_progress))
            status_text = st.empty()
            log_container = st.empty()

            campaign_key = _campaign_option_id(campaign_id)
            if campaign_key is None:
                st.error("No campaign available. Please create a campaign first.")
                st.session_state.single_search_started = False
            elif len(st.session_state.single_search_results) < num_results:
                with st.spinner("Searching..."):
                    remaining = num_results - len(st.session_state.single_search_results)
                    new_results = manual_search_wrapper(search_term, remaining, campaign_key, search_type)
                    st.session_state.single_search_results.extend(new_results)
                    # Log and display progress of each found result.
                    for result in new_results:
                        log = f"Found result: {result[0]} from {result[1]}"
                        st.session_state.single_search_logs.append(log)
                        log_container.text_area("Search Logs", "\n".join(st.session_state.single_search_logs), height=200)
                        st.session_state.single_search_progress = _clamp_progress(
                            len(st.session_state.single_search_results) / num_results
                        )
                        progress_bar.progress(st.session_state.single_search_progress)
                        status_text.text(f"Found {len(st.session_state.single_search_results)} results...")
                        # Display the results dynamically.
                        with results_container.container():
                            for j, res in enumerate(st.session_state.single_search_results):
                                display_result_card(res, j)
                        time.sleep(0.1)  # Small delay for better animation effect.
                if not new_results:
                    # Nothing new was found: stop, otherwise the same search
                    # would be repeated on every subsequent rerun.
                    st.info("No further results were found for this search.")
                    st.session_state.single_search_started = False

            # Search completion status
            if len(st.session_state.single_search_results) >= num_results:
                st.success(f"Search completed! Found {len(st.session_state.single_search_results)} results.")
                st.session_state.single_search_started = False

            # Display session-specific statistics
            st.subheader("Search Statistics")
            st.metric("Total Results Found", len(st.session_state.single_search_results))
            st.metric("Unique Domains", _count_unique_email_domains(st.session_state.single_search_results))

    # ---- Multiple Terms Search Tab ----
    with tab2:
        st.subheader("Enter Search Terms")
        # Allow user to enter multiple search terms (one per line).
        search_terms_text = st.text_area("Enter one search term per line", height=150, value="\n".join(last_5_terms))
        load_button = st.button("Load Terms from Text Area")
        if load_button:
            terms_list = [term.strip() for term in search_terms_text.split('\n') if term.strip()]
            st.session_state.loaded_terms = terms_list
            st.rerun()  # Refresh with loaded terms

        if 'loaded_terms' not in st.session_state:
            st.session_state.loaded_terms = [""] * 4  # Default to 4 empty terms
        num_terms = len(st.session_state.loaded_terms)

        # Form for multiple search terms
        with st.form(key="multi_search_form"):
            search_terms = [st.text_input(f"Search Term {i+1}", value=term, key=f"term_{i}")
                            for i, term in enumerate(st.session_state.loaded_terms)]
            num_results_multiple = st.slider("Number of Results per Term", min_value=10, max_value=200, value=30, step=10)
            campaign_id_multiple = st.selectbox("Select Campaign for Multiple Search", options=fetch_campaigns(), key="multi_campaign")
            search_type = st.selectbox("Search Type", options=["All Leads", "Exclude Probable Blogs/Directories"])
            col1, col2 = st.columns(2)
            with col1:
                submit_button = st.form_submit_button(label="Search All Terms")
            with col2:
                fill_button = st.form_submit_button(label="Fill with Least Searched Terms")

        if submit_button:
            st.session_state.multi_search_started = True
            st.session_state.multi_search_results = []
            st.session_state.multi_search_logs = []
            st.session_state.multi_search_progress = 0
            st.session_state.multi_search_terms = [term for term in search_terms if term.strip()]

        if st.session_state.get('multi_search_started'):
            # Containers to display multi-search progress, logs, and results.
            results_container = st.empty()
            progress_bar = st.progress(_clamp_progress(st.session_state.multi_search_progress))
            status_text = st.empty()
            log_container = st.empty()

            campaign_key = _campaign_option_id(campaign_id_multiple)
            if campaign_key is None:
                st.error("No campaign available. Please create a campaign first.")
            else:
                with st.spinner("Searching..."):
                    all_results = []
                    logs = []
                    total_terms = len(st.session_state.multi_search_terms)
                    # Iterate through multiple search terms and gather results
                    for term_index, term in enumerate(st.session_state.multi_search_terms):
                        status_text.text(f"Searching term {term_index + 1} of {total_terms}: {term}")
                        term_results = manual_search_wrapper(term, num_results_multiple, campaign_key, search_type)
                        for i, result in enumerate(term_results):
                            all_results.append(result)
                            progress = (term_index * num_results_multiple + i + 1) / (total_terms * num_results_multiple)
                            # A term may return more results than requested; keep the bar in range.
                            progress_bar.progress(_clamp_progress(progress))
                            log = f"Term {term_index + 1}: Found result {i + 1}: {result[0]} from {result[1]}"
                            logs.append(log)
                            log_container.text_area("Search Logs", "\n".join(logs), height=200)
                            # Display each found result dynamically
                            with results_container.container():
                                for j, res in enumerate(all_results):
                                    display_result_card(res, j)
                            time.sleep(0.1)  # Small delay for animation effect

                # Update progress bar after search completion
                st.session_state.multi_search_progress = 1.0
                progress_bar.progress(st.session_state.multi_search_progress)
                st.session_state.multi_search_results = all_results
                st.session_state.multi_search_logs = logs
                st.success(f"Found {len(all_results)} results across all terms!")

                # Display statistics specific to this session
                st.subheader("Search Statistics")
                st.metric("Total Results Found", len(all_results))
                st.metric("Unique Domains", _count_unique_email_domains(all_results))
                st.metric("Search Terms Processed", len(st.session_state.multi_search_terms))

            # Clear the flag so the search does not start again on the next rerun.
            st.session_state.multi_search_started = False

        if fill_button:
            least_searched = get_least_searched_terms(num_terms)
            st.session_state.loaded_terms = least_searched
            st.rerun()
def bulk_search_page():
    """Render the "Bulk Search" page and search every stored search term.

    On submit, the terms returned by ``fetch_search_terms()`` are stored in
    session state; each term is then passed to ``manual_search_wrapper`` and
    its results, logs and progress are accumulated in the ``bulk_search_*``
    session-state keys.

    Fixes compared with the previous version:
      * ``DataFrame.iterrows()`` yields ``(index, row)`` pairs; the row is now
        unpacked instead of reading attributes from the pair itself;
      * the term text is read from the 'Search Term' column (the name used by
        the search-term list page), falling back to 'Term';
      * already processed terms are tracked by an explicit counter rather than
        being inferred from the number of results;
      * an empty term list ends the run with a warning;
      * addresses without '@' are ignored when counting domains.
    """
    st.header("Bulk Search")
    with st.form(key="bulk_search_form"):
        num_results = st.slider("Results per term", min_value=10, max_value=200, value=30, step=10)
        submit_button = st.form_submit_button(label="Start Bulk Search")

    if submit_button:
        st.session_state.bulk_search_started = True
        st.session_state.bulk_search_results = []
        st.session_state.bulk_search_logs = []
        st.session_state.bulk_search_progress = 0
        st.session_state.bulk_search_terms_done = 0
        st.session_state.bulk_search_terms = fetch_search_terms()

    if not st.session_state.get('bulk_search_started'):
        return

    progress_bar = st.progress(st.session_state.bulk_search_progress)
    status_text = st.empty()
    log_container = st.empty()
    results_container = st.empty()

    terms_df = st.session_state.bulk_search_terms
    total_terms = len(terms_df)
    if total_terms == 0:
        st.warning("No search terms found. Please add search terms first.")
        st.session_state.bulk_search_started = False
        return

    with st.spinner("Performing bulk search..."):
        for term_index, (_, term_row) in enumerate(terms_df.iterrows()):
            # Skip terms that were already processed before an interrupted run.
            if term_index < st.session_state.get('bulk_search_terms_done', 0):
                continue
            term = term_row['Search Term'] if 'Search Term' in term_row else term_row['Term']
            status_text.text(f"Searching term {term_index + 1} of {total_terms}: {term}")
            # NOTE(review): the third argument is a campaign id at the other
            # call sites; here the term's own ID is passed, as before - confirm.
            term_results = manual_search_wrapper(term, num_results, term_row['ID'], "All Leads")
            st.session_state.bulk_search_results.extend(term_results)
            st.session_state.bulk_search_logs.extend(
                [f"Term {term_index + 1}: Found result {i + 1}: {result[0]} from {result[1]}"
                 for i, result in enumerate(term_results)]
            )
            st.session_state.bulk_search_terms_done = term_index + 1
            st.session_state.bulk_search_progress = (term_index + 1) / total_terms
            progress_bar.progress(st.session_state.bulk_search_progress)
            log_container.text_area("Search Logs", "\n".join(st.session_state.bulk_search_logs), height=200)
            with results_container.container():
                for j, res in enumerate(st.session_state.bulk_search_results):
                    display_result_card(res, j)

    if st.session_state.bulk_search_progress >= 1:
        st.success(f"Bulk search completed! Found {len(st.session_state.bulk_search_results)} results.")
        st.session_state.bulk_search_started = False

    # Display statistics
    unique_domains = {
        str(result[0]).split('@')[1]
        for result in st.session_state.bulk_search_results
        if '@' in str(result[0])
    }
    st.subheader("Bulk Search Statistics")
    st.metric("Total Results Found", len(st.session_state.bulk_search_results))
    st.metric("Unique Domains", len(unique_domains))
    st.metric("Search Terms Processed", total_terms)
def bulk_send_page():
    """Render the "Bulk Send" page: pick a template, preview it, or send it.

    The template option is split on ':' and its first part is used as the
    template id for ``get_email_preview``, ``fetch_leads_for_bulk_send`` and
    ``bulk_send``.

    Fixes compared with the previous version:
      * ``bulk_send`` can return a plain string ("Template not found"); that
        is now shown as an error instead of being written character by
        character;
      * the success message counts the "Sent email to ..." log lines instead
        of assuming every selected lead was sent to;
      * the ``bulk_send_started`` flag is cleared when the run ends.
    """
    st.header("Bulk Send")
    templates = fetch_message_templates()
    if not templates:
        st.warning("No message templates found. Please create a template first.")
        return

    with st.form(key="bulk_send_form"):
        template_id = st.selectbox("Select Message Template", options=templates)
        from_email = st.text_input("From Email", value="Sami Halawa <[email protected]>")
        reply_to = st.text_input("Reply To", value="[email protected]")
        send_option = st.radio("Send to:",
                               ["All Leads",
                                "All Not Contacted with this Template",
                                "All Not Contacted with Templates from this Campaign"])
        filter_option = st.radio("Filter:",
                                 ["Not Filter Out Leads",
                                  "Filter Out blog-directory"])
        col1, col2 = st.columns(2)
        with col1:
            preview_button = st.form_submit_button(label="Preview Email")
        with col2:
            send_button = st.form_submit_button(label="Start Bulk Send")

    selected_template_id = template_id.split(":")[0]

    if preview_button:
        preview = get_email_preview(selected_template_id, from_email, reply_to)
        st.components.v1.html(preview, height=600, scrolling=True)

    if send_button:
        st.session_state.bulk_send_started = True
        st.session_state.bulk_send_logs = []
        st.session_state.bulk_send_progress = 0
        # Fetch leads based on send_option and filter_option
        leads_to_send = fetch_leads_for_bulk_send(selected_template_id, send_option, filter_option)
        st.write(f"Preparing to send emails to {len(leads_to_send)} leads")
        # Perform bulk send
        logs = asyncio.run(bulk_send(selected_template_id, from_email, reply_to, leads_to_send))
        st.session_state.bulk_send_started = False
        if isinstance(logs, str):
            # bulk_send reports a missing template as a bare message string.
            st.error(logs)
            return
        # Display logs and statistics
        for log in logs:
            st.write(log)
        sent_count = sum(1 for log in logs if str(log).startswith("Sent email to"))
        st.success(f"Bulk send completed. Sent {sent_count} emails.")
def fetch_leads_for_bulk_send(template_id, send_option, filter_option):
    """Return the distinct ``(id, email)`` lead rows selected for a bulk send.

    Args:
        template_id: Id of the message template; digit strings are converted
            to ``int`` before being bound as a query parameter.
        send_option: "All Not Contacted with this Template" and
            "All Not Contacted with Templates from this Campaign" restrict the
            result; any other value selects all leads.
        filter_option: "Filter Out blog-directory" excludes lead sources whose
            tags contain ``blog-directory``.

    Returns:
        The rows returned by ``cursor.fetchall()``.

    Fixes compared with the previous version:
      * the template id is bound as a parameter instead of being formatted
        into the SQL text;
      * conditions are collected and joined, so the tag filter no longer
        produces ``... AND NOT ...`` without a ``WHERE`` clause, and is no
        longer swallowed by the ``OR`` of the campaign option;
      * "not contacted" is expressed with ``NOT EXISTS``, so a lead with any
        message from the template / campaign is excluded;
      * lead sources with NULL tags are kept by the blog-directory filter;
      * the connection is closed even when the query fails.
    """
    try:
        template_param = int(template_id)
    except (TypeError, ValueError):
        template_param = template_id

    conditions = []
    params = []
    if send_option == "All Not Contacted with this Template":
        conditions.append(
            "NOT EXISTS (SELECT 1 FROM messages m "
            "WHERE m.lead_id = l.id AND m.template_id = ?)"
        )
        params.append(template_param)
    elif send_option == "All Not Contacted with Templates from this Campaign":
        conditions.append(
            "NOT EXISTS (SELECT 1 FROM messages m "
            "JOIN message_templates mt ON m.template_id = mt.id "
            "WHERE m.lead_id = l.id AND mt.campaign_id = "
            "(SELECT campaign_id FROM message_templates WHERE id = ?))"
        )
        params.append(template_param)
    if filter_option == "Filter Out blog-directory":
        conditions.append("(ls.tags IS NULL OR ls.tags NOT LIKE '%blog-directory%')")

    query = (
        "SELECT DISTINCT l.id, l.email "
        "FROM leads l "
        "JOIN lead_sources ls ON l.id = ls.lead_id"
    )
    if conditions:
        query += " WHERE " + " AND ".join(conditions)

    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        cursor.execute(query, params)
        return cursor.fetchall()
    finally:
        conn.close()
async def bulk_send(template_id, from_email, reply_to, leads):
    """Walk over ``leads`` for a template and build the send log.

    Args:
        template_id: Template id, passed through ``validate_id``.
        from_email: Sender address, passed through ``validate_email``.
        reply_to: Reply-to address, passed through ``validate_email``.
        leads: Iterable of ``(lead_id, email)`` pairs with a known length.

    Returns:
        A list of log lines, or the string "Template not found" when the
        template id does not exist.

    Progress and logs are mirrored into ``st.session_state.bulk_send_progress``
    and ``st.session_state.bulk_send_logs`` after every lead.

    Fixes compared with the previous version:
      * the database connection is closed on every path, including the
        "Template not found" return and query errors;
      * the per-lead pause uses ``asyncio.sleep`` so the coroutine no longer
        blocks the event loop with ``time.sleep``.
    """
    template_id = validate_id(template_id, "template")
    from_email = validate_email(from_email)
    reply_to = validate_email(reply_to)

    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        cursor.execute('SELECT name, subject, body_content FROM message_templates WHERE id = ?', (template_id,))
        template = cursor.fetchone()
    finally:
        conn.close()
    if not template:
        return "Template not found"

    template_name, subject, body_content = template
    total_leads = len(leads)
    logs = [
        f"Preparing to send emails to {total_leads} leads",
        f"Template Name: {template_name}",
        f"Subject: {subject}",
        f"From Email: {from_email}",
        f"Reply To: {reply_to}"
    ]
    for i, (lead_id, email) in enumerate(leads):
        try:
            # NOTE(review): nothing is actually sent here and no 'sent' message
            # is saved - the loop only records a log line, and body_content is
            # unused. The send call appears to be missing; confirm and restore.
            logs.append(f"Sent email to {email} (Lead ID: {lead_id})")
        except Exception as e:
            save_message(lead_id, template_id, 'failed', datetime.now(), subject, None, str(e))
            logs.append(f"Failed to send email to {email} (Lead ID: {lead_id}): {e}")
        progress = (i + 1) / total_leads
        st.session_state.bulk_send_progress = progress
        st.session_state.bulk_send_logs = logs
        await asyncio.sleep(0.1)  # Small non-blocking delay for UI updates
    return logs
def _format_lead_tags(raw_tags):
    """Return a display string for a lead's tags (JSON list text, list, or anything else)."""
    if isinstance(raw_tags, str):
        try:
            parsed = json.loads(raw_tags)
        except json.JSONDecodeError:
            return raw_tags
        if isinstance(parsed, (list, tuple)):
            return ', '.join(str(tag) for tag in parsed)
        # Valid JSON that is not a list (number, string, object): show it as stored.
        return raw_tags
    if isinstance(raw_tags, list):
        return ', '.join(str(tag) for tag in raw_tags)
    return raw_tags


def view_leads():
    """Render the "View Leads" page: one expander per lead plus CSV/Excel downloads.

    The leads DataFrame is cached in ``st.session_state.leads`` and reloaded
    through ``fetch_leads()`` on first use or when "Refresh Leads" is pressed.

    Fixes compared with the previous version:
      * the Excel export writes the workbook into an in-memory buffer;
        ``DataFrame.to_excel`` needs a target and returns ``None``, so the old
        ``to_excel(index=False).to_bytes()`` call could not work;
      * the Excel download is skipped with a notice when no Excel writer
        engine is installed, and is served with the .xlsx MIME type;
      * tags that parse as JSON but are not a list, or that contain
        non-string items, no longer break the page;
      * non-string content (for example a missing value) is shown as
        "No content available" instead of failing on ``len``.
    """
    import io  # local import: only needed for the in-memory Excel export

    st.header("View Leads")
    if st.button("Refresh Leads"):
        st.session_state.leads = fetch_leads()
    if 'leads' not in st.session_state:
        st.session_state.leads = fetch_leads()

    for _, lead in st.session_state.leads.iterrows():
        with st.expander(f"Lead: {lead['Email']}"):
            col1, col2 = st.columns(2)
            with col1:
                st.write(f"**ID:** {lead['ID']}")
                st.write(f"**Email:** {lead['Email']}")
                st.write(f"**Phone:** {lead['Phone']}")
                st.write(f"**Name:** {lead['First Name']} {lead['Last Name']}")
                st.write(f"**Company:** {lead['Company']}")
                st.write(f"**Job Title:** {lead['Job Title']}")
            with col2:
                st.write(f"**Search Term:** {lead['Search Term']}")
                st.write(f"**Source URL:** {lead['Source URL']}")
                st.write(f"**Page Title:** {lead['Page Title']}")
                st.write(f"**Meta Description:** {lead['Meta Description']}")
                st.write(f"**Phone Numbers:** {lead['Phone Numbers']}")
                # The Tags column may hold JSON text, a list, or something else.
                st.write(f"**Tags:** {_format_lead_tags(lead['Tags'])}")
            st.write("**Page Content:**")
            content = lead['Content']
            if isinstance(content, str):
                st.text(content[:500] + "..." if len(content) > 500 else content)
            else:
                st.text("No content available")

    # Optional: download buttons for the full dataset
    csv = st.session_state.leads.to_csv(index=False).encode('utf-8')
    st.download_button(
        label="Download full dataset as CSV",
        data=csv,
        file_name="leads_data.csv",
        mime="text/csv",
    )
    try:
        excel_buffer = io.BytesIO()
        st.session_state.leads.to_excel(excel_buffer, index=False)
    except ImportError:
        # pandas needs an optional Excel writer engine for .xlsx output.
        st.info("Excel export is unavailable because no Excel writer engine is installed.")
    else:
        st.download_button(
            label="Download full dataset as Excel",
            data=excel_buffer.getvalue(),
            file_name="leads_data.xlsx",
            mime="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
        )
def search_terms():
    """Render the "Search Terms" page and delegate to the view picked in the radio."""
    st.header("Search Terms")
    # Selector for the different views
    chosen_view = st.radio("Select View", ["Basic List", "Search Term Groups"])
    if chosen_view != "Basic List":
        display_search_term_groups()
        return
    display_basic_search_terms_list()
def _preview_leads_for_search_term(search_term_id):
    """Return ``(id, email)`` tuples of the leads linked to a search term, without deleting anything."""
    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        cursor.execute("""
            SELECT DISTINCT l.id, l.email
            FROM leads l
            JOIN lead_sources ls ON l.id = ls.lead_id
            WHERE ls.search_term_id = ?
        """, (search_term_id,))
        return [tuple(row) for row in cursor.fetchall()]
    finally:
        conn.close()


def display_basic_search_terms_list():
    """Render the add-search-term form and the list of existing search terms.

    The left column adds a term through ``add_search_term``; the right column
    lists the cached ``st.session_state.search_terms`` sorted by
    'Leads Fetched' with a per-row "Delete" button and a confirmation prompt.

    Fixes compared with the previous version:
      * pressing "Delete" only loads a preview of the affected leads; the old
        code called ``delete_search_term_and_leads`` right away, so the data
        was gone before "Confirm Delete" / "Cancel" were even shown;
      * a missing campaign selection is reported instead of failing on
        ``None.split``;
      * the per-row columns no longer rebind the outer ``col1`` / ``col2``;
      * "Cancel" reruns the page so the prompt disappears immediately.
    """
    col1, col2 = st.columns(2)
    with col1:
        st.subheader("Add Search Term")
        with st.form(key="add_search_term_form"):
            search_term = st.text_input("Search Term")
            campaign_id = st.selectbox("Select Campaign", options=fetch_campaigns())
            submit_button = st.form_submit_button(label="Add Search Term")
        if submit_button:
            if not campaign_id:
                st.error("No campaign available. Please create a campaign first.")
            else:
                term_id = add_search_term(search_term, campaign_id.split(":")[0])
                st.success(f"Search term added with ID: {term_id}")
                st.session_state.search_terms = fetch_search_terms()

    with col2:
        st.subheader("Existing Search Terms")
        if st.button("Refresh Search Terms"):
            st.session_state.search_terms = fetch_search_terms()
        if 'search_terms' not in st.session_state:
            st.session_state.search_terms = fetch_search_terms()

        # Sort the dataframe by 'Leads Fetched' in descending order
        sorted_terms = st.session_state.search_terms.sort_values('Leads Fetched', ascending=False)
        for _, row in sorted_terms.iterrows():
            term_col, action_col = st.columns([3, 1])
            with term_col:
                st.write(f"{row['ID']}: {row['Search Term']} (Leads: {row['Leads Fetched']})")
            with action_col:
                if st.button("Delete", key=f"delete_{row['ID']}"):
                    # Plain int: safe to bind as an SQL parameter later.
                    term_id_to_delete = int(row['ID'])
                    st.session_state.confirm_delete = term_id_to_delete
                    # Read-only preview; nothing is deleted until confirmed.
                    st.session_state.leads_to_delete = _preview_leads_for_search_term(term_id_to_delete)

        if 'confirm_delete' in st.session_state:
            st.warning(f"Are you sure you want to delete search term {st.session_state.confirm_delete} and its related leads?")
            st.write("Leads to be deleted:")
            for lead in st.session_state.get('leads_to_delete', []):
                st.write(f"- {lead[1]}")
            confirm_col, cancel_col = st.columns(2)
            with confirm_col:
                if st.button("Confirm Delete"):
                    delete_search_term_and_leads(st.session_state.confirm_delete)
                    del st.session_state.confirm_delete
                    st.session_state.pop('leads_to_delete', None)
                    st.session_state.search_terms = fetch_search_terms()
                    st.success("Search term and related leads deleted successfully.")
            with cancel_col:
                if st.button("Cancel"):
                    del st.session_state.confirm_delete
                    st.session_state.pop('leads_to_delete', None)
                    st.rerun()
def display_search_term_groups():
    """Render summary metrics, a bar chart and per-group details for search term groups.

    Data comes from ``fetch_search_term_groups_data()``; each group is read
    through the keys 'group_name', 'description', 'term_count', 'total_leads'
    and 'terms'.

    Fixes compared with the previous version:
      * a ``None`` 'total_leads' / 'term_count' (SQL ``SUM`` over a group
        without terms yields NULL) is treated as 0 instead of breaking
        ``sum()``;
      * each group's terms are converted to dicts and the DataFrame is built
        with explicit columns, so sorting by 'leads_fetched' works for
        ``sqlite3.Row`` items and for groups without any terms;
      * with no groups at all, a notice is shown instead of building a chart
        from empty data.
    """
    st.subheader("Search Term Groups")
    # Fetch search term groups data
    groups_data = fetch_search_term_groups_data()

    chart_rows = [
        {
            'group_name': group['group_name'],
            'term_count': group['term_count'] or 0,
            'total_leads': group['total_leads'] or 0,
        }
        for group in groups_data
    ]

    # Display summary statistics
    col1, col2, col3 = st.columns(3)
    col1.metric("Total Groups", len(chart_rows))
    col2.metric("Total Search Terms", sum(row['term_count'] for row in chart_rows))
    col3.metric("Total Leads", sum(row['total_leads'] for row in chart_rows))

    if not chart_rows:
        st.info("No search term groups found.")
        return

    # Create a bar chart for groups
    fig = px.bar(
        pd.DataFrame(chart_rows),
        x='group_name',
        y='total_leads',
        title='Leads per Search Term Group',
        labels={'group_name': 'Group Name', 'total_leads': 'Total Leads'},
        hover_data=['term_count']
    )
    st.plotly_chart(fig)

    # Display detailed group information
    for group, summary in zip(groups_data, chart_rows):
        with st.expander(f"{summary['group_name']} (Terms: {summary['term_count']}, Leads: {summary['total_leads']})"):
            st.write(f"Description: {group['description']}")
            st.write("Search Terms:")
            terms_df = pd.DataFrame(
                [dict(term) for term in group['terms']],
                columns=['id', 'term', 'leads_fetched'],
            )
            st.dataframe(terms_df.sort_values('leads_fetched', ascending=False))
def fetch_search_term_groups_data():
    """Load every search term group with its aggregate counts and its terms.

    Returns:
        A list of dicts with the keys 'id', 'group_name', 'description',
        'term_count', 'total_leads' and 'terms'. 'terms' is a list of dicts
        with the keys 'id', 'term' and 'leads_fetched'.

    Fixes compared with the previous version:
      * ``total_leads`` is wrapped in ``COALESCE(..., 0)`` because ``SUM`` is
        NULL for a group without terms;
      * all terms are read with one query and grouped in Python instead of
        issuing one query per group;
      * terms are returned as plain dicts, not ``sqlite3.Row`` objects, so
        they can be turned into a DataFrame with named columns;
      * the connection is closed even when a query fails.
    """
    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        cursor.execute("""
            SELECT
                stg.id,
                stg.name AS group_name,
                stg.description,
                COUNT(st.id) AS term_count,
                COALESCE(SUM(st.processed_leads), 0) AS total_leads
            FROM
                search_term_groups stg
            LEFT JOIN
                search_terms st ON stg.id = st.group_id
            GROUP BY
                stg.id
        """)
        groups = cursor.fetchall()

        cursor.execute("""
            SELECT
                group_id,
                id,
                term,
                processed_leads AS leads_fetched
            FROM
                search_terms
            WHERE
                group_id IS NOT NULL
        """)
        terms_by_group = {}
        for row in cursor.fetchall():
            terms_by_group.setdefault(row['group_id'], []).append({
                'id': row['id'],
                'term': row['term'],
                'leads_fetched': row['leads_fetched'],
            })
    finally:
        conn.close()

    return [
        {
            'id': group['id'],
            'group_name': group['group_name'],
            'description': group['description'],
            'term_count': group['term_count'],
            'total_leads': group['total_leads'],
            'terms': terms_by_group.get(group['id'], []),
        }
        for group in groups
    ]
def delete_search_term_and_leads(search_term_id):
    """Delete a search term together with its lead sources and linked leads.

    Args:
        search_term_id: Id of the search term to remove.

    Returns:
        The ``(id, email)`` rows of the leads that were linked to the term
        (and deleted), as fetched before the deletion.

    Raises:
        sqlite3.Error: Re-raised after the transaction is rolled back and the
            error is logged.
    """
    connection = get_db_connection()
    db = connection.cursor()
    term_params = (search_term_id,)
    try:
        # Collect the leads associated with this search term first.
        db.execute("""
            SELECT DISTINCT l.id, l.email
            FROM leads l
            JOIN lead_sources ls ON l.id = ls.lead_id
            WHERE ls.search_term_id = ?
        """, term_params)
        doomed_leads = db.fetchall()

        # Remove the lead sources, then each lead, then the term itself.
        db.execute("DELETE FROM lead_sources WHERE search_term_id = ?", term_params)
        for lead_row in doomed_leads:
            db.execute("DELETE FROM leads WHERE id = ?", (lead_row[0],))
        db.execute("DELETE FROM search_terms WHERE id = ?", term_params)
        connection.commit()
    except sqlite3.Error as exc:
        connection.rollback()
        logging.error(f"Error deleting search term and leads: {exc}")
        raise
    else:
        return doomed_leads
    finally:
        connection.close()
def message_templates():
    """Render the "Message Templates" page: an add form and the template list.

    The list is cached in ``st.session_state.message_templates`` and loaded
    through ``fetch_message_templates()``.

    Fixes compared with the previous version:
      * a missing campaign selection is reported instead of failing on
        ``None.split``;
      * the cached template list is refreshed right after a template is
        added, so the new template appears without pressing "Refresh".
    """
    st.header("Message Templates")
    col1, col2 = st.columns(2)
    with col1:
        st.subheader("Add Message Template")
        with st.form(key="add_message_template_form"):
            template_name = st.text_input("Template Name")
            subject = st.text_input("Subject")
            body_content = st.text_area("Body Content (HTML)", height=400)
            campaign_id = st.selectbox("Select Campaign", options=fetch_campaigns())
            submit_button = st.form_submit_button(label="Add Message Template")
        if submit_button:
            if not campaign_id:
                st.error("No campaign available. Please create a campaign first.")
            else:
                template_id = create_message_template(template_name, subject, body_content, campaign_id.split(":")[0])
                st.success(f"Message template added with ID: {template_id}")
                st.session_state.message_templates = fetch_message_templates()
    with col2:
        st.subheader("Existing Message Templates")
        if st.button("Refresh Message Templates"):
            st.session_state.message_templates = fetch_message_templates()
        if 'message_templates' not in st.session_state:
            st.session_state.message_templates = fetch_message_templates()
        st.dataframe(pd.DataFrame(st.session_state.message_templates, columns=["Template"]), use_container_width=True)
def view_sent_messages():
    """Render the "View Sent Messages" page: one expander per message plus totals.

    Messages are cached in ``st.session_state.sent_messages`` and loaded via
    ``fetch_sent_messages()`` on first use or when the refresh button is
    pressed.
    """
    st.header("View Sent Messages")
    refresh_requested = st.button("Refresh Sent Messages")
    if refresh_requested:
        st.session_state.sent_messages = fetch_sent_messages()
    if 'sent_messages' not in st.session_state:
        st.session_state.sent_messages = fetch_sent_messages()

    # One expander per message, with its metadata followed by the HTML body.
    detail_fields = (
        ("**Subject:**", 'Subject'),
        ("**Template:**", 'Template'),
        ("**Status:**", 'Status'),
        ("**Message ID:**", 'Message ID'),
    )
    for _, message in st.session_state.sent_messages.iterrows():
        with st.expander(f"Message to {message['Email']} - {message['Sent At']}"):
            for caption, column in detail_fields:
                st.write(f"{caption} {message[column]}")
            st.write("**Content:**")
            st.markdown(message['Content'], unsafe_allow_html=True)

    # Summary statistics
    st.subheader("Summary Statistics")
    all_messages = st.session_state.sent_messages
    totals = (
        ("Total Messages", len(all_messages)),
        ("Sent Messages", len(all_messages[all_messages['Status'] == 'sent'])),
        ("Failed Messages", len(all_messages[all_messages['Status'] == 'failed'])),
    )
    for column, (caption, amount) in zip(st.columns(3), totals):
        column.metric(caption, amount)
def projects_and_campaigns():
    """Render the "Projects & Campaigns" page with one form for each.

    Projects are created through ``create_project``; campaigns through
    ``create_campaign`` using the text before ':' of the selected project
    option as the project id.

    Fix compared with the previous version: when no project option is
    available the select box yields no value, and submitting the campaign form
    failed on ``None.split``; this is now reported as an error.
    """
    st.header("Projects & Campaigns")
    col1, col2 = st.columns(2)
    with col1:
        st.subheader("Add Project")
        with st.form(key="add_project_form"):
            project_name = st.text_input("Project Name")
            submit_button = st.form_submit_button(label="Add Project")
        if submit_button:
            project_id = create_project(project_name)
            st.success(f"Project added with ID: {project_id}")
    with col2:
        st.subheader("Add Campaign")
        with st.form(key="add_campaign_form"):
            campaign_name = st.text_input("Campaign Name")
            project_id = st.selectbox("Select Project", options=fetch_projects())
            campaign_type = st.selectbox("Campaign Type", options=["Email", "SMS"])
            submit_button = st.form_submit_button(label="Add Campaign")
        if submit_button:
            if not project_id:
                st.error("No project available. Please create a project first.")
            else:
                campaign_id = create_campaign(campaign_name, project_id.split(":")[0], campaign_type)
                st.success(f"Campaign added with ID: {campaign_id}")
def get_last_5_search_terms():
    """Return the ``term`` values of the five search terms with the highest ids.

    Returns:
        A list of up to five terms, ordered by id descending.

    The connection is now closed in a ``finally`` block so it is released even
    when the query raises.
    """
    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        cursor.execute("""
            SELECT term FROM search_terms
            ORDER BY id DESC
            LIMIT 5
        """)
        return [row[0] for row in cursor.fetchall()]
    finally:
        conn.close()
def fetch_leads():
    """Load all leads joined with their sources and search terms.

    Returns:
        A pandas DataFrame ordered by lead id descending with the columns
        "ID", "Email", "Phone", "First Name", "Last Name", "Company",
        "Job Title", "Search Term", "Source URL", "Page Title",
        "Meta Description", "Phone Numbers", "Content" and "Tags". Because of
        the LEFT JOINs a lead appears once per lead source.

    The connection is now closed in a ``finally`` block so it is released even
    when the query raises.
    """
    columns = ["ID", "Email", "Phone", "First Name", "Last Name", "Company", "Job Title",
               "Search Term", "Source URL", "Page Title", "Meta Description",
               "Phone Numbers", "Content", "Tags"]
    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        cursor.execute('''
            SELECT l.id, l.email, l.phone, l.first_name, l.last_name, l.company, l.job_title,
                   st.term as search_term, ls.url as source_url, ls.page_title, ls.meta_description,
                   ls.phone_numbers, ls.content, ls.tags
            FROM leads l
            LEFT JOIN lead_sources ls ON l.id = ls.lead_id
            LEFT JOIN search_terms st ON ls.search_term_id = st.id
            ORDER BY l.id DESC
        ''')
        rows = cursor.fetchall()
    finally:
        conn.close()
    return pd.DataFrame(rows, columns=columns)
import asyncio
import logging
from datetime import datetime
import time
import time
def automation_view():
    """Render the automation control panel: toggle, quick actions and analytics.

    The on/off state lives in ``st.session_state.automation_status``.

    Fixes compared with the previous version:
      * widget keys were derived from the current timestamp, so every rerun
        produced brand-new buttons and a click could never be matched to the
        button that was pressed; keys are now stable across reruns;
      * after toggling, the page is rerun so the button label and the status
        banner reflect the new state immediately.
    """
    st.header("Automation Control Panel")

    # Keys must be the same on every rerun, yet distinct if this view is
    # rendered more than once in a single run (the module's entry-point code
    # can do that). A per-function call counter gives both. This assumes the
    # script, and therefore this ``def``, is re-executed on each Streamlit
    # rerun, which restarts the counter at 0.
    call_index = getattr(automation_view, "_calls_this_run", 0)
    automation_view._calls_this_run = call_index + 1
    key_prefix = f"automation_{call_index}"

    # Initialize automation_status if it doesn't exist
    if 'automation_status' not in st.session_state:
        st.session_state.automation_status = False

    col1, col2 = st.columns(2)
    with col1:
        # Start/Stop Automation button
        button_text = "Stop Automation" if st.session_state.automation_status else "Start Automation"
        button_color = "secondary" if st.session_state.automation_status else "primary"
        if st.button(button_text, key=f"{key_prefix}_toggle", type=button_color):
            st.session_state.automation_status = not st.session_state.automation_status
            st.rerun()  # redraw with the new label and status
    with col2:
        # Quick Scan button
        if st.button("Quick Scan", key=f"{key_prefix}_scan", type="primary"):
            with st.spinner("Performing quick scan..."):
                # Implement quick scan logic here
                st.success("Quick scan completed!")

    # Display current automation status
    if st.session_state.automation_status:
        st.success("Automation is currently **ON**.")
    else:
        st.warning("Automation is currently **OFF**.")

    # Real-time analytics for automation process
    display_real_time_analytics()

    # Additional control buttons
    col1, col2, col3 = st.columns(3)
    with col1:
        if st.button("Optimize Search Terms", key=f"{key_prefix}_optimize", type="secondary"):
            with st.spinner("Optimizing search terms..."):
                asyncio.run(continuous_search_term_optimization())
            st.success("Search terms optimized.")
    with col2:
        if st.button("Send Test Email", key=f"{key_prefix}_test_email", type="secondary"):
            with st.spinner("Sending test email..."):
                # Implement test email sending logic here
                st.success("Test email sent successfully.")
    with col3:
        if st.button("Generate Report", key=f"{key_prefix}_report", type="secondary"):
            with st.spinner("Generating report..."):
                # Implement report generation logic here
                st.success("Report generated successfully.")

    # Run the automation process if it's ON
    if st.session_state.automation_status:
        if st.button("Run Full Automation Cycle", key=f"{key_prefix}_run", type="primary"):
            # NOTE(review): the definition of continuous_automation_process in
            # effect loops while automation_status is set, so this call blocks
            # until automation is switched off.
            with st.spinner("Running full automation cycle..."):
                asyncio.run(continuous_automation_process())
            st.success("Full automation cycle completed.")
async def continuous_automation_process():
    """Run one lead-gathering, optimisation and e-mail pass, announcing each step.

    Any exception is shown with ``st.error`` and logged; it is not re-raised.

    NOTE: this module defines ``continuous_automation_process`` again further
    down; the last definition is the one in effect at call time.
    """
    try:
        # (announcement, coroutine function) for steps 1-3, in execution order.
        steps = (
            ("Gathering leads...", continuous_lead_collection),
            ("Optimizing search terms...", continuous_search_term_optimization),
            ("Sending emails...", continuous_email_sending),
        )
        for announcement, run_step in steps:
            st.write(announcement)
            await run_step()
    except Exception as exc:
        st.error(f"Error during automation process: {exc}")
        logging.error(f"Error during automation process: {exc}")
def display_real_time_analytics():
    """Show four headline metrics under a "Real-Time Analytics" subheader.

    NOTE: this module defines ``display_real_time_analytics`` again further
    down; the last definition is the one in effect at call time.
    """
    st.subheader("Real-Time Analytics")
    # Counts are read first (in this order), then laid out in four columns.
    figures = (
        ("Total Leads", count_total_leads()),
        ("Leads in Last 24 Hours", count_leads_last_24_hours()),
        ("Emails Sent", count_emails_sent()),
        ("Optimized Terms", count_optimized_search_terms()),
    )
    for column, (caption, amount) in zip(st.columns(4), figures):
        column.metric(caption, amount)
def count_total_leads():
    """Return the number of rows in the ``leads`` table.

    The connection is closed in a ``finally`` block so it is released even
    when the query raises.
    """
    conn = get_db_connection()
    try:
        return conn.execute("SELECT COUNT(*) FROM leads").fetchone()[0]
    finally:
        conn.close()
def count_leads_last_24_hours():
    """Return the number of leads whose ``created_at`` is within the last day.

    The cut-off is SQLite's ``datetime('now', '-1 day')``.
    NOTE(review): SQLite evaluates 'now' in UTC; confirm ``created_at`` is
    stored in UTC as well.

    The connection is closed in a ``finally`` block so it is released even
    when the query raises.
    """
    conn = get_db_connection()
    try:
        cursor = conn.execute(
            "SELECT COUNT(*) FROM leads WHERE created_at >= datetime('now', '-1 day')"
        )
        return cursor.fetchone()[0]
    finally:
        conn.close()
def count_emails_sent():
    """Return the number of rows in ``messages`` whose status is 'sent'.

    The connection is closed in a ``finally`` block so it is released even
    when the query raises.
    """
    conn = get_db_connection()
    try:
        cursor = conn.execute("SELECT COUNT(*) FROM messages WHERE status = 'sent'")
        return cursor.fetchone()[0]
    finally:
        conn.close()
def count_optimized_search_terms():
    """Return the number of rows in the ``optimized_search_terms`` table.

    The connection is closed in a ``finally`` block so it is released even
    when the query raises.
    """
    conn = get_db_connection()
    try:
        return conn.execute("SELECT COUNT(*) FROM optimized_search_terms").fetchone()[0]
    finally:
        conn.close()
async def continuous_automation_process():
    """Repeat the gather / optimise / send cycle while automation is switched on.

    Each step is announced with a timestamped ``st.write``; the loop sleeps 60
    seconds between cycles and stops after logging the first exception.

    NOTE: this module defines ``continuous_automation_process`` again further
    down; the last definition is the one in effect at call time.
    """
    while True:
        if not st.session_state.automation_status:
            return
        try:
            # (announcement, coroutine function) for steps 1-3, in execution order.
            steps = (
                ("Gathering leads...", continuous_lead_collection),
                ("Optimizing search terms...", continuous_search_term_optimization),
                ("Sending emails...", continuous_email_sending),
            )
            for announcement, run_step in steps:
                st.write(f"{datetime.now()}: {announcement}")
                await run_step()
            # Wait between iterations (60 seconds).
            await asyncio.sleep(60)
        except Exception as exc:
            logging.error(f"Error during automation process: {exc}")
            return
# Step 1: Lead gathering automation
async def continuous_lead_collection(num_results_per_term=10, sleep_interval=60,
                                     campaign_id=1, search_type="All Leads"):
    """Automate one lead collection pass over the least-searched terms.

    Args:
        num_results_per_term: Number of results requested per term.
        sleep_interval: Seconds to sleep once the pass is finished.
        campaign_id: Campaign the leads are attached to (previously the
            hard-coded default campaign ``1``).
        search_type: Search type handed to ``manual_search_wrapper``.

    For each of the five terms returned by ``get_least_searched_terms`` the
    search runs in a worker thread; every result whose first element passes
    ``is_valid_email`` is saved with ``save_lead`` and linked to the campaign.

    Fix compared with the previous version: ``manual_search_wrapper`` is now
    called with a search type, like every other call site in this module,
    instead of with only three arguments.
    """
    search_terms = get_least_searched_terms(5)  # Least searched terms first
    for term in search_terms:
        results = await asyncio.to_thread(
            manual_search_wrapper, term, num_results_per_term, campaign_id, search_type
        )
        for result in results:
            email = result[0]
            if is_valid_email(email):
                lead_id = save_lead(email, None, None, None, None, None)
                add_lead_to_campaign(campaign_id, lead_id)
    # Wait for the next iteration
    await asyncio.sleep(sleep_interval)
# Step 2: Search term optimization automation
async def continuous_search_term_optimization():
    """Optimize all current search terms, store the result and log the request.

    The current terms and the knowledge-base info are passed to
    ``optimize_search_terms``; its result is saved with
    ``save_optimized_search_terms`` and recorded through ``log_ai_request``.
    """
    existing_terms = fetch_all_search_terms()
    # Knowledge-base context is fetched after the terms, then both go to the optimizer.
    improved_terms = optimize_search_terms(existing_terms, get_knowledge_base_info())
    save_optimized_search_terms(improved_terms)
    log_ai_request("Optimize Search Terms", existing_terms, improved_terms)
# Step 3: Email sending automation
async def continuous_email_sending(sleep_interval=60):
    """Automate one bulk e-mail pass for template "1", then sleep.

    Args:
        sleep_interval: Seconds to sleep after the pass.

    Fixes compared with the previous version:
      * the send option is now "All Not Contacted with this Template"; the old
        value "All Not Contacted" is not an option that
        ``fetch_leads_for_bulk_send`` recognises;
      * a string result from ``bulk_send`` (its "Template not found" message)
        is logged as one error instead of character by character;
      * the unused ``campaign_id`` local was removed.
    """
    template_id = "1"  # Example template
    leads = fetch_leads_for_bulk_send(
        template_id=template_id,
        send_option="All Not Contacted with this Template",
        filter_option="Filter Out blog-directory",
    )
    if leads:
        logs = await bulk_send(template_id, "[email protected]", "[email protected]", leads)
        if isinstance(logs, str):
            logging.error(logs)
        else:
            for log in logs:
                logging.info(log)
    # Wait for the next iteration
    await asyncio.sleep(sleep_interval)
# AI-powered search term optimization
def optimize_search_terms(current_terms, kb_info, model="gpt-4"):
    """Ask the chat model for optimized search terms and return them as a list.

    Args:
        current_terms: Iterable of the current search terms.
        kb_info: Knowledge-base context interpolated into the prompt.
        model: Chat model name; defaults to the previously hard-coded "gpt-4".

    Returns:
        One entry per non-empty line of the model's reply, with surrounding
        whitespace and leading list markers ("-", "*", "1.", "2)") removed.

    Fixes compared with the previous version:
      * blank lines are no longer returned as terms, and list markers are
        stripped from each line;
      * an empty (``None``) reply yields an empty list instead of failing on
        ``None.split``;
      * non-string entries in ``current_terms`` no longer break the prompt.
    """
    prompt = "\n".join([
        "",
        "Optimize the following search terms for high-quality lead generation:",
        f"Search Terms: {', '.join(str(term) for term in current_terms)}",
        f"Knowledge Base Info: {kb_info}",
        "Focus on terms that will attract leads with high conversion potential and align with the product.",
        "",
    ])
    response = client.chat.completions.create(
        model=model, messages=[{"role": "user", "content": prompt}]
    )
    content = response.choices[0].message.content or ""
    log_ai_request("Optimize Search Terms", prompt, content)

    optimized = []
    for line in content.split('\n'):
        # Drop a leading bullet or numbering such as "- ", "* ", "1. " or "2) ".
        term = re.sub(r'^\s*(?:[-*\u2022]|\d+[.)])\s+', '', line).strip()
        if term:
            optimized.append(term)
    return optimized
# Save AI request logs
def log_ai_request(request_type, prompt, response):
    """Append a timestamped three-line record to ``ai_request_logs`` and log it.

    NOTE: this module defines ``log_ai_request`` again further down; the last
    definition is the one in effect at call time.
    """
    record = "\n".join((
        f"{datetime.now()}: {request_type}",
        f"Prompt: {prompt}",
        f"Response: {response}",
    ))
    ai_request_logs.append(record)
    logging.info(record)
# Save optimized search terms
def save_optimized_search_terms(optimized_terms):
    """Insert every term of ``optimized_terms`` into ``optimized_search_terms``.

    Args:
        optimized_terms: Iterable of terms; each becomes one row.

    Fixes compared with the previous version: the inserts run inside the
    connection's transaction context (commit on success, rollback on error),
    and the connection is closed even when an insert fails.
    """
    conn = get_db_connection()
    try:
        with conn:  # commits on success, rolls back if an insert raises
            conn.executemany(
                "INSERT INTO optimized_search_terms (term) VALUES (?)",
                ((term,) for term in optimized_terms),
            )
    finally:
        conn.close()
async def supervisor_check():
    """Refine every search term reported as ineffective at a 0.3 threshold.

    Each term from ``get_ineffective_search_terms(threshold=0.3)`` is passed to
    ``refine_search_term`` and the result is stored with ``update_search_term``.
    """
    weak_terms = get_ineffective_search_terms(threshold=0.3)  # Terms with <30% lead success
    for weak_term in weak_terms:
        update_search_term(weak_term, refine_search_term(weak_term))
def get_ineffective_search_terms(threshold):
    """Return the terms whose valid-lead ratio is below ``threshold``.

    Args:
        threshold: Upper bound (exclusive) for ``valid_leads / total_results``.

    Returns:
        A list of ``term`` values from ``search_term_effectiveness``.

    Fixes compared with the previous version:
      * ``valid_leads`` is cast to REAL before dividing; SQLite divides two
        integers as integers, which turns every partial ratio into 0 and
        makes each such term look ineffective;
      * rows with ``total_results`` of 0 are excluded explicitly, where they
        used to drop out only through the NULL result of a division by zero;
      * the connection is closed even when the query raises.
    """
    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        cursor.execute("""
            SELECT term FROM search_term_effectiveness
            WHERE total_results > 0
              AND (CAST(valid_leads AS REAL) / total_results) < ?
        """, (threshold,))
        return [row[0] for row in cursor.fetchall()]
    finally:
        conn.close()
def chatbot_style_logs(logs):
    """Render log entries under an "Automation Logs" subheader.

    Args:
        logs: Iterable of mappings with the keys 'timestamp' and 'message'.
    """
    st.subheader("Automation Logs")
    for entry in logs:
        stamp, text = entry['timestamp'], entry['message']
        st.markdown(f"**[{stamp}]** {text}")
def log_ai_request(request_type, prompt, response):
    """Record an AI request in ``ai_request_logs``, log it, and redraw the log view.

    The stored entry is a dict with a 'timestamp' and a
    "<request_type>: <response>" 'message'; ``prompt`` is accepted but not
    recorded. After appending, the whole log list is rendered through
    ``chatbot_style_logs``.
    """
    summary = f"{request_type}: {response}"
    ai_request_logs.append({"timestamp": datetime.now(), "message": summary})
    logging.info(summary)
    chatbot_style_logs(ai_request_logs)
def automation_control_panel():
    """Render a minimal automation panel: toggle button, status line, analytics.

    While automation is on, ``continuous_automation_process`` is run to
    completion before the analytics are drawn.
    """
    st.header("Automation Control")

    # Toggle automation ON/OFF
    if 'automation_status' not in st.session_state:
        st.session_state.automation_status = False
    toggled = st.button("Toggle Automation")
    if toggled:
        st.session_state.automation_status = not st.session_state.automation_status

    if not st.session_state.automation_status:
        st.write("**Automation is currently OFF.**")
    else:
        st.write("**Automation is currently ON.**")
        asyncio.run(continuous_automation_process())

    # Real-time analytics
    display_real_time_analytics()
def display_real_time_analytics():
    """Show total leads, last-24-hour leads, e-mails sent and optimized terms as four metrics."""
    # Counts are read first (in this order), then laid out in four columns.
    figures = (
        ("Total Leads", count_total_leads()),
        ("Leads in Last 24 Hours", count_leads_last_24_hours()),
        ("Emails Sent", count_emails_sent()),
        ("Optimized Terms", count_optimized_search_terms()),
    )
    for column, (caption, amount) in zip(st.columns(4), figures):
        column.metric(caption, amount)
async def continuous_automation_process(cycle_interval=60):
    """Run automation cycles for as long as ``automation_status`` is set.

    Args:
        cycle_interval: Seconds to wait after every cycle (previously the
            hard-coded 60).

    Each cycle collects leads, runs ``supervisor_check`` on every fifth cycle
    and sends e-mails. Errors are logged and shown with ``st.error``; the loop
    then continues with the next cycle.

    Fixes compared with the previous version:
      * ``supervisor_check`` is a coroutine function and is now awaited;
        calling it without ``await`` only created a coroutine object that
        never ran;
      * the pause and the cycle counter are applied after failed cycles too,
        so a persistent error no longer retries in a tight loop.
    """
    cycle_count = 0
    while st.session_state.automation_status:
        try:
            # Lead collection step
            await continuous_lead_collection()
            # Supervisor check of ineffective search terms every 5 cycles
            if cycle_count % 5 == 0:
                await supervisor_check()
            # Email sending step
            await continuous_email_sending()
        except Exception as e:
            logging.error(f"Error in automation: {e}")
            st.error(f"Error in automation process: {e}")
        cycle_count += 1
        await asyncio.sleep(cycle_interval)  # Pause before the next cycle
# Call the main function to run the Streamlit app.
# main() already offers the automation panel through its "Automation Control"
# page. The former extra entry-point block rendered that panel (with a second
# title) on every page and could start the blocking automation loop before
# main() ever ran, so it has been folded into this single guard.
if __name__ == "__main__":
    main()