# scrap_util.py -- aibb content-generation utilities
from faker import Faker
import pandas as pd
import re
import sys
import json
import time
import unicodedata
from bs4 import BeautifulSoup  # parse HTML pages fetched via requests
from urllib.parse import urljoin
import cpca  # map Chinese place names in text to province/city/district codes
from selenium import webdriver
import helium as hm
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from keywordInfo import key_pat, zwlx_list
# Sample announcement pages (a Beijing municipal exam-recruitment notice, a Jiangsu MSA notice)
# sub_url = "http://www.beijing.gov.cn/gongkai/rsxx/gwyzk/202211/t20221120_2862819.html"
sub_url = "https://www.js.msa.gov.cn/art/2023/2/24/art_11436_1391666.html"
def getDriver():
    """Build a headless Chrome driver with basic anti-bot-detection tweaks."""
    uas = Faker()
    # CHROMEDRIVER_PATH = '/usr/bin/chromedriver'  # system-wide install
    CHROMEDRIVER_PATH = './chromedriver'           # local binary next to this script
    s = Service(CHROMEDRIVER_PATH)
    WINDOW_SIZE = "1920,1080"
    # Options
    chrome_options = Options()
    chrome_options.add_argument("--headless")
    chrome_options.add_argument("--window-size=%s" % WINDOW_SIZE)
    chrome_options.add_argument('--no-sandbox')
    chrome_options.add_argument('--disable-infobars')
    chrome_options.add_argument('--disable-gpu')
    # Reduce the automation fingerprint that sites use to detect bots
    chrome_options.add_argument('--disable-blink-features=AutomationControlled')
    chrome_options.add_experimental_option('excludeSwitches', ['enable-automation'])
    chrome_options.add_argument(f'user-agent={uas.chrome()}')
    # stealth.min.js must sit next to this script
    with open('./stealth.min.js') as f:
        js = f.read()
    driver = webdriver.Chrome(service=s, options=chrome_options)
    driver.execute_cdp_cmd("Page.addScriptToEvaluateOnNewDocument", {"source": js})
    return driver
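# A quick usage sketch for getDriver (assumes ./chromedriver and
# ./stealth.min.js exist next to this script):
#   driver = getDriver()
#   driver.get("https://example.com")
#   print(driver.title)
#   driver.quit()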
def content_with_date(lines):
    """Keep lines up to and including the first line that contains a date."""
    if len(lines) < 1:
        return []
    date_pattern1 = r'\d{1,2}月\d{1,2}日'         # e.g. 3月1日
    date_pattern2 = r'[上下]午'                    # morning/afternoon (defined but unused)
    date_pattern3 = r'\d{4}年\d{1,2}月\d{1,2}日'   # e.g. 2023年3月1日
    inx_ = len(lines) - 1
    for inx_, line in enumerate(lines):
        matches = re.findall(f'({date_pattern1}|{date_pattern3})', line)
        if len(matches) > 0:
            break
    if len(matches) < 1:
        return []
    return lines[:inx_ + 1]
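# Illustrative behaviour of content_with_date (made-up inputs, not real page text):
#   content_with_date(["报名时间:2023年3月1日至3月7日", "联系电话:12345"])
#   -> ['报名时间:2023年3月1日至3月7日']
#   content_with_date(["无日期的段落"])   # no date anywhere
#   -> []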
# 👏👏👏 TODO: tune how many paragraphs to grab here, so we don't collect too much redundant text
def find_key_paragrap(search_text, paragraphs):
    """Return the first window (matched paragraph plus up to 6 following ones)
    whose text contains search_text and at least one date line."""
    for inx_, paragraph in enumerate(paragraphs):
        text = paragraph.text
        if search_text in text:
            # Take the matched paragraph and up to 6 paragraphs after it
            start = inx_
            end = min(len(paragraphs), inx_ + 7)
            target_paragraphs = paragraphs[start:end]
            texts = [i.text for i in target_paragraphs]
            # Accept the window only if it carries at least one date line
            dt_lines = content_with_date(texts)
            if len(dt_lines) >= 1:
                return texts
    return None
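# Illustrative only -- `paragraphs` must expose a `.text` attribute (Selenium
# WebElements in practice); `_P` is a hypothetical stub for a quick check:
#   class _P:
#       def __init__(self, t): self.text = t
#   find_key_paragrap("报名", [_P("报名时间:2023年3月1日"), _P("其他说明")])
#   -> ['报名时间:2023年3月1日', '其他说明']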
def titleLocInfo(title):
    """Extract year, location, and announcement type from an announcement title."""
    # zwk_year: four-digit year, defaulting to 2023 when the title has none
    year_pattern = r'\d{4}年'
    matches = re.findall(f'({year_pattern})', title)
    zwk_year = matches[0] if len(matches) else "2023"
    # Province / city / adcode via cpca's address parser
    area_df = cpca.transform([title])
    zwk_sheng = list(area_df["省"])[0] if area_df.shape[0] > 0 else ""    # province
    zwk_diqu = list(area_df["市"])[0] if area_df.shape[0] > 0 else ""     # city
    zwk_zip = list(area_df["adcode"])[0] if area_df.shape[0] > 0 else ""  # unique area code
    # Announcement type; defaults to the first entry (civil servant)
    zwlx = zwlx_list[0]
    for i in zwlx_list:
        if i in title:
            zwlx = i
    res = [zwk_year, zwk_sheng, zwk_diqu, zwk_zip, zwlx]
    return res
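# Example (assumes zwlx_list contains "公务员"; the adcode depends on cpca's data):
#   titleLocInfo("2023年江苏省南京市公务员招录公告")
#   -> ['2023年', '江苏省', '南京市', '<adcode>', '公务员']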
def extract_from_driver(driver):
    """Extract fields from the page currently loaded in the driver.
    Returns: doc metadata, key time/source paragraphs, and attachment info.
    """
    title = driver.title
    # title_info = [zwk_year, zwk_sheng, zwk_diqu, zwk_zip, zwlx]
    title_info = titleLocInfo(title)
    # Collect all non-empty <p> paragraphs (Selenium 4 locator API)
    paragraphs = driver.find_elements(By.TAG_NAME, "p")
    paragraphs = [i for i in paragraphs if i.text.strip() != ""]
    # Pull the first paragraph window matching any keyword pattern, e.g.
    # key_pat["报名"], key_pat["考试"], key_pat["缴费"], key_pat["准考证"], key_pat["all"]
    def get_key_info(pt: list):
        for item in pt:
            res_ = find_key_paragrap(item, paragraphs)
            if res_ is not None:
                return res_
        return []  # empty list keeps downstream "\n".join(...) well-typed
bm_sj = get_key_info(key_pat["报名"])
fee_sj = get_key_info(key_pat["缴费"])
ks_sj = get_key_info(key_pat["考试"])
zkz_sj = get_key_info(key_pat["准考证"])
    # Attachment links: ".doc", ".xls", ".xlsx"
    links = driver.find_elements(By.TAG_NAME, "a")
    unique_link = {}
    doc_suffixes = (".doc", ".xls", ".xlsx")
    for link in links:
        url_ = link.get_attribute("href")
        content_ = link.get_attribute("textContent")
        url_con = url_ and url_.endswith(doc_suffixes)
        name_con = content_ and content_.endswith(doc_suffixes)
        if url_con or name_con:
            unique_link[content_] = url_
    name = ["title", "zwk_year", "zwk_sheng", "zwk_diqu", "zwk_zip", "zwlx",
            "bm_sj", "fee_sj", "ks_sj", "zkz_sj",
            "fn_list",
            "tidy_bm_sj", "tidy_fee_sj", "tidy_ks_sj", "tidy_zkz_sj"
            ]
doc_item = [title]
doc_item.extend(title_info)
doc_item.extend([bm_sj, fee_sj, ks_sj, zkz_sj,
unique_link])
td_bm_sj = content_with_date(bm_sj)
td_fee_sj = content_with_date(fee_sj)
td_ks_sj = content_with_date(ks_sj)
td_zkz_sj = content_with_date(zkz_sj)
doc_item.extend([td_bm_sj, td_fee_sj, td_ks_sj, td_zkz_sj])
    doc_dc = dict(zip(name, doc_item))
return doc_dc
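# Shape of the returned dict, roughly (values illustrative, not real output):
#   {"title": "...公告", "zwk_year": "2023年", "zwk_sheng": "江苏省", ...,
#    "fn_list": {"附件1.doc": "https://.../附件1.doc"}, "tidy_bm_sj": [...]}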
# Flatten an extracted doc dict into a single table row
def table_record_doc(doc):
fn_dc = doc["fn_list"]
row_item = [
doc["title"], doc["zwk_year"], doc["zwk_sheng"], doc["zwk_diqu"], doc["zwk_zip"],
doc["zwlx"],
"\n".join(doc["bm_sj"]),
"\n".join(doc["fee_sj"]),
"\n".join(doc["ks_sj"]),
"\n".join(doc["zkz_sj"]),
"\n".join(doc["tidy_bm_sj"]),
"\n".join(doc["tidy_fee_sj"]),
"\n".join(doc["tidy_ks_sj"]),
"\n".join(doc["tidy_zkz_sj"]),
"\n".join([f"{k}:{v}" for k,v in fn_dc.items()])
]
    name = ["title", "zwk_year", "zwk_sheng", "zwk_diqu", "zwk_zip", "zwlx",
            "bm_sj", "fee_sj", "ks_sj", "zkz_sj",
            "tidy_bm_sj", "tidy_fee_sj", "tidy_ks_sj", "tidy_zkz_sj",
            "fn_list"]
    # `name` documents the column order; see the usage sketch below for
    # building a DataFrame out of collected rows
    return row_item
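# A minimal sketch for collecting rows into a table (assumed usage; nothing in
# this script calls table_record_doc yet):
#   rows = [table_record_doc(d) for d in docs]   # docs: dicts from extract_from_driver
#   df = pd.DataFrame(rows, columns=name)        # columns as listed in `name` above
#   df.to_csv("records.csv", index=False)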
if __name__ == '__main__':
    mydriver = getDriver()
    url = "https://www.js.msa.gov.cn/art/2023/2/24/art_11436_1391666.html"
    if len(sys.argv) > 1:
        url = sys.argv[1]
    hm.set_driver(mydriver)  # hand helium an existing Selenium driver
    hm.go_to(url)
    time.sleep(2)  # give the page a moment to finish rendering
    doc = extract_from_driver(mydriver)
    print("-raw, mostly contains----------------------------")
    print(doc)
    print("Most relevant lines for registration, payment, exam, and admission ticket:")
    bm_sj = content_with_date(doc["bm_sj"])
    fee_sj = content_with_date(doc["fee_sj"])
    ks_sj = content_with_date(doc["ks_sj"])
    zkz_sj = content_with_date(doc["zkz_sj"])
    print(bm_sj)
    print(fee_sj)
    print(ks_sj)
    print(zkz_sj)
    mydriver.quit()