|
|
@ -197,8 +197,7 @@ class Crawler: |
|
|
|
# print("warning: ignoring tag data value:", key) |
|
|
|
|
|
|
|
if len(arg_dict) == 1: |
|
|
|
tag.append(arg_dict) |
|
|
|
tags.append(tag) |
|
|
|
tags.append(arg_dict) |
|
|
|
return tags |
|
|
|
|
|
|
|
async def addtagdata(self, tagdata, url, source_url_field, |
|
|
@ -209,48 +208,47 @@ class Crawler: |
|
|
|
:return: dictionary of validated tags (of single type) |
|
|
|
""" |
|
|
|
tags = [] |
|
|
|
for data in tagdata: |
|
|
|
for tag in data: |
|
|
|
if not source_url_field in tag: |
|
|
|
continue |
|
|
|
if not await self.contains(tag[source_url_field], excludes, rlist=True): |
|
|
|
|
|
|
|
if this_domain: |
|
|
|
if not tag[source_url_field].startswith('http'): |
|
|
|
for tag_root_url in tag_root_urls: |
|
|
|
if url.startswith(tag_root_url): |
|
|
|
tag[source_url_field] = tag_root_url + tag[source_url_field] |
|
|
|
break |
|
|
|
else: |
|
|
|
if not tag[source_url_field].startswith('http'): |
|
|
|
continue |
|
|
|
|
|
|
|
if (tag[source_url_field].startswith('http') and |
|
|
|
data not in done_list and |
|
|
|
tag[source_url_field] not in self.busy and |
|
|
|
tag[source_url_field] not in self.todo_queue): |
|
|
|
self.todo_queue.add(tag[source_url_field]) |
|
|
|
# Acquire semaphore |
|
|
|
await self.sem.acquire() |
|
|
|
# Create async task |
|
|
|
task = asyncio.ensure_future(self.mimechecker(tag[source_url_field], mimetype)) |
|
|
|
# Add collback into task to release semaphore |
|
|
|
task.add_done_callback(lambda t: self.sem.release()) |
|
|
|
# Callback to remove task from tasks |
|
|
|
task.add_done_callback(self.tasks.remove) |
|
|
|
# Add task into tasks |
|
|
|
self.tasks.add(task) |
|
|
|
try: |
|
|
|
result = await asyncio.wait_for(task, timeout=20) |
|
|
|
if (result): |
|
|
|
tags.append(data) |
|
|
|
|
|
|
|
except asyncio.TimeoutError: |
|
|
|
print("couldn't add tag data:", tag[source_url_field]) |
|
|
|
task.cancel() |
|
|
|
pass |
|
|
|
|
|
|
|
done_list.extend(tags) |
|
|
|
for tag in tagdata: |
|
|
|
if not source_url_field in tag: |
|
|
|
continue |
|
|
|
if not await self.contains(tag[source_url_field], excludes, rlist=True): |
|
|
|
|
|
|
|
if this_domain: |
|
|
|
if not tag[source_url_field].startswith('http'): |
|
|
|
for tag_root_url in tag_root_urls: |
|
|
|
if url.startswith(tag_root_url): |
|
|
|
tag[source_url_field] = tag_root_url + tag[source_url_field] |
|
|
|
break |
|
|
|
else: |
|
|
|
if not tag[source_url_field].startswith('http'): |
|
|
|
continue |
|
|
|
|
|
|
|
if (tag[source_url_field].startswith('http') and |
|
|
|
tag not in done_list and |
|
|
|
tag[source_url_field] not in self.busy and |
|
|
|
tag[source_url_field] not in self.todo_queue): |
|
|
|
self.todo_queue.add(tag[source_url_field]) |
|
|
|
# Acquire semaphore |
|
|
|
await self.sem.acquire() |
|
|
|
# Create async task |
|
|
|
task = asyncio.ensure_future(self.mimechecker(tag[source_url_field], mimetype)) |
|
|
|
# Add collback into task to release semaphore |
|
|
|
task.add_done_callback(lambda t: self.sem.release()) |
|
|
|
# Callback to remove task from tasks |
|
|
|
task.add_done_callback(self.tasks.remove) |
|
|
|
# Add task into tasks |
|
|
|
self.tasks.add(task) |
|
|
|
try: |
|
|
|
result = await asyncio.wait_for(task, timeout=20) |
|
|
|
if (result): |
|
|
|
tags.append(tag) |
|
|
|
|
|
|
|
except asyncio.TimeoutError: |
|
|
|
print("couldn't add tag data:", tag[source_url_field]) |
|
|
|
task.cancel() |
|
|
|
pass |
|
|
|
|
|
|
|
done_list.extend(tags) |
|
|
|
return tags |
|
|
|
|
|
|
|
async def process(self, url): |
|
|
|