|
#!/bin/env python
|
|
|
|
"""
|
|
URL data extractor
|
|
|
|
Pekka Helenius <pekka [dot] helenius [at] fjordtek [dot] com>
|
|
|
|
Requirements:
|
|
|
|
Python 3
|
|
Python 3 BeautifulSoup4 (python-beautifulsoup4)
|
|
Python 3 whois (python-whois; PyPI)
|
|
Python 3 JSON Schema (python-jsonschema)
|
|
Python 3 Numpy (python-numpy)
|
|
Python 3 matplotlib (python-matplotlib)
|
|
|
|
TODO: URL domain part length comparison analysis
|
|
TODO: URL non-TLD part length comparison analysis
|
|
- URLs of phishing webpages tend to be much longer than URLs of legitimate webpages;
|
|
however, domains themselves tend to be much shorter (without TLD)
|
|
- phishing URLs often contain more dots and subdomains than legitimate URLs
|
|
- legitimate: robots.txt redirects bots to a legitimate domain rather than to the original phishing domain
|
|
|
|
TODO: Website visual similarity analysis
|
|
TODO: consistency of RDN usage in HTML data
|
|
"""
|
|
|
|
######################################
|
|
|
|
#%matplotlib notebook
|
|
from collections import Counter
from datetime import date, datetime, timezone
import json
import os
import re
from time import sleep
import urllib

from bs4 import BeautifulSoup as bs
import matplotlib.pyplot as plt
import requests
from whois import whois
|
|
|
|
# Target URLs
urls = [
    "https://hoxhunt.com/",
    "https://hs.fi",
    "https://ts.fi",
    "https://facebook.com"
]

# Some web servers may block our request unless we set a widely used,
# well-known user agent string
request_headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/88.0.4324.190 Safari/537.36'
}

# Date format for domain timestamps
dateformat = "%Y/%m/%d"

# All webpages may not like fetching data too fast
# Sleep time in seconds
sleep_interval_between_requests = 0.5

# Write JSON results to a file?
use_file = True

# Full file path + name (joined with the platform's path separator)
filename = os.path.join(os.getcwd(), "url_info.json")

# Generate plot from existing JSON data?
plot_only = False

# Save generated plot images?
save_plot_images = True

# DPI of plot images
plot_images_dpi = 150

