Fixed a lot of race conditions. There could potentially be a reader/writer confusion issue, but it should not matter that much — as long as the server is healthy we can let this bad boi loose

This commit is contained in:
Hieuhuy Pham
2022-04-23 02:13:12 -07:00
parent 90a5d16456
commit 74063e5d00
5 changed files with 1090 additions and 40 deletions

View File

@@ -4,6 +4,8 @@ import shelve
from threading import Thread, Lock,Semaphore
from queue import Queue, Empty
import time
from utils import get_logger, get_urlhash, normalize
from scraper import is_valid
from datacollection import *
@@ -27,6 +29,7 @@ class Frontier(object):
self.balance_index = 0
#Semaphore for each domain to keep each domain noice and tidy with politeness
self.domain_semaphores = [Lock(),Lock(),Lock(),Lock(),Lock()]
#Local data lock
@@ -37,6 +40,7 @@ class Frontier(object):
self.file_2_mutex = Lock()
self.file_3_mutex = Lock()
self.file_4_mutex = Lock()
# data collection is going to happen in the frontier
# uniques encompass overall unique links
@@ -88,8 +92,19 @@ class Frontier(object):
###CRITICAL SECTION
self.data_mutex.acquire()
try:
hold = self.to_be_downloaded[balance_index].pop()
balance_index = balance_index + 1
initial = self.balance_index
print("Initial " + str(initial))
self.balance_index = self.balance_index + 1
if self.balance_index > 4:
self.balance_index = 0
while not self.to_be_downloaded[self.balance_index]:
self.balance_index = self.balance_index + 1
if self.balance_index > 4:
self.balance_index = 0
if self.balance_index == initial:
self.data_mutex.release()
return None
hold = self.to_be_downloaded[self.balance_index].pop()
self.data_mutex.release()
return hold
except IndexError:
@@ -111,14 +126,12 @@ class Frontier(object):
urlhash = get_urlhash(url)
##CRITICAL SECTION
self.data_mutex.acquire()
if urlhash not in self.save:
# This should not happen.
self.logger.error(
f"Completed url {url}, but have not seen it before.")
self.save[urlhash] = (url, True)
self.save.sync()
self.data_mutex.release()
##CRITICAL SECTION
@@ -134,7 +147,8 @@ class Frontier(object):
elif "today.uci.edu/department/information_computer_sciences/" in url:
return 4
else:
println("ERROR")
print(url)
print("ERROR")
@@ -149,6 +163,12 @@ class Frontier(object):
def release_data_mutex(self):
    """Release the frontier-wide data lock guarding shared crawl state."""
    mutex = self.data_mutex
    return mutex.release()
def acquire_234_mutex(self):
    """Block until the combined q2/q3/q4 file lock is held.

    NOTE(review): relies on ``self.file_2_3_4_mutex``; only separate
    file_2/3/4 mutexes are visible in this diff hunk — confirm the
    attribute is defined elsewhere in the class.
    """
    mutex = self.file_2_3_4_mutex
    return mutex.acquire()
def release_234_mutex(self):
    """Unlock the combined q2/q3/q4 file lock; counterpart to acquire_234_mutex.

    NOTE(review): relies on ``self.file_2_3_4_mutex`` — confirm the
    attribute is defined elsewhere in the class, since this hunk only
    shows file_2/3/4 mutexes.
    """
    mutex = self.file_2_3_4_mutex
    return mutex.release()
