最新消息:雨落星辰是一个专注网站SEO优化、网站SEO诊断、搜索引擎研究、网络营销推广、网站策划运营及站长类的自媒体原创博客

python开源搜索引擎_python语言磁力搜索引擎源码公开,基于DHT协议,十二分有技术含量的技术博客...

运维笔记admin11浏览0评论

之前我在写百度网盘爬虫、百度图片爬虫的时候答应过网友,抽时间要把ok搜搜的源码公开,如今是时候兑现诺言了。下面就是爬虫的所有代码,完全、彻底地公开,不管你会不会写程序都可以使用,不过请先装个linux系统,具备公网条件,然后运行:python startCrawler.py

有必要提醒你,数据库字段代码中都有,请你自己建张表格,这个太简单了,就不多说了。同时我也提供一下下载地址,源码都在:下载地址1 下载地址2

代码如下(startCrawler.py):

#!/usr/bin/env python

# encoding: utf-8

"""

author:haoning

create time:2015.8.1

"""

import hashlib

import os

import time

import datetime

import traceback

import sys

import random

import json

import socket

import threading

from hashlib import sha1 #进行hash加密

from random import randint

from struct import unpack

from socket import inet_ntoa

from threading import Timer, Thread

from time import sleep

from collections import deque

from Queue import Queue

import MySQLdb as mdb  #数据库连接器

import metautils

import downloadTorrent

from bencode import bencode, bdecode

import pygeoip

# MySQL connection settings handed to mdb.connect() in Master.__init__.
DB_HOST = '127.0.0.1'

DB_USER = 'root'

DB_PASS = 'root'

# (host, port) pairs that join_DHT() sends the initial find_node queries to.
BOOTSTRAP_NODES = (

("67.215.246.10", 6881),

("82.221.103.244", 6881),

("23.21.224.150", 6881)

)

RATE = 1 # throttle divisor: on_message handles only every RATE-th incoming query

# Number of random bytes used as the KRPC transaction id ("t").
TID_LENGTH = 2

# Delay passed to timer() between re_join_DHT() runs.
RE_JOIN_DHT_INTERVAL = 3

# The token handed out for get_peers is the first TOKEN_LENGTH bytes of the info_hash.
TOKEN_LENGTH = 2

INFO_HASH_LEN = 500000 # 500k entries is small; caps the queue so memory use stays bounded

CACHE_LEN = 100 # database-update cache size; the cache is flushed once it exceeds CACHE_LEN/2

# Pending downloads are started once waitDownload holds more than this many entries.
WAIT_DOWNLOAD = 80

# Opens 'GeoIP.dat' (relative path) at import time; used by is_ip_allowed().
geoip = pygeoip.GeoIP('GeoIP.dat')

def is_ip_allowed(ip):
    """Return True when *ip* resolves to one of the accepted country codes.

    The lookup uses the module-level ``geoip`` database; only addresses
    resolved to CN, TW, JP, HK or KR are accepted.
    """
    return geoip.country_code_by_addr(ip) in ('CN', 'TW', 'JP', 'HK', 'KR')

def entropy(length):
    """Return a byte string of *length* random bytes.

    On Python 2 the result is a ``str`` exactly as before; building it via
    ``bytearray`` keeps the value a real byte string on Python 3 as well,
    which is what ``sha1().update`` and the bencoder need.
    """
    return bytes(bytearray(randint(0, 255) for _ in range(length)))

def random_id():
    """Return a random 20-byte identifier: the SHA-1 digest of 20 random bytes."""
    digest = sha1()
    digest.update(entropy(20))
    return digest.digest()