# Common link attribute references in various HTML elements
# (HTML tag name -> attribute holding the link)
link_refs = {
    'a': 'href',
    'img': 'src',
    'script': 'src'
}
|
|
|
|
############################################################################
|
|
############################################################################
|
|
|
|
class json_url_data(object):
    """
    Collect HTTP, HTML and WHOIS based information about a URL.

    The methods read the module-level settings request_headers,
    sleep_interval_between_requests, dateformat and link_refs.
    Most methods perform network requests (HTTP and/or WHOIS).
    """

    # Seconds to wait for a web server before a request is abandoned.
    # Without a timeout a single unresponsive server blocks the run forever.
    request_timeout = 30

    ######################################
    def set_session(self, url, method='get', redirects=True):
        """
        Set a new HTTP session and get response.

        Sleeps for sleep_interval_between_requests seconds before the
        request is sent.

        Returns a requests.models.Response object.
        Raises Exception if the request fails or if the final HTTP status
        code is not 1XX, 2XX or 3XX.
        """
        sleep(sleep_interval_between_requests)

        try:
            # The response body is read before the session is closed
            # (no streaming is requested), so it stays usable afterwards.
            with requests.Session() as session:
                response = session.request(
                    method,
                    url,
                    headers=request_headers,
                    allow_redirects=redirects,
                    timeout=self.request_timeout
                )
        except requests.RequestException:
            raise Exception(
                "Error: HTTP session could not be established. URL: '{}' (method: {})".format(url, method)
            ) from None

        # HTTP response status codes 1XX, 2XX and 3XX are OK
        # Treat other codes as errors
        if not 100 <= response.status_code < 400:
            raise Exception(
                "Error: got invalid response status from the web server (status: {}, URL: '{}')".format(
                    response.status_code, url
                )
            )
        return response

    ######################################
    def get_html_data(self, url):
        """
        Fetch HTML data.

        Returns a bs4.BeautifulSoup object.
        Raises Exception if the data can not be fetched or parsed.
        """
        try:
            return bs(self.set_session(url).content, 'html.parser')
        except Exception as err:
            raise Exception("Error: HTML data could not be retrieved") from err

    ######################################
    def get_url_redirects(self, url):
        """
        Get URL redirects and related HTTP status codes.

        Returns a list object (empty if no redirects took place).
        """
        response = self.set_session(url)
        return [
            {'redirect_url': r.url, 'status': r.status_code}
            for r in response.history
        ]

    ######################################
    def get_webpage_title(self, url):
        """
        Extract title HTML element contents from HTML data of the URL.

        Returns a string object, or None if the page has no usable
        title element.
        """
        title_tag = self.get_html_data(url).title
        if title_tag is None or title_tag.string is None:
            return None
        return str(title_tag.string)

    ######################################
    def get_whois_data(self, url):
        """
        Get WHOIS domain data.

        Returns a dict object.
        """
        return whois(url)

    ######################################
    def get_domain_name(self, url):
        """
        Get domain name based on WHOIS domain data.

        If WHOIS reports several names, the first one is used.

        Returns a lower-cased string object.
        Raises ValueError if WHOIS data has no domain name.
        """
        domain_name = self.get_whois_data(url).domain_name

        if isinstance(domain_name, list):
            domain_name = domain_name[0] if domain_name else None
        if not domain_name:
            raise ValueError("Error: WHOIS data has no domain name. URL: '{}'".format(url))
        return domain_name.lower()

    ######################################
    def get_startfinal_urls(self, url):
        """
        Get initial and final URLs

        Compare whether the final (destination) URL
        matches with the initial URL in a request.

        Returns a dict object.
        """
        end_url = self.set_session(url).url
        domain_name = self.get_domain_name(url)

        # Literal, case-insensitive containment test. The domain name must
        # not be used as a regular expression: '.' would match any character.
        final_match = domain_name in end_url.lower()

        return {
            'startfinal_urls': {
                'start_url': {
                    'url': url
                },
                'final_url': {
                    'url': end_url, 'domain_match': final_match
                }
            }
        }

    ######################################
    def get_domain_registrar(self, url):
        """
        Get domain registrar

        Returns a dict object.
        """
        return {'domain_registrar': self.get_whois_data(url).registrar}

    ######################################
    def get_domain_title_match(self, url):
        """
        Do comparison between the domain name, extracted
        from WHOIS domain data and contents of a title HTML
        element, extracted from HTML data based on a given URL.

        The comparison is a case-insensitive literal substring test.
        A missing title never matches.

        Returns a dict object.
        """
        domain_name = self.get_domain_name(url)
        title = self.get_webpage_title(url)

        match = bool(title) and domain_name.casefold() in title.casefold()

        return {
            'webpage_title': title,
            'domain_in_webpage_title': match
        }

    ######################################
    def get_single_date(self, dates, newest=False):
        """
        Get a single timestamp from given data

        Two scenarios are considered: dates argument is either
        a list or a single value. If it is a list, the oldest entry
        is selected by default and the newest one if 'newest' is set.

        Entries that are not datetime objects (for instance None for
        a missing WHOIS field) are skipped. Timezone-aware entries are
        converted to naive UTC so that they can be compared with each
        other and with a naive UTC 'now'.

        Returns a datetime object, or None if no usable date exists.
        """
        candidates = dates if isinstance(dates, list) else [dates]
        normalized = []

        for d in candidates:
            if not isinstance(d, datetime):
                continue
            if d.utcoffset() is not None:
                d = d.astimezone(timezone.utc).replace(tzinfo=None)
            normalized.append(d)

        if not normalized:
            return None
        return max(normalized) if newest else min(normalized)

    ######################################
    def _get_domain_dates(self, url):
        """
        Query WHOIS once and pick a single creation, update and
        expiration date.

        Returns a tuple (created, updated, expires); each item is
        a datetime object or None.
        """
        whois_data = self.get_whois_data(url)
        return (
            self.get_single_date(whois_data.creation_date, newest=False),
            self.get_single_date(whois_data.updated_date, newest=False),
            self.get_single_date(whois_data.expiration_date, newest=False)
        )

    ######################################
    def get_domain_timeinfo(self, url):
        """
        Get domain time information based on WHOIS domain data.

        Dates are formatted with the module-level 'dateformat'.
        A date missing from WHOIS data is reported as None.

        Returns a dict object.
        """
        def as_text(value):
            return value.strftime(dateformat) if value is not None else None

        created, updated, expires = self._get_domain_dates(url)

        return {
            'domain_timestamps':
            {
                'created': as_text(created),
                'updated': as_text(updated),
                'expires': as_text(expires)
            }
        }

    ######################################
    def get_domain_timeinfo_relative(self, url):
        """
        Get domain time information based on WHOIS domain data,
        relative to the current date (UTC time).

        A value that depends on a date missing from WHOIS data
        is reported as None.

        Returns a dict object.
        """
        def days_between(later, earlier):
            if later is None or earlier is None:
                return None
            return (later - earlier).days

        # Naive UTC, comparable with the values of get_single_date()
        date_now = datetime.now(timezone.utc).replace(tzinfo=None)
        created, updated, expires = self._get_domain_dates(url)

        return {
            'domain_timestamps_relative':
            {
                'current_date': date_now.strftime(dateformat),
                'created_days_ago': days_between(date_now, created),
                'updated_days_ago': days_between(date_now, updated),
                'expires_days_left': days_between(expires, date_now)
            }
        }

    ######################################
    def is_multidot_url(self, url):
        """
        Determine whether URL matches syntaxes such as
        '../foo/bar/'
        '/foo/../../bar/',
        'https://foo.bar/foo/../'

        etc.

        Returns a boolean object.
        """
        return "../" in url

    ######################################
    def get_tag_data(self, url, tag, attribute=None):
        """
        Get HTML element data from HTML data contents.

        Two fetching methods are supported:
          - A) use only HTML element/tag name and extract raw contents of
               these tags
          - B) use both HTML element/tag name and more fine-grained
               inner attribute name to determine which HTML elements are
               extracted

        Special case - URL link references:
          - attributes 'href' or 'src' are considered as link referrals and
            they are handled in a special way
            - A) link referrals directly to the domain are placed in
                 'self_refs' list
                 (patterns: '/', '#', '../' and '/<anything>')
            - B) link referrals to external domains are placed in
                 'ext_refs' list together with their WHOIS registrar
                 (patterns such as 'https://foo.bar.dot/fancysite' etc.)

          - Both A) and B) link categories have 'normal' and 'multidot'
            subcategories
            - normal links do not contain pattern '../'
            - multidot links contain '../' pattern

          - 'mailto:' links are ignored

        Returns a dict object.
        """
        elements = self.get_html_data(url).find_all(tag)

        if attribute is None:
            return {tag: [element.prettify() for element in elements]}

        # Ignore the HTML tags which do not contain our attribute
        values = []
        for element in elements:
            value = element.get(attribute)
            if value is not None:
                values.append(value)

        if attribute not in ('href', 'src'):
            return {tag: {attribute: values}}

        domain_name = self.get_domain_name(url)

        self_refs = {'normal': [], 'multidot': []}
        ext_refs = {'normal': [], 'multidot': []}

        # Syntax: '#<anything>', '/<anything>', '../<anything>'
        rs = re.compile(r"^[/#]|^[.]{2}/.*")

        # Syntax: '<text>:<text>/'
        rd = re.compile(r"^[a-z]+:[a-z]+/")

        # Syntax examples:
        # 'http://foo.bar/', 'https://foo.bar/', 'foo.bar/', 'https://virus.foo.bar/'
        # The domain name is escaped so that it is matched literally.
        rl = re.compile(r"^([a-z]+://)?([^/]*" + re.escape(domain_name) + "/)")

        for link in values:

            # Ignore mailto links
            if link.startswith("mailto:"):
                continue

            category = 'multidot' if self.is_multidot_url(link) else 'normal'

            if rs.match(link) or rl.match(link) or rd.match(link):
                self_refs[category].append(link)
                continue

            try:
                registrar = self.get_whois_data(link).registrar
            except Exception:
                # Best effort: keep the link even if the WHOIS query fails
                registrar = None

            ext_refs[category].append({'url': link, 'registrar': registrar})

        return {
            tag: {
                attribute + '_ext': ext_refs,
                attribute + '_self': self_refs
            }
        }

    ######################################
    def get_registrar_count(self, registrar, urls):
        """
        How many external URL links have same registrar than
        the webpage itself?

        'urls' is a list of dict objects; the 'registrar' value of each
        entry is compared with the given registrar.

        Returns a dict object.
        """
        same = sum(
            1 for u in urls
            if 'registrar' in u and u['registrar'] == registrar
        )

        return {
            'same_registrar_count': same,
            'other_registrar_count': len(urls) - same
        }

    ######################################
    class json_fetcher(object):
        """
        Get values existing in a dict object,
        based on a known key string.

        Returns a list object.

        TODO: Support for more sophisticated JSON key string filtering
        (possibility to use multiple keys for filtering)
        """

        def __init__(self, dict_data, json_key):
            # The JSON round-trip turns the input into plain
            # JSON-compatible types (dict, list, str, numbers, None)
            self.json_dict = json.loads(json.dumps(dict_data))
            self.json_key = json_key

        ##########
        def fetch(self, jdata):
            """
            Walk nested dict/list data recursively and yield every value
            stored under the wanted key.

            A value found under the wanted key is yielded as is; it is
            not searched any further.
            """
            if isinstance(jdata, dict):
                for key, value in jdata.items():
                    if key == self.json_key:
                        yield value
                    else:
                        yield from self.fetch(value)

            elif isinstance(jdata, list):
                for item in jdata:
                    yield from self.fetch(item)

        ##########
        def _flatten(self, items):
            """Yield items of arbitrarily nested lists one by one."""
            for item in items:
                if isinstance(item, list):
                    yield from self._flatten(item)
                else:
                    yield item

        ##########
        def get_data(self, flatten=True):
            """
            Collect all values found for the key.

            With 'flatten' enabled, possible nested lists are flattened
            (i.e. JSON data contains the key in different nested
            sections).

            Returns a list object.
            """
            data_extract = list(self.fetch(self.json_dict))

            if flatten:
                return list(self._flatten(data_extract))
            return data_extract

    ######################################
    def get_url_data(self, url):
        """
        Compile URL related data.

        Returns a dict object with the URL as its only key.
        """
        startfinal_url = self.get_startfinal_urls(url)
        redirect_url = self.get_url_redirects(url)
        domain_registrar = self.get_domain_registrar(url)
        domaintitle_match = self.get_domain_title_match(url)

        domain_time_relative = self.get_domain_timeinfo_relative(url)
        domain_time = self.get_domain_timeinfo(url)

        html_element_iframe = self.get_tag_data(url, 'iframe')

        # Link data is fetched once per tag and reused for every statistic
        # below; each get_tag_data() call costs an HTTP request and one
        # WHOIS query per external link.
        link_elements = [
            (tag, attribute, self.get_tag_data(url, tag, attribute))
            for tag, attribute in link_refs.items()
        ]

        iframes_count = len(self.json_fetcher(html_element_iframe, 'iframe').get_data())

        multidot_url_count = sum(
            len(self.json_fetcher(element, 'multidot').get_data())
            for _, _, element in link_elements
        )

        registrar = domain_registrar['domain_registrar']
        same_registrar_counts = 0
        other_registrar_counts = 0

        for tag, attribute, element in link_elements:
            counts = self.get_registrar_count(
                registrar,
                element[tag][attribute + '_ext']['normal']
            )
            same_registrar_counts += counts['same_registrar_count']
            other_registrar_counts += counts['other_registrar_count']

        # Dict object for simple, non-nested data
        # (avoid unnecessary nesting of the following data)
        data_simple = {}
        data_simple.update(domain_registrar)
        data_simple.update(domaintitle_match)
        data_simple.update({'iframes_count': iframes_count})
        data_simple.update({'multidot_url_count': multidot_url_count})
        data_simple.update({
            'same_registrar_count': same_registrar_counts,
            'other_registrar_count': other_registrar_counts
        })

        webpage_data = [html_element_iframe]
        webpage_data.extend(element for _, _, element in link_elements)

        return {
            url: [
                data_simple,
                startfinal_url,
                {'redirects': redirect_url},

                domain_time_relative,
                domain_time,

                {'webpage_data': webpage_data}
            ]
        }