def q1(self, url):
@@ -162,11 +182,11 @@ class Frontier(object):
self.file_1_mutex.acquire()
if (os.path.exists(my_filename)):
f = open(my_filename, 'a')
f.write(str(removeFragment(url)))
f.write(str(removeFragment(url)) + "\n")
f.close()
else:
f = open(my_filename, 'w')
f.write(str(removeFragment(url)))
f.write(str(removeFragment(url)) + "\n")
f.close()
self.file_1_mutex.release()
@@ -177,12 +197,13 @@ class Frontier(object):
if resp.status != 200:
return
self.file_2_mutex.acquire()
tic = time.perf_counter()
path_to_script = os.path.dirname(os.path.abspath(__file__))
my_filename = os.path.join(path_to_script, "q2.txt")
try:
tempTok = tokenize(resp)
self.file_2_mutex.acquire()
if len(tempTok) > self.max:
self.max = len(tempTok)
self.longest = url
@@ -191,17 +212,21 @@ class Frontier(object):
f.close()
except:
print("resp dying for some reason ?")
self.file_2_mutex.release()
tempTok = removeStopWords(tempTok)
computeFrequencies(tempTok, self.grand_dict)
toc = time.perf_counter()
print(f"Took {toc - tic:0.4f} seconds to save file 2 !")
tic = time.perf_counter()
tempTok = removeStopWords(tempTok)
self.file_3_mutex.acquire()
computeFrequencies(tempTok, self.grand_dict)
# rakslice (8 May 2013) Stackoverflow. https://stackoverflow.com/questions/16430258/creating-a-python-file-in-a-local-directory
# this saves to the local directory, so I can constantly access the right file and check if it exists or not
path_to_script = os.path.dirname(os.path.abspath(__file__))
my_filename = os.path.join(path_to_script, "q3.txt")
self.file_3_mutex.acquire()
f = open(my_filename, "w")
sortedGrandDict = {k: v for k, v in sorted(self.grand_dict.items(), key=lambda item: item[1], reverse = True)}
@@ -213,11 +238,16 @@ class Frontier(object):
f.write("{}: {}\n".format(k, v))
i += 1
f.close()
self.file_3_mutex.release()
toc = time.perf_counter()
print(f"Took {toc - tic:0.4f} seconds to save file 3 !")
tic = time.perf_counter()
fragless = removeFragment(url)
domain = findDomains(fragless.netloc)
self.file_4_mutex.acquire()
if domain[1] == 'ics':
if domain[0] not in self.ics:
self.ics[domain[0]] = urlData(url, domain[0], domain[1])
@@ -231,11 +261,13 @@ class Frontier(object):
my_filename = os.path.join(path_to_script, "q4.txt")
# creating text file for question 4
self.file_4_mutex.acquire()
sortedDictKeys = sorted(self.ics.keys())
f = open(my_filename, "w")
for i in sortedDictKeys:
f.write("{url}, {num}".format(url = self.ics[i].getNiceLink(), num = len(self.ics[i].getUniques())))
f.close()
self.file_4_mutex.release()
toc = time.perf_counter()
print(f"Took {toc - tic:0.4f} seconds to save file 4 !")

View File

@@ -32,6 +32,24 @@ class Worker(Thread):
toc = time.perf_counter()
print(f"Took {toc - tic:0.4f} seconds to do download url")
self.logger.info(
f"Downloaded {tbd_url}, status <{resp.status}>, "
f"using cache {self.config.cache_server}.")
tic = time.perf_counter()
scraped_urls = scraper.scraper(tbd_url, resp)
toc = time.perf_counter()
print(f"Took {toc - tic:0.4f} seconds to do scrape url")
tic = time.perf_counter()
print(self.frontier.acquire_data_mutex())
for scraped_url in scraped_urls:
self.frontier.add_url(scraped_url)
self.frontier.mark_url_complete(tbd_url)
self.frontier.release_data_mutex()
toc = time.perf_counter()
print(f"Took {toc - tic:0.4f} seconds to do add_url stuffs")
tic = time.perf_counter()
self.frontier.q1(tbd_url)
toc = time.perf_counter()
@@ -41,30 +59,12 @@ class Worker(Thread):
self.frontier.q234(tbd_url, resp)
toc = time.perf_counter()
print(f"Took {toc - tic:0.4f} seconds to do log q234 url")
self.logger.info(
f"Downloaded {tbd_url}, status <{resp.status}>, "
f"using cache {self.config.cache_server}.")
tic = time.perf_counter()
scraped_urls = scraper.scraper(tbd_url, resp)
toc = time.perf_counter()
print(f"Took {toc - tic:0.4f} seconds to do scrape url")
tic = time.perf_counter()
self.frontier.acquire_data_mutex()
for scraped_url in scraped_urls:
self.frontier.add_url(scraped_url)
self.frontier.release_data_mutex()
toc = time.perf_counter()
print(f"Took {toc - tic:0.4f} seconds to do add_url stuffs")
tic = time.perf_counter()
self.frontier.mark_url_complete(tbd_url)
toc = time.perf_counter()
print(f"Took {toc - tic:0.4f} seconds to do store stuffs")
while start + self.config.time_delay > time.perf_counter():
print("Sleeping")
time.sleep(self.config.time_delay/5)
self.frontier.release_polite(tbd_url)