def decode_nodes(nodes):
    """Decode a compact node-info string into ``(nid, ip, port)`` tuples.

    Each record is 26 bytes: a 20-byte node id, a 4-byte IPv4 address and a
    2-byte big-endian port.  An input whose length is not a multiple of 26
    is treated as malformed and yields an empty list.
    """
    decoded = []
    length = len(nodes)
    if length % 26 != 0:
        return decoded

    for offset in range(0, length, 26):
        # One unpack per record instead of three separate slices.
        nid, packed_ip, port = unpack("!20s4sH", nodes[offset:offset + 26])
        decoded.append((nid, inet_ntoa(packed_ip), port))
    return decoded

def timer(t, f):
    """Schedule callable *f* to run once on a ``threading.Timer`` after *t* seconds."""
    Timer(t, f).start()

def get_neighbor(target, nid, end=10):
    """Return an id made of the first *end* bytes of *target* followed by the rest of *nid*.

    The result shares a prefix with *target* while keeping the tail of our
    own id.
    """
    return target[:end] + nid[end:]

class KNode(object):
    """Plain record describing a DHT node: its id, IP address and UDP port."""

    def __init__(self, nid, ip, port):
        self.nid = nid
        self.ip = ip
        self.port = port

class DHTClient(Thread):
    """Outgoing half of the crawler: keeps a node queue and sends find_node queries.

    This class reads ``self.ufd`` (a UDP socket) and ``self.bind_ip`` but
    never assigns them; a subclass is expected to set both before any
    method here is used.
    """

    def __init__(self, max_node_qsize):
        Thread.__init__(self)
        self.daemon = True
        self.max_node_qsize = max_node_qsize
        self.nid = random_id()
        # Bounded queue: once full, appending drops the oldest node.
        self.nodes = deque(maxlen=max_node_qsize)

    def send_krpc(self, msg, address):
        """Bencode *msg* and send it to *address*; failures are ignored (best effort UDP)."""
        try:
            self.ufd.sendto(bencode(msg), address)
        except Exception:
            pass

    def send_find_node(self, address, nid=None):
        """Send a find_node query for a random target to *address*.

        When *nid* is given, the query advertises an id neighbouring it;
        otherwise our own id is used.
        """
        sender_id = get_neighbor(nid, self.nid) if nid else self.nid
        msg = {
            "t": entropy(TID_LENGTH),
            "y": "q",
            "q": "find_node",
            "a": {
                "id": sender_id,
                "target": random_id(),
            },
        }
        self.send_krpc(msg, address)

    def join_DHT(self):
        """Query every bootstrap node."""
        for address in BOOTSTRAP_NODES:
            self.send_find_node(address)

    def re_join_DHT(self):
        """Re-bootstrap when the node queue is empty, then reschedule itself."""
        if len(self.nodes) == 0:
            self.join_DHT()
        timer(RE_JOIN_DHT_INTERVAL, self.re_join_DHT)

    def auto_send_find_node(self):
        """Loop forever, querying one queued node per tick.

        The tick is ``1.0 / max_node_qsize`` seconds.  A KeyboardInterrupt
        raised while sleeping terminates the whole process.
        """
        wait = 1.0 / self.max_node_qsize
        while True:
            try:
                node = self.nodes.popleft()
                self.send_find_node((node.ip, node.port), node.nid)
            except IndexError:
                # Queue is empty; just wait for the next tick.
                pass
            try:
                sleep(wait)
            except KeyboardInterrupt:
                os._exit(0)

    def process_find_node_response(self, msg, address):
        """Queue every usable node listed in a find_node response."""
        for nid, ip, port in decode_nodes(msg["r"]["nodes"]):
            if len(nid) != 20:
                continue
            if ip == self.bind_ip:
                continue
            self.nodes.append(KNode(nid, ip, port))

