@@ -1 +1,27 @@
-from pysitemap.crawler import Crawler
+import asyncio
+import signal
+from pysitemap.base_crawler import Crawler
+
+
+def crawler(root_url, out_file, out_format='xml', maxtasks=100):
+    """
+    Run the crawler and write the collected URLs to the output file.
+    :param root_url: site root URL
+    :param out_file: path to the output file
+    :param out_format: format of the output file [xml, txt]
+    :param maxtasks: maximum number of concurrent tasks
+    :return:
+    """
+    loop = asyncio.get_event_loop()
+
+    c = Crawler(root_url, out_file=out_file, out_format=out_format, maxtasks=maxtasks)
+    loop.run_until_complete(c.run())
+
+    try:
+        loop.add_signal_handler(signal.SIGINT, loop.stop)
+    except RuntimeError:
+        pass
+    print('todo:', len(c.todo))
+    print('busy:', len(c.busy))
+    print('done:', len(c.done), '; ok:', sum(c.done.values()))
+    print('tasks:', len(c.tasks))
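Usage: crawler() is the package's new public entry point. A minimal call looks like the following sketch (the URL is a placeholder; the updated run.py at the end of this diff does the same thing):

    from pysitemap import crawler

    # Crawl everything under the root URL and write the result as sitemap.xml.
    # crawler() blocks until the crawl finishes: it drives the event loop itself.
    crawler('https://example.com', out_file='sitemap.xml', out_format='xml', maxtasks=100)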
@@ -0,0 +1,75 @@
+import asyncio
+import re
+import urllib.parse
+from pysitemap.format_processors.xml import XMLWriter
+import aiohttp
+
+
+class Crawler:
+
+    format_processors = {
+        'xml': XMLWriter
+    }
+
+    def __init__(self, rooturl, out_file, out_format='xml', maxtasks=100):
+        self.rooturl = rooturl
+        self.todo = set()
+        self.busy = set()
+        self.done = {}
+        self.tasks = set()
+        self.sem = asyncio.Semaphore(maxtasks)
+
+        # connector stores cookies between requests and uses connection pool
+        self.session = aiohttp.ClientSession()
+        self.writer = self.format_processors.get(out_format)(out_file)
+
+    async def run(self):
+        t = asyncio.ensure_future(self.addurls([(self.rooturl, '')]))
+        await asyncio.sleep(1)
+        # poll until no URL is currently being processed
+        while self.busy:
+            await asyncio.sleep(1)
+
+        await t
+        await self.session.close()
+        await self.writer.write([key for key, value in self.done.items() if value])
+
+    async def addurls(self, urls):
+        for url, parenturl in urls:
+            url = urllib.parse.urljoin(parenturl, url)
+            url, frag = urllib.parse.urldefrag(url)
+            # only schedule in-scope URLs that have not been seen yet
+            if (url.startswith(self.rooturl) and
+                    url not in self.busy and
+                    url not in self.done and
+                    url not in self.todo):
+                self.todo.add(url)
+                await self.sem.acquire()
+                task = asyncio.ensure_future(self.process(url))
+                task.add_done_callback(lambda t: self.sem.release())
+                task.add_done_callback(self.tasks.remove)
+                self.tasks.add(task)
+
+    async def process(self, url):
+        print('processing:', url)
+        self.todo.remove(url)
+        self.busy.add(url)
+
+        try:
+            resp = await self.session.get(url)
+        except Exception as exc:
+            print('...', url, 'has error', repr(str(exc)))
+            self.done[url] = False
+        else:
+            if (resp.status == 200 and
+                    ('text/html' in resp.headers.get('content-type', ''))):
+                data = (await resp.read()).decode('utf-8', 'replace')
+                # extract candidate links from the raw HTML with a simple href regex
+                urls = re.findall(r'(?i)href=["\']?([^\s"\'<>]+)', data)
+                asyncio.ensure_future(self.addurls([(u, url) for u in urls]))
+
+            resp.close()
+            self.done[url] = True
+
+        self.busy.remove(url)
+        print(len(self.done), 'completed tasks,', len(self.tasks),
+              'still pending, todo', len(self.todo))
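The concurrency control above is the core of the class: addurls() acquires the semaphore before scheduling process(), and the done-callbacks release the slot and drop the finished task, so at most maxtasks pages are in flight at once. Below is a standalone sketch of that pattern reduced to its parts; the sleep stands in for the HTTP request and is not part of the patch:

    import asyncio

    async def main(maxtasks=3, n_jobs=10):
        sem = asyncio.Semaphore(maxtasks)   # caps concurrent workers, like Crawler.sem
        tasks = set()

        async def process(i):
            await asyncio.sleep(0.1)        # placeholder for session.get(url)
            print('processed', i)

        for i in range(n_jobs):
            await sem.acquire()             # wait for a free slot before scheduling
            task = asyncio.ensure_future(process(i))
            task.add_done_callback(lambda t: sem.release())  # free the slot when done
            task.add_done_callback(tasks.discard)            # forget finished tasks
            tasks.add(task)

        # wait for whatever is still in flight, roughly what Crawler.run() does by polling
        if tasks:
            await asyncio.gather(*tasks)

    asyncio.run(main())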
@@ -1,68 +0,0 @@
-#!/usr/bin/env python3
-import time
-import asyncio
-from aiohttp import ClientSession
-from parsel import Selector
-from urllib.parse import urlparse, urlunparse
-
-
-class Crawler(object):
-
-    def __init__(self, url, sleep_time=.5):
-        self.urls = [url]
-        scheme, netloc, path, params, query, fragment = urlparse(url)
-        if not netloc:
-            netloc, path = path, netloc
-        url = urlunparse((scheme, netloc, "", params, "", fragment))
-        self.base_url = url
-        self.sleep_time = float(sleep_time)
-
-    async def fetch(self, url, session):
-        async with session.get(url) as response:
-            await asyncio.sleep(self.sleep_time)
-            return response.content
-
-    async def bound_fetch(self, sem, url, session):
-        # Getter function with semaphore.
-        async with sem:
-            await self.fetch(url, session)
-
-    def norm_link(self, url:str):
-        if url.startswith(self.base_url):
-            return url
-        elif url.startswith('//'):
-            return "{scheme}{url}".format(
-                scheme=self.base_url[:self.base_url.find(":")],
-                url=url
-            )
-        elif url.startswith("/"):
-            return self.base_url + url
-        return None
-
-    async def parse(self, content):
-        sel = Selector(content)
-        links = sel.xpath('//a/@href').getall()
-        normalized_links = []
-        for link in links:
-            link = self.norm_link(link)
-            if link:
-                normalized_links.append(link)
-        self.urls.extend(normalized_links)
-
-    async def run(self):
-        tasks = []
-        # create instance of Semaphore
-        sem = asyncio.Semaphore(20)
-
-        # Create client session that will ensure we dont open new connection
-        # per each request.
-        async with ClientSession() as session:
-            for url in self.urls:
-                # pass Semaphore and session to every GET request
-                task = asyncio.ensure_future(self.bound_fetch(sem, url, session))
-                tasks.append(task)
-
-            responses = asyncio.gather(*tasks)
-            await responses
@@ -0,0 +1,26 @@
+import asyncio
+from aiofile import AIOFile, Reader, Writer
+import logging
+
+
+class XMLWriter():
+
+    def __init__(self, filename: str):
+        self.filename = filename
+
+    async def write(self, urls):
+        async with AIOFile(self.filename, 'w') as aiodf:
+            writer = Writer(aiodf)
+            await writer('<?xml version="1.0" encoding="utf-8"?>\n')
+            await writer(
+                '<urlset xmlns="http://www.sitemaps.org/schemas/sitemap/0.9"'
+                ' xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"'
+                ' xsi:schemaLocation="http://www.sitemaps.org/schemas/sitemap/0.9 http://www.sitemaps.org/schemas/sitemap/0.9/sitemap.xsd">\n')
+            await aiodf.fsync()
+            for url in urls:
+                await writer('<url><loc>{}</loc></url>\n'.format(url))
+            await aiodf.fsync()
+            await writer('</urlset>')
+            await aiodf.fsync()
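XMLWriter is looked up through Crawler.format_processors, which currently registers only 'xml', even though the crawler() docstring mentions [xml, txt]. A hypothetical 'txt' processor (not in this patch) would only need the same two-method interface; a minimal sketch, assuming the same aiofile calls used above:

    from aiofile import AIOFile, Writer

    class TextWriter:
        """Hypothetical plain-text processor: one URL per line."""

        def __init__(self, filename: str):
            self.filename = filename

        async def write(self, urls):
            async with AIOFile(self.filename, 'w') as afp:
                writer = Writer(afp)
                for url in urls:
                    await writer('{}\n'.format(url))
                await afp.fsync()

It would then be wired in by adding 'txt': TextWriter to Crawler.format_processors.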
@@ -1,20 +1,15 @@
-from pysitemap import Crawler
-import asyncio
+"""
+Example script
+Uses gevent to implement multiprocessing if Gevent installed
+To install gevent:
+    $ pip install gevent
+"""
+import sys
+import logging
+from pysitemap import crawler
 if __name__ == '__main__':
-    url = 'http://www.stroivopros.ru/' # url from to crawl
-    logfile = 'errlog.log' # path to logfile
-    oformat = 'xml' # output format
-    outputfile = 'sitemap.xml' # path to output file
-    loop = asyncio.get_event_loop()
-    crawler: Crawler = Crawler(url=url)
-    future = asyncio.ensure_future(crawler.run())
-    loop.run_until_complete(future)
+    if '--iocp' in sys.argv:
+        from asyncio import events, windows_events
+        sys.argv.remove('--iocp')
+        logging.info('using iocp')
+        el = windows_events.ProactorEventLoop()
+        events.set_event_loop(el)
+
+    # root_url = sys.argv[1]
+    root_url = 'https://www.haikson.com'
+    crawler(root_url, out_file='sitemap.xml')
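For a quick check of the result, the sitemap written by this example can be read back with the standard library; the namespace below is the one XMLWriter emits. This helper is illustrative only, not part of the patch:

    import xml.etree.ElementTree as ET

    NS = {'sm': 'http://www.sitemaps.org/schemas/sitemap/0.9'}

    def read_sitemap(path='sitemap.xml'):
        """Return the list of <loc> URLs from a sitemap written by XMLWriter."""
        root = ET.parse(path).getroot()
        return [loc.text for loc in root.findall('sm:url/sm:loc', NS)]

    if __name__ == '__main__':
        urls = read_sitemap()
        print(len(urls), 'urls in sitemap')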