下午好。 写了2年了。
当我想要一份 Habr 的副本时,我决定编写一个解析器,将作者的所有内容保存到数据库中。 它是如何发生的以及我遇到了什么错误 - 你可以在下面阅读。
长话短说——
解析器的第一个版本。 一个线程,很多问题
首先,我决定制作一个脚本原型,在该原型中,文章将在下载后立即被解析并放入数据库中。 想都没想就用了sqlite3,因为。 它的劳动强度较低:不需要本地服务器、创建-查看-删除之类的东西。
一个线程.py
from bs4 import BeautifulSoup
import sqlite3
import requests
from datetime import datetime
def main(min, max):
conn = sqlite3.connect('habr.db')
c = conn.cursor()
c.execute('PRAGMA encoding = "UTF-8"')
c.execute("CREATE TABLE IF NOT EXISTS habr(id INT, author VARCHAR(255), title VARCHAR(255), content TEXT, tags TEXT)")
start_time = datetime.now()
c.execute("begin")
for i in range(min, max):
url = "https://m.habr.com/post/{}".format(i)
try:
r = requests.get(url)
except:
with open("req_errors.txt") as file:
file.write(i)
continue
if(r.status_code != 200):
print("{} - {}".format(i, r.status_code))
continue
html_doc = r.text
soup = BeautifulSoup(html_doc, 'html.parser')
try:
author = soup.find(class_="tm-user-info__username").get_text()
content = soup.find(id="post-content-body")
content = str(content)
title = soup.find(class_="tm-article-title__text").get_text()
tags = soup.find(class_="tm-article__tags").get_text()
tags = tags[5:]
except:
author,title,tags = "Error", "Error {}".format(r.status_code), "Error"
content = "При парсинге этой странице произошла ошибка."
c.execute('INSERT INTO habr VALUES (?, ?, ?, ?, ?)', (i, author, title, content, tags))
print(i)
c.execute("commit")
print(datetime.now() - start_time)
main(1, 490406)
一切都是经典的——我们使用 Beautiful Soup,请求和快速原型准备就绪。 那只是……
-
页面下载在一个线程中
-
如果中断脚本的执行,那么整个数据库将无处可去。 毕竟,只有在所有解析之后才执行提交。
当然,您可以在每次插入后提交对数据库的更改,但这样脚本执行时间会显着增加。 -
解析前 100 篇文章花了我 000 个小时。
接下来我找到用户的文章
- 使用多线程有时会加快下载速度。
- 您可以获得的不是 habr 的完整版本,而是它的移动版本。
例如,如果一篇协整文章在桌面版中有 378 KB,那么在移动版中它已经是 126 KB。
第二个版本。 许多线程,Habr 的临时禁令
当我在网上搜索关于 python 多线程的主题时,我选择了最简单的 multiprocessing.dummy 选项,我注意到问题随着多线程一起出现。
SQLite3 不想使用多个线程.
固定的 check_same_thread=False
,但这个错误不是唯一的,当尝试插入数据库时,有时会出现我无法解决的错误。
因此,我决定放弃文章直接插入数据库的即时插入,想起协整的方案,我决定使用文件,因为多线程写入文件是没有问题的。
Habr 开始禁止使用三个以上的线程.
特别热心地尝试连接到 Habr 可能会以几个小时的 ip 禁令告终。 所以你必须只使用 3 个线程,但这已经很好了,因为迭代 100 篇文章的时间从 26 秒减少到 12 秒。
值得注意的是,这个版本相当不稳定,大量文章的下载量周期性下降。
async_v1.py
from bs4 import BeautifulSoup
import requests
import os, sys
import json
from multiprocessing.dummy import Pool as ThreadPool
from datetime import datetime
import logging
def worker(i):
currentFile = "files\{}.json".format(i)
if os.path.isfile(currentFile):
logging.info("{} - File exists".format(i))
return 1
url = "https://m.habr.com/post/{}".format(i)
try: r = requests.get(url)
except:
with open("req_errors.txt") as file:
file.write(i)
return 2
# Запись заблокированных запросов на сервер
if (r.status_code == 503):
with open("Error503.txt", "a") as write_file:
write_file.write(str(i) + "n")
logging.warning('{} / 503 Error'.format(i))
# Если поста не существует или он был скрыт
if (r.status_code != 200):
logging.info("{} / {} Code".format(i, r.status_code))
return r.status_code
html_doc = r.text
soup = BeautifulSoup(html_doc, 'html5lib')
try:
author = soup.find(class_="tm-user-info__username").get_text()
timestamp = soup.find(class_='tm-user-meta__date')
timestamp = timestamp['title']
content = soup.find(id="post-content-body")
content = str(content)
title = soup.find(class_="tm-article-title__text").get_text()
tags = soup.find(class_="tm-article__tags").get_text()
tags = tags[5:]
# Метка, что пост является переводом или туториалом.
tm_tag = soup.find(class_="tm-tags tm-tags_post").get_text()
rating = soup.find(class_="tm-votes-score").get_text()
except:
author = title = tags = timestamp = tm_tag = rating = "Error"
content = "При парсинге этой странице произошла ошибка."
logging.warning("Error parsing - {}".format(i))
with open("Errors.txt", "a") as write_file:
write_file.write(str(i) + "n")
# Записываем статью в json
try:
article = [i, timestamp, author, title, content, tm_tag, rating, tags]
with open(currentFile, "w") as write_file:
json.dump(article, write_file)
except:
print(i)
raise
if __name__ == '__main__':
if len(sys.argv) < 3:
print("Необходимы параметры min и max. Использование: async_v1.py 1 100")
sys.exit(1)
min = int(sys.argv[1])
max = int(sys.argv[2])
# Если потоков >3
# то хабр банит ipшник на время
pool = ThreadPool(3)
# Отсчет времени, запуск потоков
start_time = datetime.now()
results = pool.map(worker, range(min, max))
# После закрытия всех потоков печатаем время
pool.close()
pool.join()
print(datetime.now() - start_time)
第三个版本。 最终的
在调试第二个版本时,我发现 Habr 突然有了一个 API,移动版网站可以访问该 API。 它加载速度比移动版本快,因为它只是 json,甚至不需要解析。 最后,我决定再次重写我的剧本。
所以,发现
async_v2.py
import requests
import os, sys
import json
from multiprocessing.dummy import Pool as ThreadPool
from datetime import datetime
import logging
def worker(i):
currentFile = "files\{}.json".format(i)
if os.path.isfile(currentFile):
logging.info("{} - File exists".format(i))
return 1
url = "https://m.habr.com/kek/v1/articles/{}/?fl=ru%2Cen&hl=ru".format(i)
try:
r = requests.get(url)
if r.status_code == 503:
logging.critical("503 Error")
return 503
except:
with open("req_errors.txt") as file:
file.write(i)
return 2
data = json.loads(r.text)
if data['success']:
article = data['data']['article']
id = article['id']
is_tutorial = article['is_tutorial']
time_published = article['time_published']
comments_count = article['comments_count']
lang = article['lang']
tags_string = article['tags_string']
title = article['title']
content = article['text_html']
reading_count = article['reading_count']
author = article['author']['login']
score = article['voting']['score']
data = (id, is_tutorial, time_published, title, content, comments_count, lang, tags_string, reading_count, author, score)
with open(currentFile, "w") as write_file:
json.dump(data, write_file)
if __name__ == '__main__':
if len(sys.argv) < 3:
print("Необходимы параметры min и max. Использование: asyc.py 1 100")
sys.exit(1)
min = int(sys.argv[1])
max = int(sys.argv[2])
# Если потоков >3
# то хабр банит ipшник на время
pool = ThreadPool(3)
# Отсчет времени, запуск потоков
start_time = datetime.now()
results = pool.map(worker, range(min, max))
# После закрытия всех потоков печатаем время
pool.close()
pool.join()
print(datetime.now() - start_time)
它包含与文章本身和撰写文章的作者相关的字段。
接口.png
我没有转储每篇文章的完整 json,而是只保存了我需要的字段:
- id
- is_教程
- 发布时间
- 标题
- 内容
- 评论数
- lang 是文章所用的语言。 到目前为止,它只有 en 和 ru。
- tags_string - 帖子中的所有标签
- 阅读次数
- 作者
- score——文章评分。
因此,使用 API,我将脚本执行时间减少到每 8 个 url 100 秒。
当我们下载完我们需要的数据后,我们需要对其进行处理,并将其输入到数据库中。 我对此也没有任何问题:
解析器.py
import json
import sqlite3
import logging
from datetime import datetime
def parser(min, max):
conn = sqlite3.connect('habr.db')
c = conn.cursor()
c.execute('PRAGMA encoding = "UTF-8"')
c.execute('PRAGMA synchronous = 0') # Отключаем подтверждение записи, так скорость увеличивается в разы.
c.execute("CREATE TABLE IF NOT EXISTS articles(id INTEGER, time_published TEXT, author TEXT, title TEXT, content TEXT,
lang TEXT, comments_count INTEGER, reading_count INTEGER, score INTEGER, is_tutorial INTEGER, tags_string TEXT)")
try:
for i in range(min, max):
try:
filename = "files\{}.json".format(i)
f = open(filename)
data = json.load(f)
(id, is_tutorial, time_published, title, content, comments_count, lang,
tags_string, reading_count, author, score) = data
# Ради лучшей читаемости базы можно пренебречь читаемостью кода. Или нет?
# Если вам так кажется, можно просто заменить кортеж аргументом data. Решать вам.
c.execute('INSERT INTO articles VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)', (id, time_published, author,
title, content, lang,
comments_count, reading_count,
score, is_tutorial,
tags_string))
f.close()
except IOError:
logging.info('FileNotExists')
continue
finally:
conn.commit()
start_time = datetime.now()
parser(490000, 490918)
print(datetime.now() - start_time)
统计
好吧,传统上,最后,您可以从数据中提取一些统计信息:
- 在预期的 490 次下载中,仅下载了 406 篇文章。 事实证明,超过一半(228)篇关于哈布雷的文章被隐藏或删除。
- 整个数据库由近 2.95 万篇文章组成,重达 495 GB。 压缩形式 - XNUMX MB。
- 总共有 37804 人是哈布雷的作者。 我提醒您,这些统计数据仅来自实时帖子。
- 关于哈布雷的最多产作家——
阿利萨 - 8774 篇文章。 评分最高的文章 — 1448 加号最常阅读的文章 — 1660841 次观看讨论最多的文章 — 2444 条评论
好吧,以上衣的形式前 15 位作者
评分前 15 名
阅读前 15 名
前 15 名讨论
来源: habr.com