class DHTServer(DHTClient):
    """Incoming half of the crawler: answers KRPC queries and harvests info_hashes.

    get_peers and announce_peer queries from other nodes are answered and
    the info_hash they carry is handed to *master*.
    """

    def __init__(self, master, bind_ip, bind_port, max_node_qsize):
        DHTClient.__init__(self, max_node_qsize)
        self.master = master
        self.bind_ip = bind_ip
        self.bind_port = bind_port
        self.speed = 0  # count of incoming queries, used for throttling
        self.process_request_actions = {
            "get_peers": self.on_get_peers_request,
            "announce_peer": self.on_announce_peer_request,
        }
        self.ufd = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
        self.ufd.bind((self.bind_ip, self.bind_port))
        # NOTE(review): run() also calls re_join_DHT(), so two rescheduling
        # chains end up active; kept as in the original.
        timer(RE_JOIN_DHT_INTERVAL, self.re_join_DHT)

    def run(self):
        """Receive datagrams forever and dispatch each decoded message."""
        self.re_join_DHT()
        while True:
            try:
                (data, address) = self.ufd.recvfrom(65536)
                msg = bdecode(data)
                self.on_message(msg, address)
            except Exception:
                # Malformed packets are expected from the open network; skip them.
                pass

    def on_message(self, msg, address):
        """Dispatch one decoded KRPC message.

        Responses containing "nodes" feed the node queue.  Queries are
        counted, and only every RATE-th one is processed; RATE is re-drawn
        every 10000 queries to limit CPU use.
        """
        global RATE
        try:
            if msg["y"] == "r":
                if "nodes" in msg["r"]:
                    self.process_find_node_response(msg, address)
            elif msg["y"] == "q":
                try:
                    self.speed += 1
                    if self.speed % 10000 == 0:
                        # Same distribution as the original draw: 1, 1 or 10.
                        RATE = random.choice((1, 1, 10))
                    if self.speed > 100000:
                        self.speed = 0
                    if self.speed % RATE == 0:
                        # Handling other nodes' queries is where info_hashes are collected.
                        self.process_request_actions[msg["q"]](msg, address)
                except KeyError:
                    # Unknown query type (or a missing key in the handler).
                    self.play_dead(msg, address)
        except KeyError:
            pass

    def on_get_peers_request(self, msg, address):
        """Log the requested info_hash and reply with an empty node list and a token."""
        try:
            infohash = msg["a"]["info_hash"]
            tid = msg["t"]
            nid = msg["a"]["id"]  # unused, but a missing id must abort the reply
            token = infohash[:TOKEN_LENGTH]
            reply = {
                "t": tid,
                "y": "r",
                "r": {
                    "id": get_neighbor(infohash, self.nid),
                    "nodes": "",
                    "token": token,
                },
            }
            self.master.log(infohash, address)
            self.send_krpc(reply, address)
        except KeyError:
            pass

    def on_announce_peer_request(self, msg, address):
        """Log an announced info_hash with the peer's address, then acknowledge.

        The token is valid when it equals the first TOKEN_LENGTH bytes of
        the info_hash.  With a non-zero "implied_port" the UDP source port
        is used instead of the "port" argument.
        """
        try:
            args = msg["a"]
            infohash = args["info_hash"]
            token = args["token"]
            nid = args["id"]  # unused; read so a malformed query is rejected
            tid = msg["t"]    # unused; read so a malformed query is rejected

            if infohash[:TOKEN_LENGTH] == token:
                # The key is "implied_port"; the original looked up
                # "implied_port " (trailing space), which never matched.
                if "implied_port" in args and args["implied_port"] != 0:
                    port = address[1]
                else:
                    port = args["port"]
                self.master.log_announce(infohash, (address[0], port))
        except Exception:
            print('error')
        finally:
            self.ok(msg, address)

    def play_dead(self, msg, address):
        """Reply with a KRPC error (202, "Server Error")."""
        try:
            reply = {
                "t": msg["t"],
                "y": "e",
                "e": [202, "Server Error"],
            }
            self.send_krpc(reply, address)
        except KeyError:
            pass

    def ok(self, msg, address):
        """Send a minimal success response carrying a neighbour id."""
        try:
            tid = msg["t"]
            nid = msg["a"]["id"]
            reply = {
                "t": tid,
                "y": "r",
                "r": {
                    "id": get_neighbor(nid, self.nid),
                },
            }
            self.send_krpc(reply, address)
        except KeyError:
            pass