|
|
|
|
|
|
|
|
class write_operations(object):
    """
    Fetch data of the target URLs and store it as JSON.

    The output file path is taken from the module-level 'filename'.
    """

    def __init__(self):
        self.filename = filename

    ######################################
    def set_filename(self):
        """
        Set JSON file name, append number suffix
        if file exists already.

        The first free name of the sequence '<name>', '<name>.0',
        '<name>.1', ... is selected and stored in self.filename.

        Returns file name path.
        """
        base = self.filename
        candidate = base
        suffix = 0

        while os.path.exists(candidate):
            candidate = base + "." + str(suffix)
            suffix += 1

        self.filename = candidate
        return self.filename

    ######################################
    def write_to_file(self, data):
        """
        Append to a JSON file.

        Returns 0 on success and 1 if the data could not be written.
        """
        try:
            # The context manager closes the file even if writing fails
            with open(self.filename, "a") as json_file:
                json_file.write(data)
        except Exception:
            return 1
        return 0

    ######################################
    def fetch_and_store_url_data(self, urls, use_file):
        """
        Fetch all pre-defined URLs.

        A URL whose data can not be fetched is reported and skipped.
        If 'use_file' is set, the result is also written to self.filename.

        Returns the collected data as a JSON string.
        """
        data_parts = {}
        fetch_json_data = json_url_data()

        for u in urls:
            print("Fetching URL data: %s" % u)
            try:
                data_parts.update(fetch_json_data.get_url_data(u))
            except Exception as err:
                # Best effort: one failing URL must not abort the others
                print("Failed: %s" % u)
                print("Reason: %s" % err)

        json_data = json.dumps(data_parts)

        if use_file:
            if self.write_to_file(json_data) != 0:
                print("Failed to write file: %s" % self.filename)

        return json_data
