More refinement of frontier and worker for delicious multi-threading
parent 58c923f075
commit 8b96a7c9f7
@@ -1,6 +1,6 @@
 [IDENTIFICATION]
 # Set your user agent string here.
-USERAGENT = IR US22 19854690,44333574
+USERAGENT = IR US22 19854690,44333574,95241547

 [CONNECTION]
 HOST = styx.ics.uci.edu
@@ -9,12 +9,12 @@ PORT = 9000
 [CRAWLER]
 SEEDURL = https://www.ics.uci.edu,https://www.cs.uci.edu,https://www.informatics.uci.edu,https://www.stat.uci.edu
 # In seconds
-POLITENESS = 0.05
+POLITENESS = 0.5

 [LOCAL PROPERTIES]
 # Save file for progress
 SAVE = frontier.shelve

 # IMPORTANT: DO NOT CHANGE IT IF YOU HAVE NOT IMPLEMENTED MULTITHREADING.
-THREADCOUNT = 8
+THREADCOUNT = 1

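For context, the settings above are a standard INI file; a minimal sketch of reading them with Python's configparser follows. Only the section and option names are taken from the file; the filename config.ini and the loader itself are assumptions, since the repo's own Config class is not part of this diff.

# Minimal sketch (not from the repo): reading the crawler settings above.
# "config.ini" is an assumed filename; only the option names come from the file.
from configparser import ConfigParser

parser = ConfigParser()
parser.read("config.ini")

seed_urls = parser["CRAWLER"]["SEEDURL"].split(",")
politeness = parser.getfloat("CRAWLER", "POLITENESS")          # 0.5 seconds per request
thread_count = parser.getint("LOCAL PROPERTIES", "THREADCOUNT")

print(seed_urls, politeness, thread_count)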
@@ -1,7 +1,7 @@
 import os
 import shelve

-from threading import Thread, RLock
+from threading import Thread, Lock, Semaphore
 from queue import Queue, Empty

 from utils import get_logger, get_urlhash, normalize
@@ -15,12 +15,14 @@ from datacollection import *
 #*.stat.uci.edu/* 3
 #today.uci.edu/department/information_computer_sciences/* 4

+# Semaphore for each domain to keep each domain nice and tidy with politeness
 domain_semaphores = [Semaphore(3),Semaphore(3),Semaphore(3),Semaphore(3),Semaphore(3)]
 data_mutex = Lock()

 file_1_mutex = Lock()
 file_2_mutex = Lock()
 file_3_mutex = Lock()
-file_4_mutex = LocK()
+file_4_mutex = Lock()

 class Frontier(object):
     def __init__(self, config, restart):
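Side note on the semaphores introduced above: Semaphore(3) lets at most three threads hold a given domain's slot at once, which is what the per-domain politeness budget rests on. A standalone illustration, not code from the repo:

# Standalone illustration of Semaphore(3): at most three threads are
# inside the guarded section at a time; a fourth blocks until one exits.
from threading import Semaphore, Thread
import time

sem = Semaphore(3)

def worker(i):
    with sem:                      # acquire; release even if the body raises
        print(f"thread {i} holds a slot")
        time.sleep(0.1)

threads = [Thread(target=worker, args=(i,)) for i in range(6)]
for t in threads:
    t.start()
for t in threads:
    t.join()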
@@ -112,72 +114,6 @@ class Frontier(object):


-        # Q1
-        ###CRITICAL SECTION
-        file_1_mutex.acquire()
-        self.uniques.add(removeFragment(url))
-
-        #Writing to local file
-        f = open("q1.txt", "w")
-        f.write("Number of unique pages: {length}\n".format(length = len(self.uniques)))
-        f.close()
-
-        file_1_mutex.release()
-
-        # Q2
-        file_2_mutex.acquire()
-        tempTok = tokenize(url)
-        if len(tempTok) > self.max:
-            self.max = len(tempTok)
-            self.longest = url
-
-        # creating text file for question 2
-        f = open("q2.txt", "w")
-        f.write("Largest page url: {url} \nLength of page: {length}".format(url = self.longest, length = self.max))
-        f.close()
-
-        file_2_mutex.release()
-
-        # Q3
-        file_3_mutex.acquire()
-        tempTok = removeStopWords(tempTok)
-        computeFrequencies(tempTok, self.grand_dict)
-
-        # creating text file for question 3
-        f = open("q3.txt", "w")
-        sortedGrandDict = {k: v for k, v in sorted(self.grand_dict.items(), key=lambda item: item[1], reverse = True)}
-        i = 0
-        for k, v in sortedGrandDict.items():
-            if i == 50:
-                break
-            else:
-                f.write("{}: {}\n".format(k, v))
-                i += 1
-        f.close()
-
-        file_3_mutex.release()
-
-        # Q4
-        file_4_mutex.acquire()
-
-        fragless = removeFragment(url)
-        domain = findDomains(fragless.netloc)
-        if domain[1] == 'ics':
-            if domain[0] not in self.ics:
-                self.ics[domain[0]] = urlData(url, domain[0], domain[1])
-            else:
-                if fragless not in self.ics[domain[0]].getUniques():
-                    self.ics[domain[0]].appendUnique(fragless)
-
-        # creating text file for question 4
-        sortedDictKeys = sorted(self.ics.keys())
-        f = open("q4.txt", "w")
-        for i in sortedDictKeys:
-            f.write("{url}, {num}".format(url = self.ics[i].getNiceLink(), num = len(self.ics[i].getUniques())))
-        f.close()
-
-        file_4_mutex.release()
-
     def acquire_polite(url):
         return domain_semaphores[get_semaphore_index(url)].acquire()
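acquire_polite indexes domain_semaphores through a get_semaphore_index helper that this diff does not show. A plausible sketch is below, assuming the five semaphores line up with the seed domains plus today.uci.edu; the actual helper (and the matching release_polite) may be written differently in the repo.

# Hypothetical helpers (not shown in this commit): map a URL to one of the
# five domain semaphores and release it again. The domain order is an assumption.
from urllib.parse import urlparse

POLITE_DOMAINS = ["ics.uci.edu", "cs.uci.edu", "informatics.uci.edu",
                  "stat.uci.edu", "today.uci.edu"]

def get_semaphore_index(url):
    netloc = urlparse(url).netloc.lower()
    for i, domain in enumerate(POLITE_DOMAINS):
        if netloc == domain or netloc.endswith("." + domain):
            return i
    return 0  # unknown hosts fall back to the first semaphore

def release_polite(url):
    domain_semaphores[get_semaphore_index(url)].release()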
@@ -206,6 +142,8 @@ class Frontier(object):
         my_filename = os.path.join(path_to_script, "q1.txt")

         # Will create a file of all the unique links and you can read the file and do lines = f.readlines() then len(lines) to get the number of unique links
+        # Locking and releasing each file
+        file_1_mutex.acquire()
         if (os.path.exists(my_filename)):
             f = open(my_filename, 'a')
             f.write(removeFragment(url))
@@ -214,10 +152,12 @@ class Frontier(object):
             f = open(my_filename, 'w')
             f.write(removeFragment(url))
             f.close()
+        file_1_mutex.release()

     def q234(self, url, resp):
         # rakslice (8 May 2013) Stackoverflow. https://stackoverflow.com/questions/16430258/creating-a-python-file-in-a-local-directory
         # this saves to the local directory, so I can constantly access the right file and check if it exists or not
+        file_2_mutex.acquire()
         path_to_script = os.path.dirname(os.path.abspath(__file__))
         my_filename = os.path.join(path_to_script, "q2.txt")

@@ -228,6 +168,7 @@ class Frontier(object):
         f = open(my_filename, 'w')
         f.write("Longest Page: {url}, length: {length}".format(url = self.longest, length = self.max))
         f.close()
+        file_2_mutex.release()

         tempTok = removeStopWords(tempTok)
         computeFrequencies(tempTok, self.grand_dict)
@@ -237,6 +178,8 @@ class Frontier(object):
         path_to_script = os.path.dirname(os.path.abspath(__file__))
         my_filename = os.path.join(path_to_script, "q3.txt")

+        file_3_mutex.acquire()
+
         f = open(my_filename, "w")
         sortedGrandDict = {k: v for k, v in sorted(self.grand_dict.items(), key=lambda item: item[1], reverse = True)}
         i = 0
@@ -248,6 +191,8 @@ class Frontier(object):
                 i += 1
         f.close()

+        file_3_mutex.release()
+
         fragless = removeFragment(url)
         domain = findDomains(fragless.netloc)
         if domain[1] == 'ics':
@@ -263,10 +208,11 @@ class Frontier(object):
         my_filename = os.path.join(path_to_script, "q4.txt")

         # creating text file for question 4
+        file_4_mutex.acquire()
         sortedDictKeys = sorted(self.ics.keys())
         f = open(my_filename, "w")
         for i in sortedDictKeys:
             f.write("{url}, {num}".format(url = self.ics[i].getNiceLink(), num = len(self.ics[i].getUniques())))
         f.close()
+        file_4_mutex.release()

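All four q-file writers now share the same acquire / write / release shape around their output file. An equivalent way to express that shape, sketched here rather than taken from the commit, is a with block, which frees the lock and closes the file even if the write raises:

# Sketch: the q3 writer's lock-then-write pattern expressed with context
# managers. The function wrapper and parameter names are for illustration only.
def write_q3_report(grand_dict, my_filename, file_3_mutex):
    with file_3_mutex:
        with open(my_filename, "w") as f:
            top_words = sorted(grand_dict.items(), key=lambda item: item[1], reverse=True)
            for k, v in top_words[:50]:
                f.write("{}: {}\n".format(k, v))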
@@ -35,16 +35,17 @@ class Worker(Thread):
             tic = time.perf_counter()
             self.frontier.q1(tbd_url)
             toc = time.perf_counter()
-            print(f"Took {toc - tic:0.4f} seconds to do download url")
+            print(f"Took {toc - tic:0.4f} seconds to log q1 url")

             tic = time.perf_counter()
             self.frontier.q234(tbd_url, resp)
             toc = time.perf_counter()
-            print(f"Took {toc - tic:0.4f} seconds to do download url")
+            print(f"Took {toc - tic:0.4f} seconds to log q234 url")

             self.logger.info(
                 f"Downloaded {tbd_url}, status <{resp.status}>, "
                 f"using cache {self.config.cache_server}.")

             tic = time.perf_counter()
             scraped_urls = scraper.scraper(tbd_url, resp)
             toc = time.perf_counter()
@@ -57,7 +58,7 @@ class Worker(Thread):
             toc = time.perf_counter()
             print(f"Took {toc - tic:0.4f} seconds to do store stuffs")

-            while(start + self.config.time_delay > time.perf_counter()){
+            while start + self.config.time_delay > time.perf_counter():
                 time.sleep(self.config.time_delay/5)
             self.frontier.release_polite(tbd_url)
-            }
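Taken together, the worker changes time each step and spin-wait on time_delay before releasing the domain slot. A condensed sketch of one loop iteration follows; download() and add_url() are stand-ins, and the assumption that the worker calls acquire_polite before downloading is inferred from the release_polite call above rather than shown in this diff.

# Condensed sketch of one worker iteration (stand-in code, not the repo's).
# acquire_polite/release_polite, q1, q234 and time_delay appear in the diff;
# download() and add_url() are assumed helpers.
import time

def process_one(frontier, config, tbd_url, download, scraper):
    frontier.acquire_polite(tbd_url)      # wait for a free slot on this domain
    start = time.perf_counter()

    resp = download(tbd_url, config)      # assumed download helper
    frontier.q1(tbd_url)                  # unique-page report
    frontier.q234(tbd_url, resp)          # longest page, word counts, subdomains

    for url in scraper.scraper(tbd_url, resp):
        frontier.add_url(url)             # assumed frontier method

    # Politeness: wait out the per-domain delay, then free the slot.
    while start + config.time_delay > time.perf_counter():
        time.sleep(config.time_delay / 5)
    frontier.release_polite(tbd_url)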