class Master(Thread):
    """Turns harvested info_hashes into rows of the ``search_hash`` table.

    info_hashes arrive through log() / log_announce(); worker threads
    started by start_work() de-duplicate them, update known hashes, start
    metadata downloads for unknown ones and store the parsed result.
    """

    def __init__(self):
        Thread.__init__(self)
        self.daemon = True
        self.queue = Queue()           # (address, binhash) pairs from the DHT server
        self.cache = Queue()           # (address, binhash, utcnow) awaiting a DB lookup
        self.count = 0                 # inserts since the counter was last reset
        self.mutex = threading.RLock() # re-entrant lock guarding the DB cursor
        self.waitDownload = Queue()    # unknown hashes waiting for a download thread
        self.metadata_queue = Queue()  # results pushed by the download threads
        self.dbconn = mdb.connect(DB_HOST, DB_USER, DB_PASS, 'oksousou', charset='utf8')
        self.dbconn.autocommit(False)
        self.dbcurr = self.dbconn.cursor()
        self.dbcurr.execute('SET NAMES utf8')
        self.visited = set()

    def lock(self):
        """Acquire the shared re-entrant lock."""
        self.mutex.acquire()

    def unlock(self):
        """Release the shared re-entrant lock."""
        self.mutex.release()

    def work(self, item):
        """Worker loop; *item* is only the worker's number, used for the start message."""
        print("start thread %s" % (item,))
        while True:
            self.prepare_download_metadata()
            # "with" releases the lock even if the step raises; the original
            # acquire/release pair left it held forever in that case.
            with self.mutex:
                self.download_metadata()
            with self.mutex:
                self.got_torrent()

    def start_work(self, max):
        """Start *max* daemon worker threads running work()."""
        for item in range(max):
            t = threading.Thread(target=self.work, args=(item,))
            t.daemon = True
            t.start()

    def _enqueue(self, binhash, address):
        """Queue (address, binhash) if there is room and the peer's IP is allowed."""
        if address is None:
            # Without an address there is no peer to fetch metadata from.
            return
        # The bound was truncated in the published source; INFO_HASH_LEN is
        # the constant declared for limiting this queue.
        if self.queue.qsize() < INFO_HASH_LEN:
            if is_ip_allowed(address[0]):
                self.queue.put([address, binhash])

    # Hashes that arrive via announce_peer are the more productive ones.
    def log_announce(self, binhash, address=None):
        """Record an info_hash seen in an announce_peer query."""
        self._enqueue(binhash, address)

    def log(self, infohash, address=None):
        """Record an info_hash seen in a get_peers query."""
        self._enqueue(infohash, address)

    def prepare_download_metadata(self):
        """Move one not-yet-visited info_hash from the input queue to the cache."""
        if self.queue.qsize() == 0:
            sleep(2)
        # Take the next info_hash to process (blocks while the queue is empty).
        address, binhash = self.queue.get()
        if binhash in self.visited:
            return
        if len(self.visited) > 100000:
            # Reset the de-duplication set so it cannot grow without bound.
            self.visited = set()
        self.visited.add(binhash)
        utcnow = datetime.datetime.utcnow()
        self.cache.put((address, binhash, utcnow))

    def download_metadata(self):
        """Flush the cache: refresh known hashes, start downloads for new ones."""
        if self.cache.qsize() <= CACHE_LEN // 2:
            return

        while self.cache.qsize() > 0:
            address, binhash, utcnow = self.cache.get()
            info_hash = binhash.encode('hex')
            self.dbcurr.execute('SELECT id FROM search_hash WHERE info_hash=%s', (info_hash,))
            if self.dbcurr.fetchone():
                # Already stored: bump the last-seen time and request counter.
                self.dbcurr.execute(
                    'UPDATE search_hash SET last_seen=%s, requests=requests+1 WHERE info_hash=%s',
                    (utcnow, info_hash))
            else:
                self.waitDownload.put((address, binhash))
        # Was "self.dbconnmit()" in the published source (".com" got stripped).
        self.dbconn.commit()

        if self.waitDownload.qsize() > WAIT_DOWNLOAD:
            while self.waitDownload.qsize() > 0:
                address, binhash = self.waitDownload.get()
                t = threading.Thread(target=downloadTorrent.download_metadata,
                                     args=(address, binhash, self.metadata_queue))
                t.daemon = True
                t.start()

    def decode(self, s):
        """Decode byte string *s* to text, trying several encodings in turn.

        A list is first joined with ';'.  The encodings tried are
        ``self.encoding``, utf8, gbk and big5; if all fail, ``self.encoding``
        is used with undecodable bytes dropped.
        """
        if isinstance(s, list):
            s = ';'.join(s)
        for codec in (self.encoding, 'utf8', 'gbk', 'big5'):
            try:
                return s.decode(codec)
            except (UnicodeError, LookupError):
                continue
        return s.decode(self.encoding, 'ignore')

    def decode_utf8(self, d, i):
        """Return field *i* of *d* as text, preferring its explicit '.utf-8' variant."""
        utf8_key = i + '.utf-8'
        if utf8_key in d:
            return d[utf8_key].decode('utf8')
        return self.decode(d[i])

    def parse_metadata(self, data):
        """Parse bencoded torrent metadata into an info dict.

        Returns None when *data* cannot be decoded or has no name.
        """
        self.encoding = 'utf8'
        try:
            detail = bdecode(data)
            if not detail.get('name'):
                return None
        except Exception:
            return None

        info = {'name': self.decode_utf8(detail, 'name')}
        if 'files' in detail:
            info['files'] = []
            for entry in detail['files']:
                path_key = 'path.utf-8' if 'path.utf-8' in entry else 'path'
                v = {'path': self.decode('/'.join(entry[path_key])), 'length': entry['length']}
                if 'filehash' in entry:
                    v['filehash'] = entry['filehash'].encode('hex')
                info['files'].append(v)
            info['length'] = sum(f['length'] for f in info['files'])
        else:
            info['length'] = detail['length']
        info['data_hash'] = hashlib.md5(detail['pieces']).hexdigest()
        return info

    def got_torrent(self):
        """Store one downloaded metadata blob, if any is waiting."""
        if self.metadata_queue.qsize() == 0:
            return
        binhash, address, data, start_time = self.metadata_queue.get()
        if not data:
            return
        try:
            info = self.parse_metadata(data)
            if not info:
                return
        except Exception:
            traceback.print_exc()
            return

        # UTC, matching the datetime.utcnow() used when last_seen is updated
        # (the original formatted local time into a variable named utcnow).
        utcnow = time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime())
        info['info_hash'] = binhash.encode('hex')  # hex form used in magnet links
        # Tags still need to be built for this row.
        info['tagged'] = False
        info['classified'] = False
        info['requests'] = 1
        info['last_seen'] = utcnow
        info['create_time'] = utcnow
        info['source_ip'] = address[0]

        if info.get('files'):
            files = [z for z in info['files'] if not z['path'].startswith('_')]
            if not files:
                files = info['files']
        else:
            files = [{'path': info['name'], 'length': info['length']}]
        # The largest file decides the extension and category.
        files.sort(key=lambda z: z['length'], reverse=True)
        bigfname = files[0]['path']
        info['extension'] = metautils.get_extension(bigfname).lower()
        info['category'] = metautils.get_category(info['extension'])

        try:
            try:
                print('\nSaved %s %s %s s %s' % (
                    info['info_hash'], info['name'], time.time() - start_time, address[0]))
            except Exception:
                # The name may not be printable on this console.
                print('\nSaved %s' % (info['info_hash'],))
            self.dbcurr.execute(
                'INSERT INTO search_hash(info_hash,category,data_hash,name,extension,classified,source_ip,tagged,'
                'length,create_time,last_seen,requests) VALUES(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)',
                (info['info_hash'], info['category'], info['data_hash'], info['name'], info['extension'],
                 info['classified'], info['source_ip'], info['tagged'], info['length'],
                 info['create_time'], info['last_seen'], info['requests']))
            # The counter was never advanced in the original, which made the
            # "every 50 inserts" batching and the reset below dead code.
            self.count += 1
            if self.count % 50 == 0:
                self.dbconn.commit()
            if self.count > 100000:
                self.count = 0
        except Exception:
            print('%s save error %s %s' % (self.name, self.name, info))
            traceback.print_exc()
            return

