声明:本项目仅供学习参考使用,请勿做任何商业化乃至违法行为,请勿大量请求,以免给服务器带来不必要的负担。
废话不多说,先上结果:
前言:
小红书作为一个以种草、分享生活方式、商品评测为主的社区平台,日常使用用户多且内容丰富,因此也常常成为数据采集工作的重要目标。然而,小红书在反爬虫方面做了不少功夫,如果只是通过普通的requests
库发送请求,往往会遇到各种报错或“疑似异常流量”而被禁止访问。
本文将为你详细介绍一套Python 实现 的小红书爬虫思路,包括在抓包时发现的接口信息、必需的加密参数生成、Cookies 的获取和切换、数据的抓取与持久化、图片下载等环节。
最终,你可以通过该示例一次性爬取多个用户的笔记列表和详情信息,并将数据存储到 CSV 文件,甚至还能根据需求把笔记中的图片也一起下载到本地。
一、项目背景与思路梳理
1.1 为什么要逆向?
许多网站(尤其是移动端 / H5 服务)为了提高安全性,会对其接口做各种加密或混淆处理,以防止未经授权的爬虫大规模采集数据。具体做法包括但不限于:
- 请求头和参数的加密签名:没有正确的签名就会返回
{"success": false}
或者被识别为异常流量。 - 限制访问频率:同一个 IP、同一组 Cookies,短时间内触发过多请求,就会被封禁或触发验证码。
- JS 加密:在前端网页中,通过各种方式对请求参数或关键字段进行加密处理。
在本文中,我们需要的关键参数有:
x-s
、x-t
:小红书的核心加密签名,用于鉴权。xsec_token
:当获取用户笔记列表或单条笔记详情时,需要携带该参数来验证。- 以及一系列从 Cookies 中获取的
a1
、web_session
等信息。
1.2 核心流程
概括而言,整个爬虫流程可以拆解为以下几个阶段:
-
准备阶段
- 获取或准备多份小红书 Cookies(可通过浏览器手动登录并抓包获取,也可以通过手机号 / 短信登录后拿到)。由于访问小红书接口容易触发频率限制,一旦某一份 Cookie 超过访问阈值,需要切换到下一份 Cookies 继续爬取。
- 获取 JS 文件(如1
.js
),内部包含了小红书签名生成所需的核心函数getXs
,我们将借助execjs
在 Python 环境中调用它。
-
读取博主 ID
- 通过
pandas
从 Excel 表格或 CSV 文件中读取每位博主的 user_id。
- 通过
-
循环爬取:获取博主笔记列表
- 针对每个博主,调用
GET /api/sns/web/v1/user_posted
接口,获取其笔记列表。 - 需要进行分页(通过
cursor
实现翻页),直到has_more
为 False 或者没有返回cursor
,说明没有更多笔记。
- 针对每个博主,调用
-
获取单条笔记详情
- 对于获取到的每一条笔记,需要二次请求
POST /api/sns/web/v1/feed
接口,获得更详细的数据,如:笔记文本、标签、点赞收藏数、评论数、博主 IP 位置、图片列表等。
- 对于获取到的每一条笔记,需要二次请求
-
数据解析与持久化
- 将笔记 ID、笔记标题、笔记标签、发布时间、博主信息、互动数量等保存到 CSV(或Excel)文件中。
- 如果需要的话,可调用下载图片的函数,将笔记中的图片也保存到本地文件夹。
-
频率限制与 Cookies 切换
- 如果请求过于频繁且同一份 Cookies 反复使用,就可能出现
success == false
或者 461等错误。 - 这时需要在当前页面(或下一次请求)切换到下一组 Cookies,尽量减少被封禁的概率。
- 同时可以添加一定的请求延迟(如
time.sleep(0.5)
)让访问更自然。
- 如果请求过于频繁且同一份 Cookies 反复使用,就可能出现
二、环境与依赖
- Python 3.x
- requests:发送 HTTP 请求
- execjs:用 Python 执行 JS
- loguru:更优雅的日志输出
- pandas:处理 Excel / CSV 数据
- openpyxl:如果需要读取 Excel xlsx 文件,需安装此库
- re、json、time、datetime、random:Python 内置模块
在使用前,请先安装以下核心库:
pip install requests execjs loguru pandas openpyxl
并将逆向得到的小红书签名 JS 文件放在脚本同级目录下。
三、项目中的关键代码详解
下面的示例来自一个可正常执行并获取小红书博主笔记信息的爬虫脚本。我们将分几个功能函数为你一一解释实现原理与示例代码。最后会把所有函数组装到一起,形成一个完整可运行的脚本。
3.1 生成签名:update_headers
函数
在小红书的接口中,x-s
、x-t
这两个字段是最关键的签名参数。如果不带或不正确,就会出现 {"success": false}
等错误。
通过前期的 JS 逆向 或 抓包 分析,我们可以在网页中找到类似 getXs
的函数,专门用于生成 x-s
、x-t
。通过将其提取成单独的 JavaScript 文件,我们在 Python 中调用它即可。
def update_headers(api, data, current_cookies):
"""
使用 execjs 执行 JS 文件生成 x-s, x-t 签名
:param api: 接口路径,示例:'/api/sns/web/v1/feed' 或带有参数的全路径
:param data: 请求 body 或者需要参与签名的字段,可为 None
:param current_cookies: 当前使用的 Cookies(提取 a1)
:return: 包含 'X-s', 'X-t' 的字典
"""
with open('GenXsAndCommon_56.js', 'r', encoding='utf-8') as f:
js_script = f.read()
context = execjspile(js_script)
# 第三个参数是 a1 值,在 cookies 中
sign = context.call('getXs', api, data, current_cookies['a1'])
return sign
这个函数的工作流程很简单:
- 读取并编译 JS 文件
- 调用其中的
getXs
方法,传入接口路径、请求数据、以及 cookies 里的a1
。 - 返回一个字典,含有
{'X-s': 'xxx', 'X-t': 123456789}
。
拿到签名后,我们就可以把它放到请求头里去发送请求了。
3.2 请求用户笔记列表:main
函数(核心循环)
为了实现批量爬取,我们通常会从一个文件(如 博主id.xlsx
)中读取许多用户 ID (user_id
),然后挨个请求 GET /api/sns/web/v1/user_posted
接口,拿到该用户的所有笔记。
以下示例展示了主函数 main
中的关键逻辑:
def main(file_path, file_name, cookies_list, is_download=False):
# 1. 读取用户ID
data = pd.read_excel(file_path) # 或 pd.read_csv
user_ids = data['用户id'] # 假定excel中有一列叫"用户id"
# 2. 小红书的接口地址
url = "https://edith.xiaohongshu/api/sns/web/v1/user_posted"
person_index = 1
# 3. 遍历每个 user_id,依次爬取笔记列表
for user_id in user_ids:
has_more = True
print(f'正在爬取第{person_index}个人 {user_id} 的帖子信息')
# 3.1 初始化请求参数
params = {
"num": "30", # 一页返回多少条
"cursor": "", # 分页游标
"user_id": user_id,
"image_formats": "jpg,webp,avif",
"xsec_token": "", # 预留字段
"xsec_source": "pc_note"
}
k = 0
# 3.2 当 has_more 为 True 就不断分页请求
while has_more:
current_cookie_index = 0
# 3.3 如果遇到访问异常,则切换cookies
while current_cookie_index < len(cookies_list):
current_cookies = cookies_list[current_cookie_index]
# 拼接参数并复制初始headers
params_encoded = urllib.parse.urlencode(params)
headers = headers_init.copy()
# 生成 x-s、x-t 并写入到 headers 中
sign_headers = update_headers(
f'/api/sns/web/v1/user_posted?{params_encoded}', # 接口带查询参数
None,
current_cookies
)
headers['x-s'] = sign_headers['X-s']
headers['x-t'] = str(sign_headers['X-t'])
# 3.4 发起请求
response1 = requests.get(url, headers=headers, cookies=current_cookies, params=params)
print(response1.status_code)
# 3.5 如果状态码200且 success=true,则表示正常返回
if response1.status_code == 200 and response1.json().get('success') == True:
data_json = response1.json()
# 解析下一页所需的 cursor、has_more
notes = data_json.get('data', {}).get('notes', [])
has_more = data_json.get('data', {}).get('has_more', False)
cursor = data_json.get('data', {}).get('cursor', None)
if cursor:
params['cursor'] = cursor
logger.info(f"成功更新 cursor 为: {cursor}")
else:
has_more = False
logger.info("未返回 cursor,结束分页")
# 3.6 遍历当前页所有笔记
for note in notes:
logger.info(f'正在爬取第{person_index}个人的第{k}个帖子')
k += 1
xsec_token = note.get('xsec_token')
note_id = note.get('note_id')
# 获取笔记详情
note_data, status_code_result, headers_result = fetch_xiaohongshu_data(
note_id, xsec_token, current_cookies
)
# 如果 success=False,说明被识别为频率异常,需要切换Cookies
if status_code_result == 200 and note_data.get('success') == False:
current_cookie_index += 1
print('出现访问频率异常,切换下一个cookies,跳出当前页笔记循环')
k -= 1 # 回退一下计数,因为这一条笔记没有成功获取
break
else:
# 如果获取成功,则解析并保存
if status_code_result == 200 and note_data.get('success') == True:
note_id, user_id_1 = parse_data(note_data, file_name, xsec_token)
# 如果需要下载笔记内图片
if is_download == True:
download_img(note_data, user_id_1, note_id)
else:
logger.error('请求失败,跳过')
current_cookie_index += 1
# 如果这一页爬完并且没有更多,就退出
if has_more == False:
logger.info(f'用户{user_id}的笔记已经爬完')
break
else:
# 若请求不成功,切换Cookies重试
logger.info('------------------------------------')
logger.info('请求失败,切换下一个 cookies')
logger.info('------------------------------------')
current_cookie_index += 1
person_index += 1
logger.info("所有用户数据处理完毕")
需要注意:
- 若请求过于频繁,或某一 Cookie 被判定为异常,则需要切换到
cookies_list
中的下一个 Cookie。- 由于小红书接口的复杂性,建议你在实际部署时,也要考虑代理池的使用,避免 IP 被拉黑。
- 请不要大规模爬取,以免给服务器带来不必要的负担。
3.3 获取单条笔记详情:fetch_xiaohongshu_data
函数
用户笔记列表接口虽然能拿到很多基础信息,但并不够全面。通常我们还需要调用单条笔记的详情接口进行爬取,例如 POST /api/sns/web/v1/feed
。该接口需要在请求体中传递 source_note_id
、xsec_token
、xsec_source
等,并再次生成签名:
def fetch_xiaohongshu_data(source_note_id, xsec_token, cookies):
url = "https://edith.xiaohongshu/api/sns/web/v1/feed"
api_endpoint = '/api/sns/web/v1/feed'
# 请求体
data = {
"source_note_id": source_note_id,
"image_formats": ["jpg", "webp", "avif"],
"extra": {"need_body_topic": "1"},
"xsec_source": "pc_feed",
"xsec_token": xsec_token
}
a1_value = cookies['a1']
headers = {
"accept": "application/json, text/plain, */*",
"accept-language": "en-US,en;q=0.9",
"cache-control": "no-cache",
"content-type": "application/json;charset=UTF-8",
"origin": "https://www.xiaohongshu",
"pragma": "no-cache",
"priority": "u=1, i",
"referer": "https://www.xiaohongshu/",
"sec-ch-ua": "\"Not/A)Brand\";v=\"8\", \"Chromium\";v=\"126\"",
"sec-ch-ua-mobile": "?0",
"sec-ch-ua-platform": "\"macOS\"",
"sec-fetch-dest": "empty",
"sec-fetch-mode": "cors",
"sec-fetch-site": "same-site",
"user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko)",
"x-b3-traceid": "9cbac8e2b8562aa3"
}
# 与上面相同,再次通过 JS 文件生成 x-s, x-t
with open('GenXsAndCommon_56.js', 'r', encoding='utf-8') as f:
js_script = f.read()
context = execjspile(js_script)
sign = context.call('getXs', api_endpoint, data, a1_value)
headers['x-s'] = sign['X-s']
headers['x-t'] = str(sign['X-t'])
# 发起请求
data_json = json.dumps(data, separators=(',', ':'))
time.sleep(0.5) # 做一个小延时,减小被识别为爬虫的概率
response = requests.post(url, headers=headers, cookies=cookies, data=data_json)
return response.json(), response.status_code, response.headers
调用此函数后,我们会得到一个 JSON 响应,其中包含了笔记正文、图片信息、点赞收藏数、评论数、IP 所属地等。
3.4 解析并保存数据:parse_data
函数
拿到笔记详情后,就可以对响应进行解析,并将关键字段保存到本地 CSV 文件中了。例如:
def parse_data(data, filename='博主信息', xsec_token=0):
fieldnames = [
"笔记id", "xsec_token", "笔记链接", "笔记类型", "笔记标题",
"笔记正文", "笔记标签", "发布时间", "笔记最后更新时间", "图片链接",
"点赞数", "收藏数", "评论数", "分享数", "用户名",
"用户id", "用户ip", "用户头像"
]
file_name = f"{filename}.csv"
file_exists = os.path.isfile(file_name)
with open(file_name, mode='a', newline='', encoding='utf-8-sig') as file:
writer = csv.DictWriter(file, fieldnames=fieldnames)
if not file_exists:
writer.writeheader()
item = data['data']['items'][0] # 取到 note 数据
note_card = item['note_card']
note_id = item["id"]
note_url = f'https://www.xiaohongshu/explore/{note_id}?xsec_token={xsec_token}&xsec_source=pc_feed'
note_type = item["model_type"]
desc = note_card.get("desc", "")
# 提取 #标签
tags = re.findall(r"#([^#]+?)(?=\[话题\])", desc)
tags = ", ".join(tags)
# 互动信息
interact_info = note_card.get("interact_info", {})
publish_time = note_card.get("time")
title = note_card.get("title", "")
user_info = note_card.get("user", {})
user_avatar = user_info.get("avatar", "")
user_name = user_info.get('nickname', '')
user_id = user_info.get('user_id', '')
last_updated_time = note_card.get("last_update_time", "")
like_count = interact_info.get("liked_count", 0)
collect_count = interact_info.get("collected_count", 0)
comment_count = interact_info.get("comment_count", 0)
share_count = interact_info.get("share_count", 0)
ip = note_card.get("ip_location", "")
# 笔记首图
image_url = ""
if note_card.get("image_list"):
image_url = note_card["image_list"][0]["info_list"][0]["url"]
# 写入CSV
writer.writerow({
"笔记id": note_id,
"xsec_token": xsec_token,
"笔记链接": note_url,
"笔记类型": note_type,
"笔记标题": title,
"笔记正文": desc,
"笔记标签": tags,
"发布时间": convert_scientific_time(publish_time),
"笔记最后更新时间": convert_scientific_time(last_updated_time),
"图片链接": image_url,
"点赞数": convert_to_int(like_count),
"收藏数": convert_to_int(collect_count),
"评论数": convert_to_int(comment_count),
"分享数": convert_to_int(share_count),
"用户名": user_name,
"用户id": user_id,
"用户ip": ip,
"用户头像": user_avatar
})
return note_id, user_id
这里做了几个小处理:
- 正则匹配话题:用
re.findall(r"#([^#]+?)(?=\[话题\])", desc)
来找出笔记中以#
开头且紧跟[话题]
的标签。 - 时间转换:小红书返回的时间戳往往是毫秒级,需要除 1000 并再使用
datetime.fromtimestamp
转换成人类可读的格式。 - “万” 字符处理:当点赞量或评论量超过一万,小红书会显示类似 “2.3万”,这时要换算回整数值保存。
def convert_scientific_time(time_str):
timestamp = float(time_str)
timestamp_in_seconds = timestamp / 1000
readable_time = datetime.fromtimestamp(timestamp_in_seconds).strftime('%Y年%m月%d日 %H:%M:%S')
return readable_time
def convert_to_int(value):
if '万' in value:
value = value.replace('万', '')
return float(value) * 10000
else:
return value
3.5 下载图片:download_img
函数(可选)
如果业务需要存储图片,可以在爬取到笔记详情时,将 note_card["image_list"]
中的图片地址遍历下载下来:
def download_img(data, user_id, note_id):
image_list = data["data"]["items"][0]["note_card"]["image_list"]
image_urls = [img["url_default"] for img in image_list]
output_dir = os.path.join("images", user_id, note_id)
os.makedirs(output_dir, exist_ok=True)
for idx, url in enumerate(image_urls):
try:
response = requests.get(url)
if response.status_code == 200:
file_path = os.path.join(output_dir, f"image_{idx + 1}.jpg")
with open(file_path, "wb") as f:
f.write(response.content)
logger.info(f"图片下载成功: {file_path}")
except Exception as e:
logger.error(f"图片下载出错: {e}")
这样就能在 images/用户id/笔记id
文件夹下存储下载到的所有图片。
3.6 完整脚本示例
将以上函数整合,存放到同一个 Python 文件(如 xiaohongshu_spider.py
)即可。请在运行前,先确认你拥有 GenXsAndCommon_56.js
、博主id.xlsx
等文件,并且手动更新了 cookies_list
中的 Cookies。示例:
import time
import execjs
import urllib.parse
from loguru import logger
import pandas as pd
import os
import requests
import re
import csv
from datetime import datetime
import json
import random
def base36encode(number, digits='0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ'):
base36 = ""
while number:
number, i = divmod(number, 36)
base36 = digits[i] + base36
return base36.lower()
def generate_search_id():
timestamp = int(time.time() * 1000) << 64
random_value = int(random.uniform(0, 2147483646))
return base36encode(timestamp + random_value)
def convert_scientific_time(time_str):
timestamp = float(time_str)
timestamp_in_seconds = timestamp / 1000
readable_time = datetime.fromtimestamp(timestamp_in_seconds).strftime('%Y年%m月%d日 %H:%M:%S')
return readable_time
def convert_to_int(value):
if '万' in value:
value = value.replace('万', '')
return float(value) * 10000
else:
return value
def update_headers(api, data, current_cookies):
with open('GenXsAndCommon_56.js', 'r', encoding='utf-8') as f:
js_script = f.read()
context = execjspile(js_script)
sign = context.call('getXs', api, data, current_cookies['a1'])
return sign
headers_init = {
"accept": "application/json, text/plain, */*",
"accept-language": "en-US,en;q=0.9",
"cache-control": "no-cache",
"origin": "https://www.xiaohongshu",
"pragma": "no-cache",
"priority": "u=1, i",
"referer": "https://www.xiaohongshu/",
"sec-ch-ua": "\"Google Chrome\";v=\"129\", \"Not=A?Brand\";v=\"8\", \"Chromium\";v=\"129\"",
"sec-ch-ua-mobile": "?0",
"sec-ch-ua-platform": "\"macOS\"",
"sec-fetch-dest": "empty",
"sec-fetch-mode": "cors",
"sec-fetch-site": "same-site",
"user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7)",
"x-b3-traceid": "9d6c7dd3a155b6a5"
}
def fetch_xiaohongshu_data(source_note_id, xsec_token, cookies):
url = "https://edith.xiaohongshu/api/sns/web/v1/feed"
api_endpoint = '/api/sns/web/v1/feed'
data = {
"source_note_id": source_note_id,
"image_formats": ["jpg", "webp", "avif"],
"extra": {"need_body_topic": "1"},
"xsec_source": "pc_feed",
"xsec_token": xsec_token
}
a1_value = cookies['a1']
headers = {
"accept": "application/json, text/plain, */*",
"accept-language": "en-US,en;q=0.9",
"cache-control": "no-cache",
"content-type": "application/json;charset=UTF-8",
"origin": "https://www.xiaohongshu",
"pragma": "no-cache",
"priority": "u=1, i",
"referer": "https://www.xiaohongshu/",
"sec-ch-ua": "\"Not/A)Brand\";v=\"8\", \"Chromium\";v=\"126\"",
"sec-ch-ua-mobile": "?0",
"sec-ch-ua-platform": "\"macOS\"",
"sec-fetch-dest": "empty",
"sec-fetch-mode": "cors",
"sec-fetch-site": "same-site",
"user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7)",
"x-b3-traceid": "9cbac8e2b8562aa3"
}
with open('GenXsAndCommon_56.js', 'r', encoding='utf-8') as f:
js_script = f.read()
context = execjspile(js_script)
sign = context.call('getXs', api_endpoint, data, a1_value)
headers['x-s'] = sign['X-s']
headers['x-t'] = str(sign['X-t'])
data_json = json.dumps(data, separators=(',', ':'))
time.sleep(0.5)
response = requests.post(url, headers=headers, cookies=cookies, data=data_json)
return response.json(), response.status_code, response.headers
def parse_data(data, filename='博主信息', xsec_token=0):
fieldnames = [
"笔记id", "xsec_token", "笔记链接", "笔记类型", "笔记标题",
"笔记正文", "笔记标签", "发布时间", "笔记最后更新时间", "图片链接",
"点赞数", "收藏数", "评论数", "分享数", "用户名",
"用户id", "用户ip", "用户头像"
]
file_name = f"{filename}.csv"
file_exists = os.path.isfile(file_name)
with open(file_name, mode='a', newline='', encoding='utf-8-sig') as file:
writer = csv.DictWriter(file, fieldnames=fieldnames)
if not file_exists:
writer.writeheader()
item = data['data']['items'][0]
note_card = item['note_card']
note_id = item["id"]
note_url = f'https://www.xiaohongshu/explore/{note_id}?xsec_token={xsec_token}&xsec_source=pc_feed'
note_type = item["model_type"]
desc = note_card.get("desc", "")
tags = re.findall(r"#([^#]+?)(?=\[话题\])", desc)
tags = ", ".join(tags)
interact_info = note_card.get("interact_info", {})
publish_time = note_card.get("time")
title = note_card.get("title", "")
user_info = note_card.get("user", {})
user_avatar = user_info.get("avatar", "")
user_name = user_info.get('nickname', '')
user_id = user_info.get('user_id', '')
last_updated_time = note_card.get("last_update_time", "")
like_count = interact_info.get("liked_count", 0)
collect_count = interact_info.get("collected_count", 0)
comment_count = interact_info.get("comment_count", 0)
share_count = interact_info.get("share_count", 0)
ip = note_card.get("ip_location", "")
image_url = ""
if note_card.get("image_list"):
image_url = note_card["image_list"][0]["info_list"][0]["url"]
writer.writerow({
"笔记id": note_id,
"xsec_token": xsec_token,
"笔记链接": note_url,
"笔记类型": note_type,
"笔记标题": title,
"笔记正文": desc,
"笔记标签": tags,
"发布时间": convert_scientific_time(publish_time),
"笔记最后更新时间": convert_scientific_time(last_updated_time),
"图片链接": image_url,
"点赞数": convert_to_int(like_count),
"收藏数": convert_to_int(collect_count),
"评论数": convert_to_int(comment_count),
"分享数": convert_to_int(share_count),
"用户名": user_name,
"用户id": user_id,
"用户ip": ip,
"用户头像": user_avatar
})
return note_id, user_id
def download_img(data, user_id, note_id):
image_list = data["data"]["items"][0]["note_card"]["image_list"]
image_urls = [img["url_default"] for img in image_list]
output_dir = os.path.join("images", user_id, note_id)
os.makedirs(output_dir, exist_ok=True)
for idx, url in enumerate(image_urls):
try:
response = requests.get(url)
if response.status_code == 200:
file_path = os.path.join(output_dir, f"image_{idx + 1}.jpg")
with open(file_path, "wb") as f:
f.write(response.content)
logger.info(f"图片下载成功: {file_path}")
except Exception as e:
logger.error(f"图片下载出错: {e}")
def main(file_path, file_name, cookies_list, is_download=False):
data = pd.read_excel(file_path)
user_ids = data['用户id']
url = "https://edith.xiaohongshu/api/sns/web/v1/user_posted"
person_index = 1
for user_id in user_ids:
has_more = True
print(f'正在爬取第{person_index}个人 {user_id} 的帖子信息')
params = {
"num": "30",
"cursor": "",
"user_id": user_id,
"image_formats": "jpg,webp,avif",
"xsec_token": "",
"xsec_source": "pc_note"
}
k = 0
while has_more:
current_cookie_index = 0
while current_cookie_index < len(cookies_list):
current_cookies = cookies_list[current_cookie_index]
params_encoded = urllib.parse.urlencode(params)
headers = headers_init.copy()
sign_headers = update_headers(
f'/api/sns/web/v1/user_posted?{params_encoded}', None, current_cookies
)
headers['x-s'] = sign_headers['X-s']
headers['x-t'] = str(sign_headers['X-t'])
response1 = requests.get(url, headers=headers, cookies=current_cookies, params=params)
print(response1.status_code)
if response1.status_code == 200 and response1.json().get('success') == True:
data_json = response1.json()
notes = data_json.get('data', {}).get('notes', [])
has_more = data_json.get('data', {}).get('has_more', False)
cursor = data_json.get('data', {}).get('cursor', None)
if cursor:
params['cursor'] = cursor
logger.info(f"成功更新 cursor 为: {cursor}")
else:
has_more = False
logger.info("未返回 cursor,结束分页")
for note in notes:
logger.info(f'正在爬取第{person_index}个人的第{k}个帖子')
k += 1
xsec_token = note.get('xsec_token')
note_id = note.get('note_id')
note_data, status_code_result, headers_result = fetch_xiaohongshu_data(
note_id, xsec_token, current_cookies
)
if status_code_result == 200 and note_data.get('success') == False:
current_cookie_index += 1
print('出现频率访问异常,切换下一个cookies,跳出当前页笔记')
k -= 1
break
else:
pass
if status_code_result == 200 and note_data.get('success') == True:
note_id, user_id_1 = parse_data(note_data, file_name, xsec_token)
if is_download == True:
download_img(note_data, user_id_1, note_id)
else:
logger.error('请求失败,跳过')
current_cookie_index += 1
if has_more == False:
logger.info(f'用户{user_id}的笔记已经爬完')
break
else:
logger.info('------------------------------------')
logger.info('请求失败,切换下一个 cookies')
logger.info('------------------------------------')
current_cookie_index += 1
person_index += 1
logger.info("所有用户数据处理完毕")
if __name__ == '__main__':
file_path = '博主id.xlsx' # 存放博主用户ID的Excel
cookies_list = [
]
is_download = True
file_name = '博主帖子信息'
main(file_path, file_name, cookies_list, is_download)
结果展示
四、可能遇到的问题与应对
-
频繁出现“频率请求限制”
- 需要搭建代理池,或降低请求频率;
- 或者在同一次爬取里轮换使用不同 Cookies 和不同 IP。
- 只做参考学习使用,切勿大量爬取。
-
CSV 文件乱码
- 写入 CSV 时,可以使用
encoding='utf-8-sig'
或者gb18030
,看你本机环境需求。
- 写入 CSV 时,可以使用
-
爬取速度过慢 / 数据量大
- 可以做多线程或多进程加速;
- 需要在多线程场景下,注意 Cookies 和频率控制策略,避免过度访问被封。
- 只用作参考学习使用,切勿大量爬取。
五、总结
通过上文的示例,我们了解了如何借助 Python + ExecJS 来模拟小红书官方 JS 的请求签名过程,从而实现对用户笔记列表和笔记详情的批量爬取。重点在于:
- 逆向获取 JS 并在 Python 中执行:缺少这一步,就无法正确生成
x-s
、x-t
,进而无法正常拿到小红书的数据。 - Cookies 管理和频率控制:当访问过于频繁时,需要切换 Cookies 并加入适度的延时,以规避被封,只用作参考学习使用,切勿大量爬取。
- 数据解析与持久化:拿到接口返回的数据后,要有完善的解析逻辑,将结果按需存储到 CSV / Excel;若涉及下载图片,也要逐条保存到本地目录。
声明:本示例仅用于学习与技术交流,请勿将其用于任何商业或非法用途。若有侵权,请及时与笔者联系。
如果这篇文章对你有帮助,欢迎点赞、收藏或关注!
如有其他疑问或想进一步探讨的,欢迎留言或私信交流。
参考与致谢
- Python 官方文档
- requests 库 GitHub
- execjs 库 PyPI
- 以及在逆向分析小红书 Web 端 JS 过程中提供帮助的社区大佬们。
END