|
|
|
|
######################################
|
|
class data_visualization(object):
    """
    Visualize & summarize data.

    Works on the JSON data of a single URL, as generated by
    json_url_data.get_url_data().
    """

    def __init__(self, url, json_data):
        self.url = url
        self.json_data = json_data

        # The JSON round-trip yields plain JSON-compatible types
        self.data = json.loads(json.dumps(self.json_data)).get(self.url)
        self.json_url_obj = json_url_data()
        # NOTE: get_domain_registrar() performs a WHOIS query
        self.domain_registrar = self.json_url_obj.get_domain_registrar(self.url)['domain_registrar']
        self.webpage_data = self.json_url_obj.json_fetcher(self.data, 'webpage_data').get_data()

    def get_urls_count_summary(self):
        """
        Count local and external URL references of the webpage.

        Both 'normal' and 'multidot' links are counted.

        Returns a dict object.
        """
        # Several tags share the same attribute name, count each once
        unique_refs = []
        for ref in link_refs.values():
            if ref not in unique_refs:
                unique_refs.append(ref)

        def link_count(refs, suffix):
            urls_cnt = 0

            for element in self.webpage_data:
                for ref in refs:
                    found = self.json_url_obj.json_fetcher(element, ref + suffix).get_data()
                    urls_cnt += sum(
                        len(n['normal']) + len(n['multidot']) for n in found
                    )
            return urls_cnt

        return {
            'local_urls': link_count(unique_refs, '_self'),
            'external_urls': link_count(unique_refs, '_ext')
        }

    def get_registrars(self):
        """
        Get registrars of all fetched external links.

        Links without a known registrar are left out.

        Returns a list object.
        """
        registrars = []
        #registrars.append(self.domain_registrar)

        for element in self.webpage_data:
            found = self.json_url_obj.json_fetcher(element, 'registrar').get_data()
            registrars.extend(r for r in found if r is not None)
        return registrars

    def get_registrar_count_summary(self):
        """
        Count how many times each registrar occurs among fetched links.

        Returns a dict object.
        """
        domain_counter = dict(Counter(self.get_registrars()))
        return {
            'fetched_domains': domain_counter,
            'url_domain_registrar': self.domain_registrar
        }