if __name__ == "__main__":
    # Start the metadata workers (the "client" side).
    master = Master()
    master.start_work(150)

    # Start the DHT server thread, then drive find_node queries from the main thread.
    dht = DHTServer(master, "0.0.0.0", 6881, max_node_qsize=200)
    dht.start()
    dht.auto_send_find_node()

注意,上面的代码中有一段需要下载种子,所以下面这段代码(downloadTorrent.py)十分重要:

#!/usr/bin/env python

# encoding: utf-8

"""

author:haoning

create time:2015.8.1

"""

from hashlib import sha1

import math

from socket import inet_ntoa

import socket

from struct import pack, unpack

from threading import Timer, Thread

from time import sleep, time

from bencode import bencode, bdecode

from startCrawler import entropy

# Protocol string sent (length-prefixed) at the start of the handshake and
# compared against the peer's reply in check_handshake().
BT_PROTOCOL = "BitTorrent protocol"

# First byte of every extension-protocol message built in this module.
BT_MSG_ID = 20

# Second byte of the extension handshake message (see send_ext_handshake).
EXT_HANDSHAKE_ID = 0

def random_id():
    """Return a random 20-byte id: the SHA-1 digest of 20 random bytes."""
    # Local renamed from "hash" so the builtin is no longer shadowed.
    digest = sha1()
    digest.update(entropy(20))
    return digest.digest()

