# Load libraries
import ast
import json
import re
from datetime import datetime, timedelta
from functools import lru_cache
from subprocess import Popen
from urllib.parse import urlparse

import cleantext
import numpy as np
import pandas as pd
import requests
import scrapy
import spacy
import streamlit as st
from dotenv import load_dotenv
from scrapy import Selector
from spacy.lang.en import English
# Markdown snippets passed to st.markdown() by the page layout code.
# Blank lines separate paragraphs so that the text following the bullet list
# is not folded into the last bullet when rendered.
md_intro = '''# Business News Sentiment Dashboard

The dashboard has 2 tabs:
- Sentiment prediction: receives a news link and outputs sentiment results
- News sentiment report: reports the sentiment of business news from past few days

Main libraries used: scrapy, SpaCy, PyTorch, transformers, streamlit

News scope: CNN, BBC, CNBC (other business news sources don't have free access)

Time scope: up to 3 days (from yesterday to 3 days ago), based on free tier's available data source
'''

md_sumstats = '''## News Sentiment Summary
'''

md_table = '''## News Sentiment Report
'''

md_notes = '''## Notes and Thoughts:
Lexicon-based approach may confuse named entities and actual sentiment, because brand names may have positive words, say BeautifulSoup.
Hence, implementing named entity recognition before sentiment analysis helps to improve accuracy.
Using RNN-based approach can also overcome lexicon issues, but it also takes more resources.

## References:
'''
# Load the merged news extract whose file name carries yesterday's date
# (YYYY-MM-DD). The date must be computed from "today": negating a bare
# timedelta yields another timedelta, which has no strftime().
_yesterday = (datetime.today() - timedelta(days=1)).strftime('%Y-%m-%d')
dat_name = './news_db/merged_news_data_' + _yesterday + '.csv'
news = pd.read_csv(dat_name, on_bad_lines='skip')  # malformed CSV rows are skipped

# Missing lexicon sentences become empty strings
news[['pos_sent', 'neg_sent']] = news[['pos_sent', 'neg_sent']].fillna('')
# clean_content is stored as the text of a Python literal; literal_eval parses
# it without executing code
news['clean_content'] = news.clean_content.apply(ast.literal_eval)
# Remaining gaps (e.g. missing transformer scores) become '' as well; news_stats
# relies on '' to recognise rows without a transformer score
news = news.fillna(value='')
news['clean_date'] = pd.to_datetime(news['clean_date'], dayfirst=True)
# Calculate summary
def news_stats(news, method_selection, range_selection):
    """Return the mean sentiment score and the article count for a time range.

    Args:
        news: DataFrame with the columns ``title``, ``date_extracted``
            (``YYYY-MM-DD`` strings), ``arti_score`` and ``rnn_arti_score``.
        method_selection: ``'Lexicon'`` or ``None`` averages ``arti_score``;
            any other value averages ``rnn_arti_score``.
        range_selection: ``'1 day'`` or ``None`` restricts the summary to rows
            extracted yesterday; ``'3 days'`` uses every row of ``news``.

    Returns:
        ``(overall_sentiment, news_count)``. ``(0, 0)`` is returned for an
        unrecognised ``range_selection``; the mean is NaN when no row matches.
    """
    # Rows without a transformer score are excluded whichever method is chosen
    scored = news[news.rnn_arti_score != '']
    score_column = 'arti_score' if method_selection in ('Lexicon', None) else 'rnn_arti_score'

    if range_selection in ('1 day', None):
        yesterday = (datetime.today() - timedelta(days=1)).strftime('%Y-%m-%d')
        scored = scored[scored.date_extracted == yesterday]
    elif range_selection != '3 days':
        return 0, 0

    # The score column can be object-typed (numbers mixed with '' placeholders),
    # so coerce it to numeric before averaging. Reading the column directly also
    # avoids writing a helper column onto a filtered slice of the caller's frame.
    overall_sentiment = pd.to_numeric(scored[score_column], errors='coerce').mean()
    news_count = scored.title.count()
    return overall_sentiment, news_count