|
|
|
|
######################################
|
|
# Execute the main program code.
#
# TODO: in plot-only mode this code must figure out the correct JSON file
# if multiple generated files are present.
if __name__ == '__main__':

    if plot_only:
        # Nothing is fetched: statistics are generated from an existing file
        if not os.path.exists(filename):
            raise SystemExit("Error: JSON data file does not exist: %s" % filename)
        with open(filename, "r") as json_file:
            json_data = json.load(json_file)
    else:
        write_obj = write_operations()
        write_obj.set_filename()
        # fetch_and_store_url_data() returns a JSON string. Decode the fresh
        # data directly instead of re-reading a file: the module-level
        # 'filename' may point to an older result file.
        json_data = json.loads(write_obj.fetch_and_store_url_data(urls, use_file))

    url_str_pattern = re.compile(r"(^[a-z]+://)?([^/]*)")

    # Must be set before the first figure is created to affect it
    fig_params = {
        'xtick.labelsize': 8,
        'figure.figsize': [9,8]
        # 'figure.constrained_layout.use': True
    }
    plt.rcParams.update(fig_params)

    # Get URLs from an available JSON data
    for key_url in json_data:

        print("Generating statistics: %s" % key_url)

        fig = plt.figure()

        # Host part of the URL without dots, used in the image file name
        domain_string = url_str_pattern.match(key_url).group(2).replace('.','')
        summary = data_visualization(key_url, json_data)

        summary_registrars = summary.get_registrar_count_summary()['fetched_domains']

        x_r = list(summary_registrars.keys())
        y_r = list(summary_registrars.values())

        title_r = "Domains associated with HTML URL data (" + key_url + ")"
        xlabel_r = "Fetched domains"
        ylabel_r = "Domain count"

        plt.bar(x_r, y_r, color="green", edgecolor="black")

        # Show bar values
        for index, count in enumerate(y_r):
            plt.text(x=index, y=count+0.5, s=str(count), fontdict=dict(fontsize=8))

        plt.title(title_r)
        plt.xlabel(xlabel_r)
        plt.ylabel(ylabel_r)
        plt.xticks(rotation=45, horizontalalignment="right")

        if save_plot_images:
            plt.savefig(os.getcwd() + "/" + "domain_figure_" + domain_string + ".png", dpi=plot_images_dpi)
        plt.show()

        #fig_u = plt.figure()

        #summary_urls = summary.get_urls_count_summary()

        #x_u = list(summary_urls.keys())
        #y_u = list(summary_urls.values())
        #title_u = "Local and external URL references (" + key_url + ")"
        #xlabel_u = "Fetched URLs"
        #ylabel_u = "URL count"

        #plt.bar(x_u, y_u, color="blue", edgecolor='black')
        #plt.title(title_u)
        #plt.xlabel(xlabel_u)
        #plt.ylabel(ylabel_u)
        #plt.show()
|