def send_packet(the_socket, msg):
    """Write all of *msg* to *the_socket*.

    ``sendall`` is used because a plain ``send`` on a stream socket may
    transmit only part of the buffer, which would corrupt the framing.
    """
    the_socket.sendall(msg)

def send_message(the_socket, msg):
    """Send *msg* prefixed with its length as a 4-byte big-endian integer."""
    length_prefix = pack(">I", len(msg))
    send_packet(the_socket, length_prefix + msg)

def send_handshake(the_socket, infohash):
    """Send the BitTorrent handshake for *infohash* with a random peer id.

    Layout: length-prefixed protocol string, 8 reserved bytes, the
    info_hash, then the peer id.
    """
    bt_header = chr(len(BT_PROTOCOL)) + BT_PROTOCOL
    # Reserved bytes; only 0x10 in the sixth byte is set.
    ext_bytes = "\x00\x00\x00\x00\x00\x10\x00\x00"
    packet = bt_header + ext_bytes + infohash + random_id()
    send_packet(the_socket, packet)

def check_handshake(packet, self_infohash):
    """Return True when *packet* is a handshake reply for *self_infohash*.

    The reply must start with the length-prefixed protocol string; after
    the 8 reserved bytes it must repeat our info_hash.
    """
    try:
        # An empty packet makes ord() raise TypeError.
        header_len = ord(packet[:1])
    except TypeError:
        return False
    if header_len != len(BT_PROTOCOL):
        return False
    if packet[1:1 + header_len] != BT_PROTOCOL:
        return False

    hash_start = 1 + header_len + 8  # skip the reserved bytes
    return packet[hash_start:hash_start + 20] == self_infohash