def news_table(news, date_selection, method_selection):
    """Build the display table of articles extracted on the selected day.

    Args:
        news: DataFrame with ``title``, ``url``, ``clean_date``,
            ``date_extracted`` (``YYYY-MM-DD`` strings) and the score/sentence
            columns of both methods.
        date_selection: ``'Yesterday'`` (or ``None``), ``'2 Days Ago'`` or
            ``'3 Days Ago'``.
        method_selection: ``'Lexicon'`` (or ``None``) or ``'Transformer'``.

    Returns:
        A DataFrame with the columns Title, Sentiment Score, Most Positive
        Sentence, Least Positive Sentence, Date and URL, sorted by Date
        (newest first), with long text columns wrapped for display.

    Raises:
        ValueError: if either selection is not one of the supported values.
    """
    days_back = {None: 1, 'Yesterday': 1, '2 Days Ago': 2, '3 Days Ago': 3}
    if date_selection not in days_back:
        raise ValueError(f"Unsupported date_selection: {date_selection!r}")

    if method_selection in ('Lexicon', None):
        score_col, pos_col, neg_col = 'arti_score', 'pos_sent', 'neg_sent'
    elif method_selection == 'Transformer':
        score_col, pos_col, neg_col = 'rnn_arti_score', 'rnn_pos_sent', 'rnn_neg_sent'
    else:
        raise ValueError(f"Unsupported method_selection: {method_selection!r}")

    offset = timedelta(days=days_back[date_selection])
    date_selected = (datetime.today() - offset).strftime('%Y-%m-%d')

    clean_news = news.loc[news.date_extracted == date_selected,
                          ['title', score_col, pos_col, neg_col, 'clean_date', 'url']]
    clean_news = clean_news.rename(columns={
        'title': 'Title',
        'url': 'URL',
        'clean_date': 'Date',
        score_col: 'Sentiment Score',
        pos_col: 'Most Positive Sentence',
        neg_col: 'Least Positive Sentence',
    })

    # Formatting for table display
    clean_news = clean_news.sort_values('Date', ascending=False).reset_index(drop=True)
    wrap_widths = (('Title', 40), ('URL', 65),
                   ('Most Positive Sentence', 65), ('Least Positive Sentence', 65))
    for column, width in wrap_widths:
        clean_news[column] = clean_news[column].str.wrap(width=width)
    return clean_news
def bbc_json_extract(bbc_script):
    """Pull the paragraph text out of a JSON page payload.

    The payload is parsed as JSON; the first entry under
    ``props -> pageProps -> page`` is taken as the article. Every block of
    type ``'paragraph'`` inside a content item of type ``'text'`` contributes
    its text followed by a single space.

    Args:
        bbc_script: JSON document as a string.

    Returns:
        The concatenated paragraph texts (each followed by a space), or an
        empty string when there are no paragraph blocks.
    """
    page = json.loads(bbc_script)['props']['pageProps']['page']
    # The article sits under the first key of the page mapping
    article = page[tuple(page.keys())[0]]
    paragraphs = [
        block['model']['text'] + ' '
        for item in article['contents']
        if item['type'] == 'text'
        for block in item['model']['blocks']
        if block['type'] == 'paragraph'
    ]
    return ''.join(paragraphs)
def link_extract(link):
    """Download a news article and return its body text.

    Args:
        link: Article URL; ``https://`` is prepended when no scheme is given.

    Returns:
        The extracted article text, or ``''`` when ``link`` is empty/``None``,
        the site is not recognised, or nothing could be extracted.

    Raises:
        requests.RequestException: if the page cannot be downloaded.
    """
    # Use the argument itself rather than the module-level `newslink` global
    if not link:
        return ''
    clean_link = link if link.startswith(('http://', 'https://')) else 'https://' + link

    # A timeout keeps the dashboard from hanging forever on an unresponsive site
    html = requests.get(clean_link, timeout=30).content
    sel = Selector(text=html)

    # NOTE(review): the site checks were empty strings (always true, so only the
    # first branch ever ran). The domains below are inferred from the extractor
    # used in each branch and the CNN/BBC/CNBC scope of the app - confirm.
    host = urlparse(clean_link).netloc.lower()
    extracted_content = ''
    if 'bbc.' in host:
        scripts = sel.xpath('//body//script//text()').extract()
        if scripts:  # guard against pages without an inline script
            extracted_content = bbc_json_extract(scripts[0])
    elif 'cnn.' in host:
        extracted_content = ''.join(sel.xpath('//p[contains(@class, "paragraph") and not(@class="footer__disclaimer-text") and not(@class="footer__copyright-text")]//text()').getall()).strip()
    elif 'cnbc.' in host:
        extracted_content = ''.join(sel.xpath('//div[@class="ArticleBody-articleBody"]//p//text()').getall()).strip()
    return extracted_content
def sentence_breakdown(string):
    """Turn scraped article text into a list of separate sentences.

    The text is passed through ``cleantext.clean`` (``extra_spaces=True``),
    carriage returns, newlines, tabs and non-breaking spaces are removed -
    both the real characters and their escaped spellings such as a literal
    backslash followed by ``n`` - and the result is split with spaCy's
    rule-based ``sentencizer``.

    Args:
        string: Raw article text.

    Returns:
        A list of stripped sentence strings, or ``""`` when ``string`` is
        the empty string.
    """
    if string == "":
        return ""

    text = cleantext.clean(string, extra_spaces=True)
    for token in (r'\r', r'\n', r'\t', r'\xa0', '\r', '\n', '\t', '\xa0'):
        text = text.replace(token, '')

    splitter = English()
    splitter.add_pipe('sentencizer')
    return [span.text.strip() for span in splitter(text).sents]
@lru_cache(maxsize=1)
def _sentiment_pipeline():
    """Build the text-classification pipeline once and reuse it across calls."""
    # Imported lazily so the transformers stack is only loaded when a
    # prediction is actually requested
    from transformers import (AutoModelForSequenceClassification,
                              AutoTokenizer, TextClassificationPipeline)
    model_name = "Venkatesh4342/distilbert-helpdesk-sentence-sentiment"
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    model = AutoModelForSequenceClassification.from_pretrained(model_name)
    return TextClassificationPipeline(model=model, tokenizer=tokenizer)


