Same as previous push

This commit is contained in:
unknown
2022-05-27 13:12:15 -07:00
parent 60f6eb0df0
commit e325b9d810
27 changed files with 434 additions and 325 deletions

View File

@@ -10,7 +10,6 @@
#Posting ---> source file, tf-idf score. For now we will only use these two; as things get more complex, Posting will be changed accordingly
#Data input
import math
import json
import os
import shelve
@@ -18,7 +17,8 @@ from bs4 import BeautifulSoup
from time import perf_counter
import time
import threading
import pickle
from threading import Lock
import math
#Data process
@@ -34,235 +34,196 @@ import re
from posting import Posting
from worker import Worker
class Node():
index_value = ''
postings = list()
class Index():
length = 0
index = list()
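# Note: index_value/postings and length/index above are class-level attributes shared by all
# instances; this works here because each object is built fresh and its attributes are rebound
# before use, but instance attributes assigned in an __init__ would be the safer pattern.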
class Indexer():
def __init__(self,restart,trimming):
def __init__(self,list_partials,weight,data_paths,worker_factory=Worker):
#Config stuffs
self.path = "D:/Visual Studio Workspace/CS121/assignment3/data/DEV/"
self.restart = restart
self.trimming = trimming
self.path = "data/DEV"
self.num_doc = 0
self.list_partials = list_partials
self.weight = weight
self.data_paths = data_paths
self.stemmer = PorterStemmer()
self.id = list()
# List of per-document normalization denominators (before the square root); the square root is taken at query time
self.normalize = list()
self.data_paths_lock = Lock()
self.list_partials_lock = Lock()
#Shelves for index
#https://www3.nd.edu/~busiforc/handouts/cryptography/letterfrequencies.html
#https://www.irishtimes.com/news/science/how-many-numbers-begin-with-a-1-more-than-30-per-cent-1.4162466
#According to these, this is how we split the shelves:
#Save #1 = ABCD + (1) ~ 18.3% of words
#Save #2 = EFGHIJK + (2-3)~ 27.1% of words
#Save #3 = LMNOPQ + (4-7) ~ 25.4% of words
#Save #4 = RSTUVWXYZ + (8-9)~ 29.2% of words
#Save #5 = Special characters
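#(The regexes in get_save_file/get_save_lock below implement this split.)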
if os.path.exists("save_1.shelve") and restart:
os.remove("save_1.shelve")
if os.path.exists("save_2.shelve") and restart:
os.remove("save_2.shelve")
if os.path.exists("save_3.shelve") and restart:
os.remove("save_3.shelve")
if os.path.exists("save_4.shelve") and restart:
os.remove("save_4.shelve")
if os.path.exists("save_5.shelve") and restart:
os.remove("save_5.shelve")
self.workers = list()
self.worker_factory = worker_factory
self.save_1 = shelve.open("save_1.shelve")
self.save_1_lock = threading.Lock()
self.save_2 = shelve.open("save_2.shelve")
self.save_2_lock = threading.Lock()
self.save_3 = shelve.open("save_3.shelve")
self.save_3_lock = threading.Lock()
self.save_4 = shelve.open("save_4.shelve")
self.save_4_lock = threading.Lock()
self.save_5 = shelve.open("save_5.shelve")
self.save_5_lock = threading.Lock()
def start_async(self):
self.workers = [
self.worker_factory(worker_id,self)
for worker_id in range(8)]
for worker in self.workers:
worker.start()
print(len(list(self.save_1.keys())))
print(len(list(self.save_2.keys())))
print(len(list(self.save_3.keys())))
print(len(list(self.save_4.keys())))
print(len(list(self.save_5.keys())))
def start(self):
self.start_async()
self.join()
def get_url_id(self, url):
return self.id.index(url)
def save_index(self,word,posting):
cur_save = self.get_save_file(word)
lock = self.get_save_lock(word)
lock.acquire()
shelve_list = list()
try:
shelve_list = cur_save[word]
shelve_list.append(posting)
tic = perf_counter()
# Sort by url id to help with query search
shelve_list.sort(key=lambda x: x.url)
# shelve_list.sort(key=lambda x: x.tf_idf, reverse = True)
toc = perf_counter()
if toc - tic > 1 :
print("Took " + str(toc - tic) + "seconds to sort shelve list !")
cur_save.sync()
lock.release()
except KeyError:
shelve_list.append(posting)
cur_save[word] = shelve_list
cur_save.sync()
lock.release()
def get_save_file(self,word):
#return the correct save depending on the starting letter of word
word_lower = word.lower()
if re.match(r"^[a-d0-1].*",word_lower):
return self.save_1
elif re.match(r"^[e-k2-3].*",word_lower):
return self.save_2
elif re.match(r"^[l-q4-7].*",word_lower):
return self.save_3
elif re.match(r"^[r-z8-9].*",word_lower):
return self.save_4
else:
print(word)
print("You have somehow went beyond the magic")
return self.save_5
def get_save_lock(self,word):
word_lower = word.lower()
if re.match(r"^[a-d0-1].*",word_lower):
return self.save_1_lock
elif re.match(r"^[e-k2-3].*",word_lower):
return self.save_2_lock
elif re.match(r"^[l-q4-7].*",word_lower):
return self.save_3_lock
elif re.match(r"^[r-z8-9].*",word_lower):
return self.save_4_lock
else:
print(word)
print("You have somehow went beyond the magic")
return self.save_5_lock.acquire()
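# A minimal routing sketch (illustrative only), checking the buckets above with the same regexes:
# >>> import re
# >>> [bool(re.match(r"^[a-d0-1].*", w.lower())) for w in ("Apple", "dog", "engine")]
# [True, True, False]
# so "Apple" and "dog" go to save_1 while "engine" falls through to the save_2 pattern.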
# I have a test file (mytest.py) using pandas, but I couldn't figure out how to grab just a
# single cell, so I came up with this. If anyone knows how to get a single cell and can
# explain it to me, I would love to know, since that method might be quicker.
# https://stackoverflow.com/questions/34449127/sklearn-tfidf-transformer-how-to-get-tf-idf-values-of-given-words-in-documen
# Removed the "word" parameter since it wasn't used.
# TODO: Add important-words scaling
def get_tf_idf(self, words):
# words = [whole text], a one-element list
# return the score
try:
tfidf = TfidfVectorizer(ngram_range=(1,1)) # ngram_range is the range of n-values for the n-grams to extract; (1,3) would get unigrams, bigrams, and trigrams
tfidf_matrix = tfidf.fit_transform(words) # fit trains the model, transform creates matrix
df = pd.DataFrame(tfidf_matrix.toarray(), columns = tfidf.get_feature_names_out()) # store value of matrix to associated word/n-gram
#return(df.iloc[0][''.join(word)]) #used for finding single word in dataset
data = df.to_dict() # transform dataframe to dict; could get expensive as the data grows (tested on a ~1000-word doc: about 0.002 s)
return data # returns the dict of words/n-grams with tf-idf
#print(df) # debugging
except Exception:
print("Error in get_tf_idf!")
return -1
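# Usage sketch for the vectorizer above (illustrative; corpus is made up, values approximate,
# output abbreviated):
# >>> tfidf = TfidfVectorizer(ngram_range=(1,1))
# >>> matrix = tfidf.fit_transform(["the quick brown fox", "the lazy dog"])
# >>> pd.DataFrame(matrix.toarray(), columns=tfidf.get_feature_names_out()).to_dict()
# {'brown': {0: 0.53..., 1: 0.0}, 'dog': {0: 0.0, 1: 0.63...}, ...}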
def tf(self, text, url):
# tf
tokens = {}
split = text.split(" ")
# loop using index to keep track of position
for i in range(len(split)):
if split[i] not in tokens:
tokens[split[i]] = Posting(self.get_url_id(url), 1, i)
else:
tokens[split[i]].rtf += 1
tokens[split[i]].tf = (1 + math.log(tokens[split[i]].rtf))
tokens[split[i]].positions.append(i)
return tokens
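# Worked example of the log-scaled tf above: a term with raw count rtf = 10 in a document
# gets tf = 1 + ln(10) ≈ 3.30, while rtf = 1 gives tf = 1.0, so repeated occurrences are
# dampened rather than counted linearly.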
# Does the idf part of the tfidf
def tfidf(self, current_save):
for token, postings in current_save.items():
for p in postings:
p.tfidf = p.tf * math.log(len(self.id)/len(postings))
self.normalize[p.url] += p.tfidf**2
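# Worked example of the idf step above (numbers are illustrative): with len(self.id) = 1000
# documents and a token whose posting list has 10 entries, idf = ln(1000/10) ≈ 4.61; each
# posting's tf is scaled by that factor, and tf_idf**2 is accumulated per document so the
# normalization denominator can be square-rooted at query time.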
def join(self):
for worker in self.workers:
worker.join()
def get_data(self):
def get_postings(self,index):
merged_index_index = open("merged_index.index" ,'r')
merged_index = open("merged_index.full",'r')
merged_index_index.seek(0,0)
json_value = merged_index_index.readline()
data = json.loads(json_value)
index_index = dict(data['index'])
to_seek = index_index[index]
merged_index.seek(to_seek,0)
json_value = merged_index.readline()
data = json.loads(json_value)
return data['postings']
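# Assumed on-disk layout, matching the writer in merge() below: merged_index.index holds a
# single JSON line {"index": [["token", byte_offset], ...], "length": N}, and merged_index.full
# holds one JSON line per token of the form {"index_value": "token", "postings": [...]}, so
# get_postings does one readline on the index file and one seek + readline on the full file.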
num_threads = 8
threads = list()
def set_weight(self):
weight_file = open('docs.weight','w')
jsonStr =json.dumps(self.weight, default=lambda o: o.__dict__,sort_keys=False)
weight_file.write(jsonStr)
weight_file.close()
def get_weight(self,doc_id):
weight = open('docs.weight','r')
weight.seek(0,0)
json_value = weight.readline()
data = json.loads(json_value)
return data[doc_id]
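# Assumed layout of docs.weight, matching set_weight above: a single JSON line mapping each
# document id to its weight, e.g. {"00ba3af6...": 12.3, ...}; get_weight reloads that dict
# on every call and indexes into it by doc_id.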
def get_data_path(self):
for directory in os.listdir(self.path):
for file in os.listdir(self.path + "/" + directory + "/"):
#Actual files here
#JSON["url"] = url of crawled page, ignore fragments
#JSON["content"] = actual HTML
#JSON["encoding"] = ENCODING
index = 0
while True:
file_path = self.path + "/" + directory + "/" + file
# Add url to id here so that there aren't any problems when the worker is multi-threaded
self.data_paths.append("data/DEV/" + directory + "/"+file)
self.num_doc = len(self.data_paths)
tic = perf_counter()
load = open(file_path)
data = json.load(load)
if data["url"] not in self.id:
self.id.append(data["url"])
toc = perf_counter()
print("Took " + str(toc - tic) + " seconds to save url to self.id")
if len(threads) < num_threads:
thread = Worker(self,file_path)
threads.append(thread)
thread.start()
break
else:
if not threads[index].is_alive():
threads[index] = Worker(self,file_path)
threads[index].start()
break
else:
index = index + 1
if(index >= num_threads):
index = 0
time.sleep(.1)
# Make a list the size of the corpus to keep track of document scores
self.normalize = [0] * len(self.id)
def get_next_file(self):
self.data_paths_lock.acquire()
try:
holder = self.data_paths.pop()
self.data_paths_lock.release()
return holder
except IndexError:
self.data_paths_lock.release()
return None
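# Hypothetical consumer loop (a sketch; the real Worker lives in worker.py and is not shown here):
# while True:
#     path = indexer.get_next_file()
#     if path is None:
#         break
#     # ...load the JSON document at path, tokenize, then indexer.add_partial_index(...)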
def add_partial_index(self,partial_index):
self.list_partials_lock.acquire()
self.list_partials.append(partial_index)
self.list_partials_lock.release()
# These last few function calls calculate idf and finalize the tf-idf weighting for each index
self.tfidf(self.save_1)
self.tfidf(self.save_2)
self.tfidf(self.save_3)
self.tfidf(self.save_4)
self.tfidf(self.save_5)
# Creates a pickle file containing a list of urls, where the index of each url is the doc id that postings refer to.
p = os.path.dirname(os.path.abspath(__file__))
my_filename = os.path.join(p, "urlID.pkl")
if os.path.exists(my_filename):
os.remove(my_filename)
# Creates file and closes it
f = open(my_filename, "wb")
pickle.dump(self.id, f)
f.close()
# Creates a pickle file that will contain the denominator (before the square root) for normalizing document weights
p = os.path.dirname(os.path.abspath(__file__))
my_filename = os.path.join(p, "normalize.pkl")
if os.path.exists(my_filename):
os.remove(my_filename)
# Creates file and closes it
f = open(my_filename, "wb")
pickle.dump(self.normalize, f)
f.close()
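# Sketch of how a query-time component might reload these pickles (hypothetical; the search
# side is not part of this file):
# with open("urlID.pkl", "rb") as f:
#     url_ids = pickle.load(f)          # url_ids[doc_id] -> url
# with open("normalize.pkl", "rb") as f:
#     norms = [math.sqrt(x) for x in pickle.load(f)]  # square root deferred to query time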
#Found 55770 documents
#getting important tokens
def merge(self):
partial_files = list()
partial_index_files = list()
parital_index_indices = list()
num_indices = len(self.list_partials)
#Full index: index list and length
full_index = Index()
full_index.index = list()
full_index.length = 0
for partial_index in self.list_partials:
file = open("temp/" + partial_index+'.partial','r')
partial_files.append(file)
index = open("temp/" + partial_index+'.index','r')
partial_index_files.append(index)
for partial_index_file in partial_index_files:
partial_index_file.seek(0,0)
parital_index_indices.append(json.loads(partial_index_file.readline()))
#Start all indexes at 0
for partial_file in partial_files:
partial_file.seek(0,0)
pointers = [0]*num_indices
merged_index = open("merged_index.full",'w')
merged_index_index = open("merged_index.index" ,'w')
while(True):
#Get all values from all indices to find min
value = None
values = list()
for i in range(num_indices):
if pointers[i] < parital_index_indices[i]['length']:
values.append(parital_index_indices[i]['index'][pointers[i]][0])
if(len(values) == 0):
break
value = min(values)
#Get the data for the min value from every index that has it, then save it to the merged index
if value is None:
print("Somehow failed to get a min value")
break
node = Node()
node.index_value = value
for i in range(num_indices):
if pointers[i] < parital_index_indices[i]['length'] and parital_index_indices[i]['index'][pointers[i]][0] == value:
to_seek = parital_index_indices[i]['index'][pointers[i]][1]
partial_files[i].seek(to_seek,0)
json_value = partial_files[i].readline()
temp_node = json.loads(json_value)
node.postings = node.postings + temp_node['postings']
pointers[i] = pointers[i] + 1
#Compute tf-idf for the postings here: tf_idf = tf_raw * idf, where idf = log(n / df(t))
node.postings.sort(key=lambda y:y['doc_id'])
for posting in node.postings:
posting['tf_idf'] = posting['tf_raw']*math.log(self.num_doc/len(node.postings))
full_index.index.append((value,merged_index.tell()))
full_index.length = full_index.length + 1
jsonStr = json.dumps(node,default=lambda o: o.__dict__,sort_keys=False)
merged_index.write(jsonStr + '\n')
full_index.index.sort(key=lambda y:y[0])
jsonStr =json.dumps(full_index, default=lambda o: o.__dict__,sort_keys=False)
merged_index_index.write(jsonStr)
for partial_index in self.list_partials:
os.remove("temp/" + partial_index+'.partial')
os.remove("temp/" + partial_index+'.index')
merged_index_index.close()
merged_index.close()
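# Illustrative trace of the k-way merge above: with two partial indexes whose sorted keys are
# ["apple", "cat"] and ["apple", "dog"], the loop takes min("apple", "apple") = "apple",
# concatenates both posting lists into one node, advances both pointers, then emits "cat",
# then "dog", so every token appears exactly once in merged_index.full.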
def main():
indexer = Indexer(True,0)
indexer.get_data()
indexer = Indexer(list(),dict(),list())
indexer.get_data_path()
print("We have " + str(len(indexer.data_paths)) + " documents to go through !" )
indexer.start()
indexer.merge()
print("Finished merging into 1 big happy family")
indexer.set_weight()
tic = time.perf_counter()
indexer.get_postings('artifici')
toc = time.perf_counter()
print(f"Took {toc - tic:0.4f} seconds to get postings for artifici")
tic = time.perf_counter()
indexer.get_weight('00ba3af6a00b7cfb4928e5d234342c5dc46b4e31714d4a8f315a2dd4d8e49860')
toc = time.perf_counter()
print(f"Took {toc - tic:0.4f} seconds to get weight for some random page")
if __name__ == "__main__":
main()