def send_ext_handshake(the_socket):
    """Send the extension handshake declaring ut_metadata under local id 1."""
    payload = bencode({"m": {"ut_metadata": 1}})
    send_message(the_socket, chr(BT_MSG_ID) + chr(EXT_HANDSHAKE_ID) + payload)

def request_metadata(the_socket, ut_metadata, piece):
    """Request metadata piece number *piece* (bep_0009).

    *ut_metadata* is the message id the peer announced for the extension;
    ``msg_type`` 0 is sent as the request type.
    """
    payload = bencode({"msg_type": 0, "piece": piece})
    send_message(the_socket, chr(BT_MSG_ID) + chr(ut_metadata) + payload)

def get_ut_metadata(data):
    """Extract the peer's ut_metadata message id from its extension handshake.

    The raw bencoded text is scanned for ``ut_metadatai<digits>e``.  All
    digits up to the terminating ``e`` are read; the original took a single
    character and so misread any id of 10 or more.

    Raises ValueError when the key is not present.
    """
    marker = "ut_metadatai"
    start = data.index(marker) + len(marker)
    return int(data[start:data.index("e", start)])

def get_metadata_size(data):
    """Extract the ``metadata_size`` integer from the peer's extension handshake.

    Raises ValueError when the key is not present.
    """
    key = "metadata_size"
    # +1 skips the "i" that opens the bencoded integer.
    start = data.index(key) + len(key) + 1
    return int(data[start:data.index("e", start)])

def recvall(the_socket, timeout=5):
    """Collect everything the peer sends until it goes quiet.

    The socket is switched to non-blocking mode and polled every 50 ms.
    Reading stops when data has been received and nothing new arrived for
    *timeout* seconds, when nothing arrived for ``2 * timeout`` seconds, or
    when the peer closes the connection.  The original kept polling a
    closed connection until the timeout expired.

    Returns the received bytes joined together (possibly empty).
    """
    the_socket.setblocking(0)
    chunks = []
    begin = time()
    while True:
        sleep(0.05)
        idle = time() - begin
        if chunks and idle > timeout:
            break
        if idle > timeout * 2:
            break
        try:
            data = the_socket.recv(1024)
        except socket.error:
            # Nothing available yet on the non-blocking socket.
            continue
        if not data:
            # Empty read means the peer closed the connection.
            break
        chunks.append(data)
        begin = time()
    return b"".join(chunks)

def download_metadata(address, infohash, metadata_queue, timeout=5):
    """Fetch a torrent's metadata from the peer at *address* over TCP.

    Performs the BitTorrent handshake and the extension handshake, then
    requests the metadata in 16 KiB pieces.  On success the tuple
    ``(infohash, address, metadata, start_time)`` is put on
    *metadata_queue*; on any failure nothing is queued.  The original
    queued a partial piece *list* when an error occurred mid-download.
    """
    metadata = None
    start_time = time()
    the_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        the_socket.settimeout(timeout)
        the_socket.connect(address)

        # handshake
        send_handshake(the_socket, infohash)
        packet = the_socket.recv(4096)
        if not check_handshake(packet, infohash):
            return

        # extension handshake
        send_ext_handshake(the_socket)
        packet = the_socket.recv(4096)
        ut_metadata = get_ut_metadata(packet)
        metadata_size = get_metadata_size(packet)

        # Request each 16 KiB piece in turn.
        pieces = []
        for piece in range(int(math.ceil(metadata_size / (16.0 * 1024)))):
            request_metadata(the_socket, ut_metadata, piece)
            packet = recvall(the_socket, timeout)
            # The payload follows the first "ee" in the reply.
            pieces.append(packet[packet.index("ee") + 2:])
        # Assigned only once every piece has arrived.
        metadata = "".join(pieces)
    except Exception:
        # Timeouts, refused connections and malformed replies are routine
        # for arbitrary peers; this hash is simply skipped.
        pass
    finally:
        # Always close the socket, whatever happened above.
        the_socket.close()
        if metadata is not None:
            metadata_queue.put((infohash, address, metadata, start_time))