def rnn_sentence_sentiment(sent):
    """Score one sentence with the pretrained sentiment classifier.

    The tokenizer and model are loaded on the first call only; reloading them
    for every sentence made scoring an article needlessly slow.

    Args:
        sent: Sentence text.

    Returns:
        The top prediction's score when its label is ``'Positive'``, the
        negated score when it is ``'Negative'``, and ``0`` for ``'Neutral'``
        or any other label.
    """
    top = _sentiment_pipeline()(sent, top_k=1)[0]
    # Positive sentiment gets a positive score, negative sentiment a negative
    # score; neutral (or an unexpected label) counts as 0
    if top['label'] == 'Positive':
        return top['score']
    if top['label'] == 'Negative':
        return -top['score']
    return 0
def article_sentiment(arti):
    """Score an article from its sentences.

    Args:
        arti: Iterable of sentence strings. Duplicate sentences are scored
            and reported once.

    Returns:
        ``(arti_score, pos_sents, neg_sents)`` where ``arti_score`` is the mean
        of the non-zero sentence scores rounded to 3 decimals (``0.0`` when
        there are no sentences or every sentence scored 0), ``pos_sents`` holds
        up to three highest-scoring sentences and ``neg_sents`` up to three
        lowest-scoring ones.
    """
    scores = {sent: rnn_sentence_sentiment(sent) for sent in arti}

    # Neutral sentences (score 0) are left out of the average. With nothing
    # left to average, report a neutral 0.0 instead of letting np.mean([])
    # produce NaN and a RuntimeWarning.
    non_neutral = [score for score in scores.values() if score != 0]
    arti_score = round(float(np.mean(non_neutral)), 3) if non_neutral else 0.0

    pos_sents = sorted(scores, key=scores.get, reverse=True)[:3]
    neg_sents = sorted(scores, key=scores.get)[:3]
    return arti_score, pos_sents, neg_sents
# Page header and the two dashboard tabs
st.markdown(md_intro)
tab_pred, tab_news = st.tabs(["Sentiment Prediction", "News Report"])

# Tab 1: score a single article supplied by the user as a link
with tab_pred:
    st.write("This is a sentiment prediction module.\nPlease enter your news link into the textbox.\nSentiment prediction will be returned shortly!.\nExample link:")
    newslink = st.chat_input(placeholder="Please input CNN/BBC/CNBC link")
    if newslink:
        # Temporary status line shown while the article is fetched and scored
        placeholder = st.empty()
        placeholder.text("Running ...")
        extracted_content = link_extract(newslink)
        cleaned_content = sentence_breakdown(extracted_content)
        if cleaned_content:
            arti_score, user_pos_sents, user_neg_sents = article_sentiment(cleaned_content)
        placeholder.empty()

        if cleaned_content:
            st.markdown(f'### Article sentiment score is: {arti_score}')
            # Loop over whatever was returned: short articles may yield fewer
            # than three sentences, which used to raise IndexError
            st.markdown("### Three most positive sentences are: ")
            for sentence in user_pos_sents:
                st.markdown(sentence)
            st.markdown("### Three most negative sentences are: ")
            for sentence in user_neg_sents:
                st.markdown(sentence)
        else:
            # Nothing was extracted (unsupported site or empty page)
            st.warning("No article text could be extracted from this link. Please input CNN/BBC/CNBC link")
# Tab 2: summary statistics and the per-article sentiment table
with tab_news:
    st.markdown(md_sumstats)

    # Method and range selectors side by side
    method_col, range_col = st.columns(2)
    method_selection = method_col.selectbox("Select Method", ('Lexicon', 'Transformer'))
    range_selection = range_col.selectbox("Statistics Range", ('1 day', '3 days'))

    overall_sentiment, news_count = news_stats(news, method_selection, range_selection)
    sentiment_label = str(round(overall_sentiment, 3))
    count_label = str(news_count)

    senti_col, count_col = st.columns(2)
    with senti_col:
        st.metric("Overall Sentiment", sentiment_label)
    with count_col:
        st.metric("Number of News", count_label)

    st.markdown(md_table)
    date_selection = st.selectbox("Extraction Date", ('Yesterday', '2 Days Ago', '3 Days Ago'))
    clean_news = news_table(news, date_selection, method_selection)

    # Column widths and formats for the report table
    table_columns = {
        "Title": st.column_config.Column(width=250),
        "Most Positive Sentence": st.column_config.Column(width=400),
        "Least Positive Sentence": st.column_config.Column(width=400),
        "Date": st.column_config.DateColumn(format="DD-MM-YYYY"),
        "URL": st.column_config.LinkColumn("App URL", width=400),
    }
    st.dataframe(data=clean_news, column_config=table_columns)

# NOTE(review): indentation was lost in the source; the notes are rendered
# below the tabs here - confirm whether they belong inside the report tab.
st.markdown(md_notes)