Locks are not racing anymore, locks work multi-thread works, change some storing information stuff so its more readble, add some new regex but it will need to be trim later because it does not do its job

This commit is contained in:
Hieuhuy Pham
2022-04-23 18:49:24 -07:00
parent 74063e5d00
commit c1b7a50460
6 changed files with 18254 additions and 35 deletions

View File

@@ -25,7 +25,7 @@ class Frontier(object):
self.config = config
#Load balancer, list()
self.to_be_downloaded = [list(),list(),list(),list(),list()]
self.to_be_downloaded = [set(),set(),set(),set(),set()]
self.balance_index = 0
@@ -82,7 +82,7 @@ class Frontier(object):
tbd_count = 0
for url, completed in self.save.values():
if not completed and is_valid(url):
self.to_be_downloaded[self.get_domain_index(url)].append(url)
self.to_be_downloaded[self.get_domain_index(url)].add(url)
tbd_count += 1
self.logger.info(
f"Found {tbd_count} urls to be downloaded from {total_count} "
@@ -92,22 +92,24 @@ class Frontier(object):
###CRITICAL SECTION
self.data_mutex.acquire()
try:
initial = self.balance_index
print("Initial " + str(initial))
self.balance_index = self.balance_index + 1
if self.balance_index > 4:
self.balance_index = 0
while not self.to_be_downloaded[self.balance_index]:
#Load balancing
loop = 10
while not self.to_be_downloaded[self.balance_index] and loop != 0:
self.balance_index = self.balance_index + 1
if self.balance_index > 4:
self.balance_index = 0
if self.balance_index == initial:
loop = loop - 1
if loop == 0:
self.data_mutex.release()
return None
hold = self.to_be_downloaded[self.balance_index].pop()
self.balance_index = self.balance_index + 1
self.data_mutex.release()
#print(hold)
return hold
except IndexError:
print("POPPING RANDOM SHIT BRO")
self.data_mutex.release()
return None
@@ -118,7 +120,7 @@ class Frontier(object):
if urlhash not in self.save:
self.save[urlhash] = (url, False)
self.save.sync()
self.to_be_downloaded[self.get_domain_index(url)].append(url)
self.to_be_downloaded[self.get_domain_index(url)].add(url)
###CRITICAL SECTION
@@ -136,11 +138,12 @@ class Frontier(object):
def get_domain_index(self,url):
if "ics.uci.edu" in url:
#yeah if you put ics.uci.edu in first it will add all informatics link into that instead
if "informatics.uci.edu" in url:
return 0
elif "cs.uci.edu" in url:
elif "ics.uci.edu" in url:
return 1
elif "informatics.uci.edu" in url:
elif "cs.uci.edu" in url:
return 2
elif "stat.uci.edu" in url:
return 3
@@ -216,7 +219,7 @@ class Frontier(object):
self.file_2_mutex.release()
toc = time.perf_counter()
print(f"Took {toc - tic:0.4f} seconds to save file 2 !")
#print(f"Took {toc - tic:0.4f} seconds to save file 2 !")
tic = time.perf_counter()
tempTok = removeStopWords(tempTok)
@@ -241,7 +244,7 @@ class Frontier(object):
self.file_3_mutex.release()
toc = time.perf_counter()
print(f"Took {toc - tic:0.4f} seconds to save file 3 !")
#print(f"Took {toc - tic:0.4f} seconds to save file 3 !")
tic = time.perf_counter()
@@ -264,10 +267,10 @@ class Frontier(object):
sortedDictKeys = sorted(self.ics.keys())
f = open(my_filename, "w")
for i in sortedDictKeys:
f.write("{url}, {num}".format(url = self.ics[i].getNiceLink(), num = len(self.ics[i].getUniques())))
f.write("{url}, {num} + \n".format(url = self.ics[i].getNiceLink(), num = len(self.ics[i].getUniques())))
f.close()
self.file_4_mutex.release()
toc = time.perf_counter()
print(f"Took {toc - tic:0.4f} seconds to save file 4 !")
#print(f"Took {toc - tic:0.4f} seconds to save file 4 !")

View File

@@ -18,19 +18,19 @@ class Worker(Thread):
def run(self):
while True:
start = time.perf_counter()
tic = time.perf_counter()
tbd_url = self.frontier.get_tbd_url()
toc = time.perf_counter()
print(f"Took {toc - tic:0.4f} seconds to get_tbd_url")
#print(f"Took {toc - tic:0.4f} seconds to get_tbd_url")
if not tbd_url:
self.logger.info("Frontier is empty. Stopping Crawler.")
break
self.frontier.acquire_polite(tbd_url)
tic = time.perf_counter()
resp = download(tbd_url, self.config, self.logger)
start = time.perf_counter()
toc = time.perf_counter()
print(f"Took {toc - tic:0.4f} seconds to do download url")
#print(f"Took {toc - tic:0.4f} seconds to do download url")
self.logger.info(
f"Downloaded {tbd_url}, status <{resp.status}>, "
@@ -39,32 +39,32 @@ class Worker(Thread):
tic = time.perf_counter()
scraped_urls = scraper.scraper(tbd_url, resp)
toc = time.perf_counter()
print(f"Took {toc - tic:0.4f} seconds to do scrape url")
#print(f"Took {toc - tic:0.4f} seconds to do scrape url")
tic = time.perf_counter()
print(self.frontier.acquire_data_mutex())
self.frontier.acquire_data_mutex()
for scraped_url in scraped_urls:
self.frontier.add_url(scraped_url)
self.frontier.mark_url_complete(tbd_url)
self.frontier.release_data_mutex()
toc = time.perf_counter()
print(f"Took {toc - tic:0.4f} seconds to do add_url stuffs")
#print(f"Took {toc - tic:0.4f} seconds to do add_url stuffs")
tic = time.perf_counter()
self.frontier.q1(tbd_url)
toc = time.perf_counter()
print(f"Took {toc - tic:0.4f} seconds to do log q1 url")
#print(f"Took {toc - tic:0.4f} seconds to do log q1 url")
tic = time.perf_counter()
self.frontier.q234(tbd_url, resp)
toc = time.perf_counter()
print(f"Took {toc - tic:0.4f} seconds to do log q234 url")
#print(f"Took {toc - tic:0.4f} seconds to do log q234 url")
while start + self.config.time_delay > time.perf_counter():
print("Sleeping")
#print("Sleeping")
time.sleep(self.config.time_delay/5)
self.frontier.release_polite(tbd_url)