Load balancer installed, haven't been able to test it yet
This commit is contained in:
@@ -15,21 +15,29 @@ from datacollection import *
|
||||
#*.stat.uci.edu/* 3
|
||||
#today.uci.edu/department/information_computer_sciences/* 4
|
||||
|
||||
#Semaphore for each domain to keep each domain noice and tidy with politeness
|
||||
domain_semaphores = [Semaphore(3),Semaphore(3),Semaphore(3),Semaphore(3),Semaphore(3)]
|
||||
data_mutex = Lock()
|
||||
|
||||
file_1_mutex = Lock()
|
||||
file_2_mutex = Lock()
|
||||
file_3_mutex = Lock()
|
||||
file_4_mutex = Lock()
|
||||
|
||||
class Frontier(object):
|
||||
def __init__(self, config, restart):
|
||||
self.logger = get_logger("FRONTIER")
|
||||
self.config = config
|
||||
self.to_be_downloaded = list()
|
||||
|
||||
|
||||
#Load balancer, list()
|
||||
self.to_be_downloaded = [list(),list(),list(),list(),list()]
|
||||
|
||||
self.balance_index = 0
|
||||
|
||||
#Lock for each domain to keep each domain nice and tidy with politeness
|
||||
self.domain_semaphores = [Lock(),Lock(),Lock(),Lock(),Lock()]
|
||||
#Local data lock
|
||||
self.data_mutex = Lock()
|
||||
|
||||
#File locks for data to make sure everything is thread-safe
|
||||
self.file_1_mutex = Lock()
|
||||
self.file_2_mutex = Lock()
|
||||
self.file_3_mutex = Lock()
|
||||
self.file_4_mutex = Lock()
|
||||
|
||||
# data collection is going to happen in the frontier
|
||||
# uniques encompass overall unique links
|
||||
self.uniques = set()
|
||||
@@ -63,13 +71,14 @@ class Frontier(object):
|
||||
for url in self.config.seed_urls:
|
||||
self.add_url(url)
|
||||
|
||||
|
||||
def _parse_save_file(self):
|
||||
''' This function can be overridden for alternate saving techniques. '''
|
||||
total_count = len(self.save)
|
||||
tbd_count = 0
|
||||
for url, completed in self.save.values():
|
||||
if not completed and is_valid(url):
|
||||
self.to_be_downloaded.append(url)
|
||||
self.to_be_downloaded[self.get_domain_index(url)].append(url)
|
||||
tbd_count += 1
|
||||
self.logger.info(
|
||||
f"Found {tbd_count} urls to be downloaded from {total_count} "
|
||||
@@ -77,23 +86,24 @@ class Frontier(object):
|
||||
|
||||
def get_tbd_url(self):
    """Return the next URL to download, rotating across the per-domain queues.

    ``self.to_be_downloaded`` is a list of per-domain lists and
    ``self.balance_index`` is the queue to try first. Starting there, the
    queues are scanned in order (wrapping around) and a URL is popped from
    the first non-empty one; ``self.balance_index`` then moves to the queue
    after it so the next call favours a different domain.

    Returns:
        The popped URL, or ``None`` when every queue is empty.
    """
    ###CRITICAL SECTION
    # ``with`` guarantees the mutex is released on every exit path,
    # including an unexpected exception.
    with self.data_mutex:
        queue_count = len(self.to_be_downloaded)
        for offset in range(queue_count):
            # Modulo keeps the index inside the list no matter how many
            # times the balancer has advanced.
            index = (self.balance_index + offset) % queue_count
            queue = self.to_be_downloaded[index]
            if queue:
                self.balance_index = (index + 1) % queue_count
                return queue.pop()
        # Only report "nothing left" when all domain queues are empty, not
        # merely the one the balancer happened to point at.
        return None
|
||||
|
||||
def add_url(self, url):
|
||||
url = normalize(url)
|
||||
urlhash = get_urlhash(url)
|
||||
##CRITICAL SECTION
|
||||
data_mutex.acquire()
|
||||
if urlhash not in self.save:
|
||||
self.save[urlhash] = (url, False)
|
||||
self.save.sync()
|
||||
self.to_be_downloaded.append(url)
|
||||
data_mutex.release()
|
||||
self.to_be_downloaded[self.get_domain_index(url)].append(url)
|
||||
###CRITICAL SECTION
|
||||
|
||||
|
||||
@@ -101,27 +111,18 @@ class Frontier(object):
|
||||
urlhash = get_urlhash(url)
|
||||
|
||||
##CRITICAL SECTION
|
||||
data_mutex.acquire()
|
||||
self.data_mutex.acquire()
|
||||
if urlhash not in self.save:
|
||||
# This should not happen.
|
||||
self.logger.error(
|
||||
f"Completed url {url}, but have not seen it before.")
|
||||
self.save[urlhash] = (url, True)
|
||||
self.save.sync()
|
||||
data_mutex.release()
|
||||
self.data_mutex.release()
|
||||
##CRITICAL SECTION
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
def acquire_polite(url):
|
||||
return domain_semaphores[get_semaphore_index(url)].acquire()
|
||||
|
||||
def release_polite(domain):
|
||||
return domain_semaphores[get_semaphore_index(url)].release()
|
||||
|
||||
def get_semaphore_index(url):
|
||||
def get_domain_index(self,url):
|
||||
if "ics.uci.edu" in url:
|
||||
return 0
|
||||
elif "cs.uci.edu" in url:
|
||||
@@ -135,6 +136,21 @@ class Frontier(object):
|
||||
else:
|
||||
println("ERROR")
|
||||
|
||||
|
||||
|
||||
def acquire_polite(self, url):
    """Acquire the per-domain lock selected for *url*.

    The lock is looked up in ``self.domain_semaphores`` at the position
    returned by ``self.get_domain_index(url)``.

    Returns:
        Whatever the lock's ``acquire()`` call returns.
    """
    domain_slot = self.get_domain_index(url)
    domain_lock = self.domain_semaphores[domain_slot]
    return domain_lock.acquire()
|
||||
|
||||
def release_polite(self, url):
    """Release the per-domain lock selected for *url*.

    The lock is looked up in ``self.domain_semaphores`` at the position
    returned by ``self.get_domain_index(url)``.

    Returns:
        Whatever the lock's ``release()`` call returns.
    """
    domain_slot = self.get_domain_index(url)
    domain_lock = self.domain_semaphores[domain_slot]
    return domain_lock.release()
|
||||
|
||||
def acquire_data_mutex(self):
    """Acquire ``self.data_mutex`` on behalf of the caller.

    Returns:
        Whatever the mutex's ``acquire()`` call returns.
    """
    mutex = self.data_mutex
    return mutex.acquire()
|
||||
|
||||
def release_data_mutex(self):
    """Release ``self.data_mutex`` on behalf of the caller.

    Returns:
        Whatever the mutex's ``release()`` call returns.
    """
    mutex = self.data_mutex
    return mutex.release()
|
||||
|
||||
|
||||
def q1(self, url):
|
||||
# rakslice (8 May 2013) Stackoverflow. https://stackoverflow.com/questions/16430258/creating-a-python-file-in-a-local-directory
|
||||
# this saves to the local directory, so I can constantly access the right file and check if it exists or not
|
||||
@@ -143,32 +159,39 @@ class Frontier(object):
|
||||
|
||||
# Will create a file of all the unique links and you can read the file and do lines = f.readlines() then len(lines) to get the number of unique links
|
||||
#Locking and releasing each file
|
||||
file_1_mutex.acquire()
|
||||
self.file_1_mutex.acquire()
|
||||
if (os.path.exists(my_filename)):
|
||||
f = open(my_filename, 'a')
|
||||
f.write(removeFragment(url))
|
||||
f.write(str(removeFragment(url)))
|
||||
f.close()
|
||||
else:
|
||||
f = open(my_filename, 'w')
|
||||
f.write(removeFragment(url))
|
||||
f.write(str(removeFragment(url)))
|
||||
f.close()
|
||||
file_1_mutex.release()
|
||||
self.file_1_mutex.release()
|
||||
|
||||
def q234(self, url, resp):
|
||||
# rakslice (8 May 2013) Stackoverflow. https://stackoverflow.com/questions/16430258/creating-a-python-file-in-a-local-directory
|
||||
# this saves to the local directory, so I can constantly access the right file and check if it exists or not
|
||||
file_2_mutex.acquire()
|
||||
|
||||
if resp.status != 200:
|
||||
return
|
||||
|
||||
self.file_2_mutex.acquire()
|
||||
path_to_script = os.path.dirname(os.path.abspath(__file__))
|
||||
my_filename = os.path.join(path_to_script, "q2.txt")
|
||||
|
||||
tempTok = tokenize(resp)
|
||||
if len(tempTok) > self.max:
|
||||
self.max = len(tempTok)
|
||||
self.longest = url
|
||||
f = open(my_filename, 'w')
|
||||
f.write("Longest Page: {url}, length: {length}".format(url = self.longest, length = self.max))
|
||||
f.close()
|
||||
file_2_mutex.release()
|
||||
|
||||
try:
|
||||
tempTok = tokenize(resp)
|
||||
if len(tempTok) > self.max:
|
||||
self.max = len(tempTok)
|
||||
self.longest = url
|
||||
f = open(my_filename, 'w')
|
||||
f.write("Longest Page: {url}, length: {length}".format(url = self.longest, length = self.max))
|
||||
f.close()
|
||||
except:
|
||||
print("resp dying for some reason ?")
|
||||
self.file_2_mutex.release()
|
||||
|
||||
tempTok = removeStopWords(tempTok)
|
||||
computeFrequencies(tempTok, self.grand_dict)
|
||||
@@ -178,7 +201,7 @@ class Frontier(object):
|
||||
path_to_script = os.path.dirname(os.path.abspath(__file__))
|
||||
my_filename = os.path.join(path_to_script, "q3.txt")
|
||||
|
||||
file_3_mutex.acquire()
|
||||
self.file_3_mutex.acquire()
|
||||
|
||||
f = open(my_filename, "w")
|
||||
sortedGrandDict = {k: v for k, v in sorted(self.grand_dict.items(), key=lambda item: item[1], reverse = True)}
|
||||
@@ -191,7 +214,7 @@ class Frontier(object):
|
||||
i += 1
|
||||
f.close()
|
||||
|
||||
file_3_mutex.release()
|
||||
self.file_3_mutex.release()
|
||||
|
||||
fragless = removeFragment(url)
|
||||
domain = findDomains(fragless.netloc)
|
||||
@@ -208,11 +231,11 @@ class Frontier(object):
|
||||
my_filename = os.path.join(path_to_script, "q4.txt")
|
||||
|
||||
# creating text file for question 4
|
||||
file_4_mutex.acquire()
|
||||
self.file_4_mutex.acquire()
|
||||
sortedDictKeys = sorted(self.ics.keys())
|
||||
f = open(my_filename, "w")
|
||||
for i in sortedDictKeys:
|
||||
f.write("{url}, {num}".format(url = self.ics[i].getNiceLink(), num = len(self.ics[i].getUniques())))
|
||||
f.close()
|
||||
file_4_mutex.release()
|
||||
self.file_4_mutex.release()
|
||||
|
||||
@@ -51,14 +51,20 @@ class Worker(Thread):
|
||||
toc = time.perf_counter()
|
||||
print(f"Took {toc - tic:0.4f} seconds to do scrape url")
|
||||
|
||||
|
||||
tic = time.perf_counter()
|
||||
self.frontier.acquire_data_mutex()
|
||||
for scraped_url in scraped_urls:
|
||||
self.frontier.add_url(scraped_url)
|
||||
self.frontier.release_data_mutex()
|
||||
toc = time.perf_counter()
|
||||
print(f"Took {toc - tic:0.4f} seconds to do add_url stuffs")
|
||||
|
||||
tic = time.perf_counter()
|
||||
self.frontier.mark_url_complete(tbd_url)
|
||||
toc = time.perf_counter()
|
||||
print(f"Took {toc - tic:0.4f} seconds to do store stuffs")
|
||||
|
||||
while start + self.config.time_delay > time.perf_counter():
|
||||
time.sleep(self.config.time_delay/5)
|
||||
self.frontier.release_polite(tbd_url)
|
||||
|
||||
self.frontier.release_polite(tbd_url)
|
||||
|
||||
Reference in New Issue
Block a user