下午好。 寫完已經2年了。
當我想要一份 Habr 的副本時,我決定編寫一個解析器,將作者的所有內容保存到數據庫中。 它是如何發生的以及我遇到了什麼錯誤 - 您可以在剪切下閱讀。
長話短說——
解析器的第一個版本。 一個線程,很多問題
首先,我決定製作一個腳本原型,在該原型中文章將在下載後立即被解析並放入數據庫中。 我想都沒想就用了sqlite3,因為。 它的勞動密集度較低:不需要本地服務器、創建-查看-刪除之類的東西。
one_thread.py
from bs4 import BeautifulSoup
import sqlite3
import requests
from datetime import datetime
def main(min, max):
conn = sqlite3.connect('habr.db')
c = conn.cursor()
c.execute('PRAGMA encoding = "UTF-8"')
c.execute("CREATE TABLE IF NOT EXISTS habr(id INT, author VARCHAR(255), title VARCHAR(255), content TEXT, tags TEXT)")
start_time = datetime.now()
c.execute("begin")
for i in range(min, max):
url = "https://m.habr.com/post/{}".format(i)
try:
r = requests.get(url)
except:
with open("req_errors.txt") as file:
file.write(i)
continue
if(r.status_code != 200):
print("{} - {}".format(i, r.status_code))
continue
html_doc = r.text
soup = BeautifulSoup(html_doc, 'html.parser')
try:
author = soup.find(class_="tm-user-info__username").get_text()
content = soup.find(id="post-content-body")
content = str(content)
title = soup.find(class_="tm-article-title__text").get_text()
tags = soup.find(class_="tm-article__tags").get_text()
tags = tags[5:]
except:
author,title,tags = "Error", "Error {}".format(r.status_code), "Error"
content = "При парсинге этой странице произошла ошибка."
c.execute('INSERT INTO habr VALUES (?, ?, ?, ?, ?)', (i, author, title, content, tags))
print(i)
c.execute("commit")
print(datetime.now() - start_time)
main(1, 490406)
一切都是經典 - 我們使用 Beautiful Soup,請求並準備好快速原型。 那隻是……
-
頁面下載在一個線程中
-
如果中斷腳本的執行,那麼整個數據庫將無處可去。 畢竟,只有在所有解析之後才會執行提交。
當然,您可以在每次插入後提交對數據庫的更改,但這樣腳本的執行時間將顯著增加。 -
解析前 100 篇文章花了我 000 個小時。
接下來我找到用戶的文章
- 使用多線程有時可以加快下載速度。
- 您可以獲得的不是 habr 的完整版本,而是其移動版本。
例如,如果桌面版本中的協整文章重 378 KB,那麼在移動版本中它已經是 126 KB。
第二個版本。 帖子較多,暫時禁止 Habr
當我在互聯網上搜索有關 python 中的多線程主題時,我選擇了最簡單的選項 multiprocessing.dummy,我注意到問題與多線程一起出現。
SQLite3 不想使用多個線程.
固定的 check_same_thread=False
,但這個錯誤並不是唯一的一個,當嘗試插入數據庫時,有時會出現我無法解決的錯誤。
因此,我決定放棄直接將文章即時插入數據庫的做法,並記住協整解決方案,我決定使用文件,因為多線程寫入文件沒有問題。
Habr 開始禁止使用超過三個線程.
特別熱衷於訪問 Habr 的嘗試可能會導致 IP 封禁幾個小時。 因此,您只需使用 3 個線程,但這已經很好了,因為迭代 100 多篇文章的時間從 26 秒減少到 12 秒。
值得注意的是,這個版本相當不穩定,大量文章的下載量會周期性下降。
async_v1.py
from bs4 import BeautifulSoup
import requests
import os, sys
import json
from multiprocessing.dummy import Pool as ThreadPool
from datetime import datetime
import logging
def worker(i):
currentFile = "files\{}.json".format(i)
if os.path.isfile(currentFile):
logging.info("{} - File exists".format(i))
return 1
url = "https://m.habr.com/post/{}".format(i)
try: r = requests.get(url)
except:
with open("req_errors.txt") as file:
file.write(i)
return 2
# Запись заблокированных запросов на сервер
if (r.status_code == 503):
with open("Error503.txt", "a") as write_file:
write_file.write(str(i) + "n")
logging.warning('{} / 503 Error'.format(i))
# Если поста не существует или он был скрыт
if (r.status_code != 200):
logging.info("{} / {} Code".format(i, r.status_code))
return r.status_code
html_doc = r.text
soup = BeautifulSoup(html_doc, 'html5lib')
try:
author = soup.find(class_="tm-user-info__username").get_text()
timestamp = soup.find(class_='tm-user-meta__date')
timestamp = timestamp['title']
content = soup.find(id="post-content-body")
content = str(content)
title = soup.find(class_="tm-article-title__text").get_text()
tags = soup.find(class_="tm-article__tags").get_text()
tags = tags[5:]
# Метка, что пост является переводом или туториалом.
tm_tag = soup.find(class_="tm-tags tm-tags_post").get_text()
rating = soup.find(class_="tm-votes-score").get_text()
except:
author = title = tags = timestamp = tm_tag = rating = "Error"
content = "При парсинге этой странице произошла ошибка."
logging.warning("Error parsing - {}".format(i))
with open("Errors.txt", "a") as write_file:
write_file.write(str(i) + "n")
# Записываем статью в json
try:
article = [i, timestamp, author, title, content, tm_tag, rating, tags]
with open(currentFile, "w") as write_file:
json.dump(article, write_file)
except:
print(i)
raise
if __name__ == '__main__':
if len(sys.argv) < 3:
print("Необходимы параметры min и max. Использование: async_v1.py 1 100")
sys.exit(1)
min = int(sys.argv[1])
max = int(sys.argv[2])
# Если потоков >3
# то хабр банит ipшник на время
pool = ThreadPool(3)
# Отсчет времени, запуск потоков
start_time = datetime.now()
results = pool.map(worker, range(min, max))
# После закрытия всех потоков печатаем время
pool.close()
pool.join()
print(datetime.now() - start_time)
第三版。 最終的
在調試第二個版本時,我發現 Habr 突然有一個可供移動版本網站訪問的 API。 它的加載速度比移動版本更快,因為它只是 json,甚至不需要解析。 最後,我決定再次重寫我的劇本。
所以,發現
async_v2.py
import requests
import os, sys
import json
from multiprocessing.dummy import Pool as ThreadPool
from datetime import datetime
import logging
def worker(i):
currentFile = "files\{}.json".format(i)
if os.path.isfile(currentFile):
logging.info("{} - File exists".format(i))
return 1
url = "https://m.habr.com/kek/v1/articles/{}/?fl=ru%2Cen&hl=ru".format(i)
try:
r = requests.get(url)
if r.status_code == 503:
logging.critical("503 Error")
return 503
except:
with open("req_errors.txt") as file:
file.write(i)
return 2
data = json.loads(r.text)
if data['success']:
article = data['data']['article']
id = article['id']
is_tutorial = article['is_tutorial']
time_published = article['time_published']
comments_count = article['comments_count']
lang = article['lang']
tags_string = article['tags_string']
title = article['title']
content = article['text_html']
reading_count = article['reading_count']
author = article['author']['login']
score = article['voting']['score']
data = (id, is_tutorial, time_published, title, content, comments_count, lang, tags_string, reading_count, author, score)
with open(currentFile, "w") as write_file:
json.dump(data, write_file)
if __name__ == '__main__':
if len(sys.argv) < 3:
print("Необходимы параметры min и max. Использование: asyc.py 1 100")
sys.exit(1)
min = int(sys.argv[1])
max = int(sys.argv[2])
# Если потоков >3
# то хабр банит ipшник на время
pool = ThreadPool(3)
# Отсчет времени, запуск потоков
start_time = datetime.now()
results = pool.map(worker, range(min, max))
# После закрытия всех потоков печатаем время
pool.close()
pool.join()
print(datetime.now() - start_time)
它包含與文章本身和撰寫文章的作者相關的字段。
API.png
我沒有轉儲每篇文章的完整 json,而是只保存了我需要的字段:
- id
- is_教程
- 發佈時間
- 標題
- 內容
- 評論數
- lang 是撰寫文章所用的語言。 到目前為止,只有 en 和 ru。
- Tags_string - 帖子中的所有標籤
- 閱讀次數
- 作者
- 分數——文章評分。
因此,使用 API,我將腳本執行時間減少到每 8 個 url 100 秒。
下載完所需的數據後,我們需要對其進行處理並將其輸入數據庫。 我對此也沒有任何問題:
解析器.py
import json
import sqlite3
import logging
from datetime import datetime
def parser(min, max):
conn = sqlite3.connect('habr.db')
c = conn.cursor()
c.execute('PRAGMA encoding = "UTF-8"')
c.execute('PRAGMA synchronous = 0') # Отключаем подтверждение записи, так скорость увеличивается в разы.
c.execute("CREATE TABLE IF NOT EXISTS articles(id INTEGER, time_published TEXT, author TEXT, title TEXT, content TEXT,
lang TEXT, comments_count INTEGER, reading_count INTEGER, score INTEGER, is_tutorial INTEGER, tags_string TEXT)")
try:
for i in range(min, max):
try:
filename = "files\{}.json".format(i)
f = open(filename)
data = json.load(f)
(id, is_tutorial, time_published, title, content, comments_count, lang,
tags_string, reading_count, author, score) = data
# Ради лучшей читаемости базы можно пренебречь читаемостью кода. Или нет?
# Если вам так кажется, можно просто заменить кортеж аргументом data. Решать вам.
c.execute('INSERT INTO articles VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)', (id, time_published, author,
title, content, lang,
comments_count, reading_count,
score, is_tutorial,
tags_string))
f.close()
except IOError:
logging.info('FileNotExists')
continue
finally:
conn.commit()
start_time = datetime.now()
parser(490000, 490918)
print(datetime.now() - start_time)
統計
好吧,傳統上,最後,您可以從數據中提取一些統計信息:
- 在預期的 490 次下載中,只有 406 篇文章被下載。 事實證明,超過一半(228)篇關於 Habré 的文章被隱藏或刪除。
- 整個數據庫包含近 2.95 萬篇文章,大小為 495 GB。 壓縮形式 - XNUMX MB。
- 共有 37804 人是《哈布雷》的作者。 我提醒您,這些統計數據僅來自實時帖子。
- 關於哈布雷最多產的作者 -
阿利薩 - 8774 篇文章。 評分最高的文章 — 1448 優點最常閱讀的文章 — 1660841 次觀看討論最多的文章 — 2444 條評論
嗯,以上衣的形式前 15 位作者
按評級排名前 15 名
閱讀前 15 名
討論最多的 15 名
來源: www.habr.com