其实下载种子还有一种方式,就是借助libtorrent,但这个太耗费cpu了,所以我一般不用它,代码如下:

#coding: utf8

import threading

import traceback

import random

import time

import os

import socket

import libtorrent as lt

# Use a 200 KiB stack for threads created after this point.
threading.stack_size(200*1024)

# Default timeout (30 s) applied to sockets created after this point.
socket.setdefaulttimeout(30)

def fetch_torrent(session, ih, timeout):
    """Fetch torrent metadata for hex info_hash *ih* through a libtorrent session.

    A magnet link is added to *session* and polled once per second, at
    most *timeout* times, until metadata is available.  Anything created
    under the save path for this torrent is removed afterwards, and the
    torrent is always removed from the session.

    Returns the metadata, or None if the magnet link could not be added or
    no metadata arrived in time.
    """
    import shutil  # local import: only this function needs it

    save_root = '/tmp/downloads/'
    url = 'magnet:?xt=urn:btih:%s' % (ih.upper(),)
    params = {
        'save_path': save_root,
        'storage_mode': lt.storage_mode_t(2),
        'paused': False,
        'auto_managed': False,
        'duplicate_is_error': True,
    }
    try:
        handle = lt.add_magnet_uri(session, url, params)
    except Exception:
        return None

    meta = None
    down_path = None
    try:
        handle.set_sequential_download(1)
        for _ in range(timeout):
            if handle.has_metadata():
                info = handle.get_torrent_info()
                down_path = '/tmp/downloads/%s' % info.name()
                meta = info.metadata()
                break
            time.sleep(1)

        if down_path:
            # The torrent name comes from an untrusted peer.  The original
            # interpolated it into an "rm -rf" shell command; delete through
            # the filesystem API instead, and only inside the save directory.
            root = os.path.realpath(save_root)
            target = os.path.realpath(down_path)
            if target.startswith(root + os.sep) and os.path.exists(target):
                if os.path.isdir(target):
                    shutil.rmtree(target, ignore_errors=True)
                else:
                    try:
                        os.remove(target)
                    except OSError:
                        pass
    finally:
        session.remove_torrent(handle)
    return meta

def download_metadata(address, binhash, metadata_queue, timeout=20):
    """Fetch metadata for *binhash* with a throw-away libtorrent session.

    A new session listens on a random port range, is pointed at the DHT
    routers and then asked for the torrent.  The tuple
    ``(binhash, address, metadata, start_time)`` is always put on
    *metadata_queue*; ``metadata`` is None when the fetch failed.
    """
    metadata = None
    start_time = time.time()
    try:
        session = lt.session()
        port = random.randrange(10000, 50000)
        session.listen_on(port, port + 10)
        # The published source had these host names with the ".com" suffix
        # stripped, so they could never resolve.
        session.add_dht_router('router.bittorrent.com', 6881)
        session.add_dht_router('router.utorrent.com', 6881)
        session.add_dht_router('dht.transmissionbt.com', 6881)
        session.add_dht_router('127.0.0.1', 6881)
        session.start_dht()
        metadata = fetch_torrent(session, binhash.encode('hex'), timeout)
        session = None  # drop the reference to the session
    except Exception:
        traceback.print_exc()
    finally:
        metadata_queue.put((binhash, address, metadata, start_time))

这个爬虫还是耗费了本人和其他网上高手的很多时间的,请看到这篇博客的朋友保持钻研精神,开源精神,多多交流,秉承分享。本人建立个qq群作为去转盘网的官方群,人数现在也不多,如果有兴趣的话来逛逛吧,多个粉丝去转盘多一份热闹,qq群号:512245829

与本文相关的文章

发布评论

评论列表(0)

  1. 暂无评论