@@ -1 +1,27 @@
from .crawler import Crawler | |||||
import asyncio | |||||
import signal | |||||
from pysitemap.base_crawler import Crawler | |||||
def crawler(root_url, out_file, out_format='xml', maxtasks=100): | |||||
""" | |||||
run crowler | |||||
:param root_url: Site root url | |||||
:param out_file: path to the out file | |||||
:param out_format: format of out file [xml, txt] | |||||
:param maxtasks: max count of tasks | |||||
:return: | |||||
""" | |||||
    loop = asyncio.get_event_loop()
    c = Crawler(root_url, out_file=out_file, out_format=out_format, maxtasks=maxtasks)
    # register SIGINT before the crawl starts so Ctrl-C can stop the loop;
    # add_signal_handler raises on platforms without signal support
    try:
        loop.add_signal_handler(signal.SIGINT, loop.stop)
    except (RuntimeError, NotImplementedError):
        pass
    loop.run_until_complete(c.run())
print('todo:', len(c.todo)) | |||||
print('busy:', len(c.busy)) | |||||
print('done:', len(c.done), '; ok:', sum(c.done.values())) | |||||
print('tasks:', len(c.tasks)) |
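A minimal invocation sketch for this helper; the site URL and output file name are placeholders, not taken from the project:

from pysitemap import crawler

# crawl a site and emit the discovered URLs as a plain-text list
crawler('https://example.com/', out_file='sitemap.txt', out_format='txt', maxtasks=50)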
@@ -0,0 +1,77 @@
import asyncio
import re
import urllib.parse

import aiohttp

from pysitemap.format_processors.text import TextWriter
from pysitemap.format_processors.xml import XMLWriter
class Crawler: | |||||
format_processors = { | |||||
'xml': XMLWriter, | |||||
'txt': TextWriter | |||||
} | |||||
def __init__(self, rooturl, out_file, out_format='xml', maxtasks=100): | |||||
self.rooturl = rooturl | |||||
self.todo = set() | |||||
self.busy = set() | |||||
self.done = {} | |||||
self.tasks = set() | |||||
self.sem = asyncio.Semaphore(maxtasks) | |||||
        # the session keeps cookies between requests and reuses the connection pool
self.session = aiohttp.ClientSession() | |||||
self.writer = self.format_processors.get(out_format)(out_file) | |||||
async def run(self): | |||||
        # seed the queue with the root URL, then poll until no task is busy
        t = asyncio.ensure_future(self.addurls([(self.rooturl, '')]))
        await asyncio.sleep(1)
        while self.busy:
            await asyncio.sleep(1)
await t | |||||
await self.session.close() | |||||
await self.writer.write([key for key, value in self.done.items() if value]) | |||||
async def addurls(self, urls): | |||||
for url, parenturl in urls: | |||||
url = urllib.parse.urljoin(parenturl, url) | |||||
url, frag = urllib.parse.urldefrag(url) | |||||
if (url.startswith(self.rooturl) and | |||||
url not in self.busy and | |||||
url not in self.done and | |||||
url not in self.todo): | |||||
self.todo.add(url) | |||||
await self.sem.acquire() | |||||
task = asyncio.ensure_future(self.process(url)) | |||||
task.add_done_callback(lambda t: self.sem.release()) | |||||
task.add_done_callback(self.tasks.remove) | |||||
self.tasks.add(task) | |||||
async def process(self, url): | |||||
print('processing:', url) | |||||
self.todo.remove(url) | |||||
self.busy.add(url) | |||||
try: | |||||
resp = await self.session.get(url) | |||||
except Exception as exc: | |||||
print('...', url, 'has error', repr(str(exc))) | |||||
self.done[url] = False | |||||
else: | |||||
            if (resp.status == 200 and
                    'text/html' in resp.headers.get('content-type', '')):
                data = (await resp.read()).decode('utf-8', 'replace')
                urls = re.findall(r'(?i)href=["\']?([^\s"\'<>]+)', data)
                asyncio.ensure_future(self.addurls([(u, url) for u in urls]))
resp.close() | |||||
self.done[url] = True | |||||
self.busy.remove(url) | |||||
print(len(self.done), 'completed tasks,', len(self.tasks), | |||||
'still pending, todo', len(self.todo)) | |||||
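For completeness, a sketch of driving Crawler without the crawler() wrapper; the target URL is a placeholder, and the instance is created inside a coroutine so the aiohttp session binds to the running loop:

import asyncio
from pysitemap.base_crawler import Crawler

async def main():
    c = Crawler('https://example.com/', out_file='sitemap.xml', out_format='xml', maxtasks=10)
    await c.run()

asyncio.run(main())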
@@ -1,162 +0,0 @@
# -*- coding: utf-8 -*- | |||||
import __future__ | |||||
import sys | |||||
if sys.version_info.major == 2: | |||||
import urlparse | |||||
else: | |||||
from urllib import parse as urlparse | |||||
import requests | |||||
from lxml import html | |||||
import re | |||||
import time | |||||
try: | |||||
import sys | |||||
if 'threading' in sys.modules: | |||||
del sys.modules['threading'] | |||||
print('threading module loaded before patching!') | |||||
print('threading module deleted from sys.modules!\n') | |||||
from gevent import monkey, pool | |||||
monkey.patch_all() | |||||
gevent_installed = True | |||||
except: | |||||
print("Gevent is not installed. Parsing process will be slower.") | |||||
gevent_installed = False | |||||
class Crawler: | |||||
def __init__(self, url, outputfile='sitemap.xml', logfile='error.log', oformat='xml'): | |||||
self.url = url | |||||
self.logfile = open(logfile, 'a') | |||||
self.oformat = oformat | |||||
self.outputfile = outputfile | |||||
# create lists for urls in queue and visited urls | |||||
self.urls = set([url]) | |||||
self.visited = set([url]) | |||||
self.exts = ['htm', 'php'] | |||||
self.allowed_regex = '\.((?!htm)(?!php)\w+)$' | |||||
self.errors = {'404': []} | |||||
def set_exts(self, exts): | |||||
self.exts = exts | |||||
def allow_regex(self, regex=None): | |||||
if regex is not None: | |||||
self.allowed_regex = regex | |||||
else: | |||||
allowed_regex = '' | |||||
for ext in self.exts: | |||||
allowed_regex += '(!{})'.format(ext) | |||||
self.allowed_regex = '\.({}\w+)$'.format(allowed_regex) | |||||
def crawl(self, echo=False, pool_size=1): | |||||
# sys.stdout.write('echo attribute deprecated and will be removed in future') | |||||
self.echo = echo | |||||
self.regex = re.compile(self.allowed_regex) | |||||
print('Parsing pages') | |||||
if gevent_installed and pool_size >= 1: | |||||
self.pool = pool.Pool(pool_size) | |||||
self.pool.spawn(self.parse_gevent) | |||||
self.pool.join() | |||||
else: | |||||
self.pool = [None,] # fixing n_pool exception in self.parse with poolsize > 1 and gevent_installed == False | |||||
while len(self.urls) > 0: | |||||
self.parse() | |||||
if self.oformat == 'xml': | |||||
self.write_xml() | |||||
elif self.oformat == 'txt': | |||||
self.write_txt() | |||||
with open('errors.txt', 'w') as err_file: | |||||
for key, val in self.errors.items(): | |||||
err_file.write(u'\n\nError {}\n\n'.format(key)) | |||||
err_file.write(u'\n'.join(set(val))) | |||||
def parse_gevent(self): | |||||
self.parse() | |||||
while len(self.urls) > 0 and not self.pool.full(): | |||||
self.pool.spawn(self.parse_gevent) | |||||
def parse(self): | |||||
if self.echo: | |||||
n_visited, n_urls, n_pool = len(self.visited), len(self.urls), len(self.pool) | |||||
status = ( | |||||
'{} pages parsed :: {} pages in the queue'.format(n_visited, n_urls), | |||||
'{} pages parsed :: {} parsing processes :: {} pages in the queue'.format(n_visited, n_pool, n_urls) | |||||
) | |||||
print(status[int(gevent_installed)]) | |||||
if not self.urls: | |||||
return | |||||
else: | |||||
url = self.urls.pop() | |||||
try: | |||||
response = requests.get(url) | |||||
            # if the status code is not 200, record the url in self.errors
if response.status_code != 200: | |||||
if self.errors.get(str(response.status_code), False): | |||||
self.errors[str(response.status_code)].extend([url]) | |||||
else: | |||||
self.errors.update({str(response.status_code): [url]}) | |||||
self.errlog("Error {} at url {}".format(response.status_code, url)) | |||||
return | |||||
try: | |||||
tree = html.fromstring(response.text) | |||||
except ValueError as e: | |||||
self.errlog(repr(e)) | |||||
tree = html.fromstring(response.content) | |||||
for link_tag in tree.findall('.//a'): | |||||
link = link_tag.attrib.get('href', '') | |||||
newurl = urlparse.urljoin(self.url, link) | |||||
# print(newurl) | |||||
if self.is_valid(newurl): | |||||
self.visited.update([newurl]) | |||||
self.urls.update([newurl]) | |||||
except Exception as e: | |||||
self.errlog(repr(e)) | |||||
def is_valid(self, url): | |||||
oldurl = url | |||||
if '#' in url: | |||||
url = url[:url.find('#')] | |||||
if url in self.visited or oldurl in self.visited: | |||||
return False | |||||
if self.url not in url: | |||||
return False | |||||
if re.search(self.regex, url): | |||||
return False | |||||
return True | |||||
def errlog(self, msg): | |||||
self.logfile.write(msg) | |||||
self.logfile.write('\n') | |||||
def write_xml(self): | |||||
of = open(self.outputfile, 'w') | |||||
of.write('<?xml version="1.0" encoding="utf-8"?>\n') | |||||
of.write('<urlset xmlns="http://www.sitemaps.org/schemas/sitemap/0.9" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.sitemaps.org/schemas/sitemap/0.9 http://www.sitemaps.org/schemas/sitemap/0.9/sitemap.xsd">\n') | |||||
url_str = '<url><loc>{}</loc></url>\n' | |||||
while self.visited: | |||||
of.write(url_str.format(self.visited.pop())) | |||||
of.write('</urlset>') | |||||
of.close() | |||||
def write_txt(self): | |||||
of = open(self.outputfile, 'w') | |||||
url_str = u'{}\n' | |||||
while self.visited: | |||||
of.write(url_str.format(self.visited.pop())) | |||||
of.close() | |||||
def show_progress(self, count, total, status=''): | |||||
bar_len = 60 | |||||
filled_len = int(round(bar_len * count / float(total))) | |||||
percents = round(100.0 * count / float(total), 1) | |||||
bar = '=' * filled_len + '-' * (bar_len - filled_len) | |||||
sys.stdout.write('[%s] %s%s ...%s\r' % (bar, percents, '%', status)) | |||||
sys.stdout.flush() # As suggested by Rom Ruben (see: http://stackoverflow.com/questions/3173320/text-progress-bar-in-the-console/27871113#comment50529068_27871113) | |||||
time.sleep(0.5) |
@@ -0,0 +1,37 @@
from sqlalchemy import create_engine | |||||
from sqlalchemy.ext.declarative import declarative_base | |||||
from sqlalchemy.orm import sessionmaker, scoped_session | |||||
from sqlalchemy.sql import ClauseElement | |||||
DB_URI = 'sqlite:///stuff.db' | |||||
db_engine = create_engine(DB_URI)
session = scoped_session( | |||||
sessionmaker( | |||||
autocommit=False, | |||||
autoflush=False, | |||||
        bind=db_engine
) | |||||
) | |||||
Model = declarative_base() | |||||
def get_or_create(session, model, defaults=None, **kwargs):
    """Fetch the first row matching **kwargs, or create one; returns (instance, created)."""
    instance = session.query(model).filter_by(**kwargs).first()
    if instance:
        return instance, False
    else:
        params = {k: v for k, v in kwargs.items() if not isinstance(v, ClauseElement)}
        params.update(defaults or {})
instance = model(**params) | |||||
session.add(instance) | |||||
session.commit() | |||||
return instance, True | |||||
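A short usage sketch for get_or_create, assuming the Domain model defined later in this change and a placeholder domain name:

from db import session, get_or_create
from models import Domain

# returns the existing row, or inserts and commits a new one
domain, created = get_or_create(session, Domain, domain='example.com')
print(domain.id, created)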
@@ -0,0 +1,18 @@
from aiofile import AIOFile, Writer


class TextWriter:
def __init__(self, filename: str): | |||||
self.filename = filename | |||||
async def write(self, urls): | |||||
async with AIOFile(self.filename, 'w') as aiodf: | |||||
writer = Writer(aiodf) | |||||
for url in urls: | |||||
await writer("{}\n".format(url)) | |||||
await aiodf.fsync() | |||||
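Since write() is a coroutine, the writer must be driven from an event loop; a minimal sketch with placeholder file name and URLs:

import asyncio
from pysitemap.format_processors.text import TextWriter

asyncio.run(TextWriter('urls.txt').write(['https://example.com/', 'https://example.com/about']))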
@@ -0,0 +1,26 @@
from aiofile import AIOFile, Writer


class XMLWriter:
def __init__(self, filename: str): | |||||
self.filename = filename | |||||
async def write(self, urls): | |||||
async with AIOFile(self.filename, 'w') as aiodf: | |||||
writer = Writer(aiodf) | |||||
await writer('<?xml version="1.0" encoding="utf-8"?>\n') | |||||
await writer( | |||||
'<urlset xmlns="http://www.sitemaps.org/schemas/sitemap/0.9"' | |||||
' xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"' | |||||
' xsi:schemaLocation="http://www.sitemaps.org/schemas/sitemap/0.9 http://www.sitemaps.org/schemas/sitemap/0.9/sitemap.xsd">\n') | |||||
await aiodf.fsync() | |||||
for url in urls: | |||||
await writer('<url><loc>{}</loc></url>\n'.format(url)) | |||||
await aiodf.fsync() | |||||
await writer('</urlset>') | |||||
await aiodf.fsync() | |||||
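Any class exposing an async write(urls) method can be registered in Crawler.format_processors; the JSON writer below is hypothetical, sketched only to show the contract:

import json
from aiofile import AIOFile

class JSONWriter:
    # hypothetical processor: dumps the URL list as a JSON array
    def __init__(self, filename: str):
        self.filename = filename

    async def write(self, urls):
        async with AIOFile(self.filename, 'w') as aiodf:
            await aiodf.write(json.dumps(list(urls), indent=2))

# registration sketch:
# Crawler.format_processors['json'] = JSONWriter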
@@ -0,0 +1,79 @@
from sqlalchemy import Column, Integer, String, Boolean, ForeignKey, Table
from sqlalchemy.orm import relationship | |||||
import hashlib | |||||
from db import Model, db_engine, session
import uuid | |||||
from validators import domain as domain_validator | |||||
groups_domains = Table(
    'groups_domains',
    Model.metadata,
    Column('group_id', Integer, ForeignKey('domain_groups.id')),
    Column('domain_id', Integer, ForeignKey('domains.id'))
)
class DomainGroup(Model): | |||||
__tablename__ = 'domain_groups' | |||||
id = Column(Integer, primary_key=True) | |||||
name = Column(String(200), nullable=False) | |||||
domains = relationship("Domain", secondary=groups_domains, back_populates="groups") | |||||
def __init__(self, name): | |||||
self.name = name | |||||
def __repr__(self): | |||||
return "<Domain group {}: {}>".format(self.id, self.name) | |||||
@classmethod | |||||
def from_json(cls, data): | |||||
return cls(**data) | |||||
class Domain(Model): | |||||
__tablename__ = 'domains' | |||||
id = Column(Integer, primary_key=True) | |||||
domain = Column(String(200), nullable=False) | |||||
domains = relationship("DomainGroup", secondary=groups_domains, back_populates="domains") | |||||
def __init__(self, domain): | |||||
self.validate_domain(domain) | |||||
self.domain = domain | |||||
def validate_domain(self, domain=None): | |||||
domain = domain or self.domain | |||||
return domain_validator(domain, raise_errors=True) | |||||
def __repr__(self): | |||||
return "<Domain {}: {}>".format(self.id, self.domain) | |||||
@classmethod | |||||
def from_json(cls, data): | |||||
return cls(**data) | |||||
class User(Model): | |||||
__tablename__ = 'users' | |||||
id = Column(Integer, primary_key=True) | |||||
username = Column(String(50), unique=True) | |||||
token = Column(String(250), default='Unknown') | |||||
is_active = Column(Boolean, default=False) | |||||
def __init__(self, username: str): | |||||
self.username = username | |||||
m = hashlib.sha256() | |||||
m.update('{}{}'.format(username, uuid.uuid4()).encode('utf-8')) | |||||
self.token = m.hexdigest() | |||||
@classmethod | |||||
def validate(cls, username, token): | |||||
return session.query(cls).filter(cls.username == username, cls.token == token).count() == 1 | |||||
def __repr__(self): | |||||
return "<User(name='%s', fullname='%s', password='%s')>" % (self.name, self.fullname, self.password) | |||||
Model.metadata.create_all(db_engine)
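A brief sketch of the intended many-to-many usage, with illustrative names:

from db import session, get_or_create
from models import Domain, DomainGroup

group, _ = get_or_create(session, DomainGroup, name='production')
domain, _ = get_or_create(session, Domain, domain='example.com')
group.domains.append(domain)  # back_populates keeps domain.groups in sync
session.commit()
print(domain.groups)  # e.g. [<Domain group 1: production>]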
@@ -0,0 +1,133 @@
import inspect | |||||
from collections import OrderedDict | |||||
import json | |||||
from aiohttp.web_exceptions import HTTPMethodNotAllowed, HTTPBadRequest
from aiohttp.web_request import Request
from aiohttp.web_response import Response
from aiohttp.web_urldispatcher import UrlDispatcher
from db import session, get_or_create | |||||
from models import Domain, DomainGroup | |||||
DEFAULT_METHODS = ('GET', 'POST', 'PUT', 'DELETE') | |||||
class RestEndpoint: | |||||
def __init__(self): | |||||
self.methods = {} | |||||
for method_name in DEFAULT_METHODS: | |||||
method = getattr(self, method_name.lower(), None) | |||||
if method: | |||||
self.register_method(method_name, method) | |||||
def register_method(self, method_name, method): | |||||
self.methods[method_name.upper()] = method | |||||
async def dispatch(self, request: Request): | |||||
method = self.methods.get(request.method.upper()) | |||||
        if not method:
            raise HTTPMethodNotAllowed(request.method, DEFAULT_METHODS)
wanted_args = list(inspect.signature(method).parameters.keys()) | |||||
available_args = request.match_info.copy() | |||||
available_args.update({'request': request}) | |||||
unsatisfied_args = set(wanted_args) - set(available_args.keys()) | |||||
        if unsatisfied_args:
            # the handler expects arguments the route's match_info cannot supply
            raise HTTPBadRequest()
return await method(**{arg_name: available_args[arg_name] for arg_name in wanted_args}) | |||||
class DomainEndpoint(RestEndpoint): | |||||
def __init__(self, resource): | |||||
super().__init__() | |||||
self.resource = resource | |||||
    async def get(self) -> Response:
        domains = session.query(Domain).all()
        return Response(status=200, body=self.resource.encode({
            'domains': [
                {
                    'id': domain.id, 'title': domain.domain,
                    'groups': [{'id': group.id, 'name': group.name} for group in domain.groups]
                } for domain in domains
            ]}), content_type='application/json')
async def post(self, request): | |||||
data = await request.json() | |||||
        domain, _created = get_or_create(session, Domain, domain=data['domain'])
return Response(status=200, body=self.resource.encode( | |||||
{'id': domain.id, 'domain': domain.domain, 'created': _created}, | |||||
), content_type='application/json') | |||||
class DomainGroupEndpoint(RestEndpoint): | |||||
def __init__(self, resource): | |||||
super().__init__() | |||||
self.resource = resource | |||||
async def get(self) -> Response: | |||||
return Response(status=200, body=self.resource.encode({ | |||||
'domain_groups': [ | |||||
{ | |||||
'id': group.id, 'name': group.name, | |||||
'domains': [{'id': domain.id, 'name': domain.domain} for domain in group.domains] | |||||
} for group in session.query(DomainGroup).all() | |||||
]}), content_type='application/json') | |||||
async def post(self, request): | |||||
data = await request.json() | |||||
group, _created = get_or_create(session, DomainGroup, name=data['name']) | |||||
domains = [] | |||||
if data.get('domains'): | |||||
for domain_el in data.get('domains'): | |||||
domain, _domain_created = get_or_create(session, Domain, domain=domain_el) | |||||
domains.append({'id': domain.id, 'domain': domain_el, 'created': _domain_created}) | |||||
return Response( | |||||
status=200, | |||||
body=self.resource.encode({ | |||||
'id': group.id, | |||||
'name': group.name, | |||||
'domains': domains, | |||||
'created': _created | |||||
}), content_type='application/json') | |||||
class RestResource: | |||||
def __init__(self, notes, factory, collection, properties, id_field): | |||||
self.notes = notes | |||||
self.factory = factory | |||||
self.collection = collection | |||||
self.properties = properties | |||||
self.id_field = id_field | |||||
self.domain_endpoint = DomainEndpoint(self) | |||||
self.domain_groups_endpoint = DomainGroupEndpoint(self) | |||||
    def register(self, router: UrlDispatcher):
        router.add_route('*', '/domains', self.domain_endpoint.dispatch)
        router.add_route('*', '/domain_groups', self.domain_groups_endpoint.dispatch)
def render(self, instance): | |||||
return OrderedDict((notes, getattr(instance, notes)) for notes in self.properties) | |||||
@staticmethod | |||||
def encode(data): | |||||
return json.dumps(data, indent=4).encode('utf-8') | |||||
def render_and_encode(self, instance): | |||||
return self.encode(self.render(instance)) |
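Wiring the resource into an aiohttp application might look like the sketch below; the constructor arguments are illustrative, since these endpoints only rely on self.resource.encode():

from aiohttp import web

app = web.Application()
resource = RestResource('domains', Domain, {}, ('id', 'domain'), 'id')
resource.register(app.router)
web.run_app(app, port=8080)  # serves GET/POST on /domains and /domain_groups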
@@ -0,0 +1,45 @@
import re | |||||
class ValidationFailure(Exception):
    """
    Raised when a value fails validation.
    """
domain_pattern = re.compile( | |||||
r'^(([a-zA-Z]{1})|([a-zA-Z]{1}[a-zA-Z]{1})|' | |||||
r'([a-zA-Z]{1}[0-9]{1})|([0-9]{1}[a-zA-Z]{1})|' | |||||
r'([a-zA-Z0-9][-_.a-zA-Z0-9]{0,61}[a-zA-Z0-9]))\.' | |||||
    r'([a-zA-Z]{2,13}|[a-zA-Z0-9-]{2,30}\.[a-zA-Z]{2,3})$'
) | |||||
def domain(value, raise_errors=True): | |||||
""" | |||||
    Return whether the given value is a valid domain name.

    If the value is a valid domain name this function returns ``True``;
    otherwise it raises :class:`ValidationFailure`, or returns ``False``
    when ``raise_errors`` is disabled.
Examples:: | |||||
>>> domain('example.com') | |||||
True | |||||
>>> domain('example.com/') | |||||
ValidationFailure(func=domain, ...) | |||||
Supports IDN domains as well:: | |||||
>>> domain('xn----gtbspbbmkef.xn--p1ai') | |||||
True | |||||
:param value: domain string to validate | |||||
:param raise_errors: raise errors or return False | |||||
""" | |||||
if domain_pattern.match(value) is None: | |||||
if raise_errors: | |||||
raise ValidationFailure("{} is not valid domain".format(value)) | |||||
else: | |||||
return False | |||||
return True | |||||
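The raise_errors switch supports two calling styles; a small sketch with a deliberately invalid value:

from validators import domain, ValidationFailure

if not domain('example.com/', raise_errors=False):
    print('invalid')  # boolean style

try:
    domain('example.com/')  # exception style
except ValidationFailure as exc:
    print(exc)  # example.com/ is not a valid domain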
@@ -1,2 +1,2 @@
lxml | |||||
requests | |||||
aiohttp
aiofile
@@ -1,16 +1,15 @@
import pysitemap | |||||
""" | |||||
Example script | |||||
Uses gevent to implement multiprocessing if Gevent installed | |||||
To install gevent: | |||||
$ pip install gevent | |||||
""" | |||||
import sys | |||||
import logging | |||||
from pysitemap import crawler | |||||
if __name__ == '__main__':
    url = 'http://www.lumpro.ru/'  # url to crawl
logfile = 'errlog.log' # path to logfile | |||||
oformat = 'xml' # output format | |||||
outputfile = 'sitemap.xml' # path to output file | |||||
crawl = pysitemap.Crawler(url=url, logfile=logfile, oformat=oformat, outputfile=outputfile) | |||||
crawl.crawl(pool_size=20, echo=True) | |||||
if '--iocp' in sys.argv: | |||||
from asyncio import events, windows_events | |||||
sys.argv.remove('--iocp') | |||||
logging.info('using iocp') | |||||
el = windows_events.ProactorEventLoop() | |||||
events.set_event_loop(el) | |||||
# root_url = sys.argv[1] | |||||
root_url = 'https://www.haikson.com' | |||||
crawler(root_url, out_file='sitemap.xml') |