Whoosh/Search
Whoosh Search
About the Search class
The Search class in these Whoosh-based tools defines key behaviors: how to iterate over all documents, and how to access documents in the first place.
If a custom document type needs to be defined (e.g., a Google Drive spreadsheet), the Search class is where we define how to iterate over all the spreadsheets, process the contents, and add the document to the search index.
The Search class makes high-level calls to the Index class (like "add a new document to the search index, with the following values for each field").
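To make that concrete, here is a minimal sketch of the pattern. The MinimalSearch class, its two-field schema, and the caller-supplied iterable of (path, content) pairs are hypothetical and do not come from the examples below; the sketch only shows the shape of the calls into whoosh.

import os
from whoosh import index
from whoosh.fields import Schema, ID, TEXT

class MinimalSearch:
    """Sketch: open or create a whoosh index, then feed documents to it."""

    def __init__(self, index_folder):
        schema = Schema(
            path=ID(stored=True, unique=True),
            content=TEXT(stored=True)
        )
        if not os.path.exists(index_folder):
            os.mkdir(index_folder)
        if index.exists_in(index_folder):
            self.ix = index.open_dir(index_folder)
        else:
            self.ix = index.create_in(index_folder, schema)

    def add_all_documents(self, documents):
        """documents: any iterable of (path, content) pairs."""
        writer = self.ix.writer()
        for path, content in documents:
            # high-level call into the Index: one document, one value per field
            writer.add_document(path=path, content=content)
        writer.commit()

The example classes below follow the same pattern, with larger schemas and different document sources (Markdown files on disk, Github issues).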
Search class vs Index class
As mentioned above, the Index class (see Whoosh/Index) provides a high-level interface to the underlying index, which the Search class calls to add and update documents.
Files
The Search class is defined in the example repositories linked below: search_md.py for the Markdown search engine, and issues_search.py for the Github issues search engine.
Examples
whoosh Search class for a folder of Markdown files
Example Search class for creating an index of a folder full of markdown files: https://git.charlesreid1.com/charlesreid1/markdown-search/src/branch/master/search_md.py
class Search:
    ix = None
    index_folder = None
    markdown = mistune.Markdown(renderer=DontEscapeHtmlInCodeRenderer(), escape=False)
    html_parser = html.parser.HTMLParser()
    schema = None

    def __init__(self, index_folder):
        self.open_index(index_folder)

    def open_index(self, index_folder, create_new=False):
        self.index_folder = index_folder
        if create_new:
            if os.path.exists(index_folder):
                shutil.rmtree(index_folder)
                print("deleted index folder: " + index_folder)

        if not os.path.exists(index_folder):
            os.mkdir(index_folder)

        exists = index.exists_in(index_folder)
        stemming_analyzer = StemmingAnalyzer()

        schema = Schema(
            path=ID(stored=True, unique=True),
            filename=TEXT(stored=True, field_boost=100.0),
            headlines=KEYWORD(stored=True, scorable=True, field_boost=60.0),
            content=TEXT(stored=True, analyzer=stemming_analyzer),
            time=STORED
        )

        if not exists:
            self.ix = index.create_in(index_folder, schema)
        else:
            self.ix = index.open_dir(index_folder)

    def add_document(self, writer, file_path, config):
        file_name = file_path.replace(".", " ").replace("/", " ").replace("\\", " ").replace("_", " ").replace("-", " ")

        # read file content
        with codecs.open(file_path, 'r', encoding='utf-8') as f:
            content = f.read()

        path = file_path

        # parse markdown fields
        parser = MarkdownParser()
        parser.parse(content, config)

        modtime = os.path.getmtime(path)
        print("adding to index: path: %s size:%d headlines:'%s' modtime=%d" % (
            path, len(content), parser.headlines, modtime))

        writer.add_document(
            path=path,
            filename=file_name,
            headlines=parser.headlines,
            content=content,
            time=modtime
        )

    def add_all_files(self, file_dir, config, create_new_index=False):
        if create_new_index:
            self.open_index(self.index_folder, create_new=True)

        count = 0
        writer = self.ix.writer()
        for root, dirs, files in os.walk(file_dir, followlinks=True):
            for ff in files:
                if ff.endswith(".md") or ff.endswith(".markdown"):
                    path = os.path.join(root, ff)
                    self.add_document(writer, path, config)
                    count += 1
        writer.commit()
        print("Done, added %d documents to the index" % count)

    def update_index_incremental(self, config, create_new_index=False):
        file_dir = config["MARKDOWN_FILES_DIR"]

        if create_new_index:
            self.open_index(self.index_folder, create_new=True)

        all_files = []
        for root, dirs, files in os.walk(file_dir, followlinks=True):
            if not root.endswith(".git"):
                for ff in files:
                    if ff.endswith(".md") or ff.endswith(".markdown"):
                        path = os.path.join(root, ff)
                        print('updating file %s' % (path))
                        all_files.append(path)

        # see: https://pythonhosted.org/Whoosh/indexing.html#incremental-indexing

        # The set of all paths in the index
        indexed_paths = set()

        # The set of all paths we need to re-index
        to_index = set()

        count = 0
        with self.ix.searcher() as searcher:
            writer = self.ix.writer()

            # Loop over the stored fields in the index
            for fields in searcher.all_stored_fields():
                indexed_path = fields['path']
                indexed_paths.add(indexed_path)

                if not os.path.exists(indexed_path):
                    # This file was deleted since it was indexed
                    writer.delete_by_term('path', indexed_path)
                    print("removed from index: %s" % indexed_path)
                else:
                    # Check if this file was changed since it was indexed
                    indexed_time = fields['time']
                    mtime = os.path.getmtime(indexed_path)
                    if mtime > indexed_time:
                        # The file has changed, delete it and add it
                        # to the list of files to reindex
                        writer.delete_by_term('path', indexed_path)
                        to_index.add(indexed_path)

            # Loop over the files in the filesystem
            for path in all_files:
                if path in to_index or path not in indexed_paths:
                    # This is either a file that's changed, or a new file
                    # that wasn't indexed before. So index it!
                    self.add_document(writer, path, config)
                    count += 1

            writer.commit()
            print("Done, updated %d documents in the index" % count)

    def create_search_result(self, results):
        # Allow larger fragments
        results.fragmenter.maxchars = 300

        # Show more context before and after
        results.fragmenter.surround = 50

        search_results = []
        for r in results:
            sr = SearchResult()
            #import pdb; pdb.set_trace()
            #print(dir(r))
            sr.score = r.score
            sr.path = r["path"]
            sr.content = r["content"]

            highlights = r.highlights("content")
            if not highlights:
                highlights = self.cap(r["content"], 1000)

            # unescape
            highlights = self.html_parser.unescape(highlights)
            html = self.markdown(highlights)
            sr.content_highlight = html

            if "headlines" in r:
                sr.headlines = r["headlines"]

            search_results.append(sr)

        return search_results

    def cap(self, s, l):
        return s if len(s) <= l else s[0:l - 3] + '...'

    def search(self, query_list, fields=None):
        with self.ix.searcher() as searcher:
            query_string = " ".join(query_list)
            query = None
            if "\"" in query_string or ":" in query_string:
                query = QueryParser("content", self.schema).parse(query_string)
            elif len(fields) == 1 and fields[0] == "filename":
                pass
            elif len(fields) == 2:
                pass
            else:
                fields = ["headlines", "content", "filename", "doubleemphasiswords", "emphasiswords"]

            if not query:
                query = MultifieldParser(fields, schema=self.ix.schema).parse(query_string)

            parsed_query = "%s" % query
            print("query: %s" % parsed_query)

            results = searcher.search(query, terms=False, scored=True, groupedby="path")
            search_result = self.create_search_result(results)

            return parsed_query, search_result

    def get_document_total_count(self):
        return self.ix.searcher().doc_count_all()
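As a rough usage sketch, the class above might be driven like this. The index folder name, the Markdown directory, and the config contents are placeholders; config must also contain whatever keys MarkdownParser expects.

# Hypothetical driver script for the Markdown Search class above.
config = {"MARKDOWN_FILES_DIR": "/path/to/markdown/notes"}

s = Search("markdown_search_index")
s.add_all_files(config["MARKDOWN_FILES_DIR"], config, create_new_index=True)

parsed_query, results = s.search(["whoosh"], fields=["filename"])
print("indexed %d documents" % s.get_document_total_count())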
whoosh Search class for Github issues
Example Search class for creating an index of Github issues: https://git.charlesreid1.com/charlesreid1/issues-search/src/branch/master/issues_search.py
(The trick here is to re-use the Markdown class above, and turn each Github issue + comments thread into a Markdown file.)
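As a hypothetical illustration of that idea, an issue and its comment thread could be flattened into a single block of Markdown text along these lines. The issue_to_markdown() helper is not part of the linked code, which indexes the issue body and each comment as separate documents.

def issue_to_markdown(issue):
    """Sketch: flatten a PyGithub issue and its comment thread into one Markdown document."""
    lines = ["# " + issue.title, "", issue.body or ""]
    for comment in issue.get_comments():
        lines += ["", "## Comment from " + comment.user.login, "", comment.body or ""]
    return "\n".join(lines)

The full Search class from the linked repository follows.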
class Search:
    ix = None
    index_folder = None
    markdown = mistune.Markdown(renderer=DontEscapeHtmlInCodeRenderer(), escape=False)
    html_parser = html.parser.HTMLParser()
    schema = None

    def __init__(self, index_folder):
        self.open_index(index_folder)

    def open_index(self, index_folder, create_new=False):
        self.index_folder = index_folder
        if create_new:
            if os.path.exists(index_folder):
                shutil.rmtree(index_folder)
                print("deleted index folder: " + index_folder)

        if not os.path.exists(index_folder):
            os.mkdir(index_folder)

        exists = index.exists_in(index_folder)
        stemming_analyzer = StemmingAnalyzer()

        schema = Schema(
            url=ID(stored=True, unique=True),
            is_comment=BOOLEAN(stored=True),
            timestamp=STORED,
            repo_name=TEXT(stored=True),
            repo_url=ID(stored=True),
            issue_title=TEXT(stored=True, field_boost=100.0),
            issue_url=ID(stored=True),
            user=TEXT(stored=True),
            content=TEXT(stored=True, analyzer=stemming_analyzer)
        )

        if not exists:
            self.ix = index.create_in(index_folder, schema)
        else:
            self.ix = index.open_dir(index_folder)

    def add_issue(self, writer, issue, repo, config):
        """
        Add Github issue to search index.

        Replaces add_document

        This must:
        - deal with original issue content
        - iterate over each comment
        - deal with comment content

        Schema:
        - url
        - is_comment
        - timestamp
        - repo_name
        - repo_url
        - issue_title
        - issue_url
        - user
        - content
        """
        # should store urls of all issues and comments
        repo_name = repo.name
        repo_url = repo.html_url

        count = 0

        # Handle the issue content
        print("Indexing issue %s" % (issue.html_url))
        writer.add_document(
            url=issue.html_url,
            is_comment=False,
            timestamp=issue.created_at,
            repo_name=repo_name,
            repo_url=repo_url,
            issue_title=issue.title,
            issue_url=issue.html_url,
            user=issue.user.login,
            content=issue.body.rstrip()
        )
        count += 1

        # Handle the comments content
        if issue.comments > 0:
            comments = issue.get_comments()
            for comment in comments:
                print(" > Indexing comment %s" % (comment.html_url))
                writer.add_document(
                    url=comment.html_url,
                    is_comment=True,
                    timestamp=comment.created_at,
                    repo_name=repo_name,
                    repo_url=repo_url,
                    issue_title=issue.title,
                    issue_url=issue.html_url,
                    user=comment.user.login,
                    content=comment.body.strip()
                )
                count += 1

        return count

    def add_all_issues(self, gh_access_token, list_of_repos, which_org, config, create_new_index=False):
        """
        Add all issues in a given github repo to the search index.

        Replaces add_all_files

        Takes as inputs:
        - github access token
        - list of github repos
        - github org/user owning these repos
        - location of the whoosh config file for configuring the search engine
        """
        if create_new_index:
            self.open_index(self.index_folder, create_new=True)

        writer = self.ix.writer()

        # ------------
        # Iterate over each repo
        # Iterate over each thread (github issue)
        # Iterate over each comment

        g = Github(gh_access_token)
        org = g.get_organization(which_org)

        c = 0

        # Iterate over each repo
        for this_repo in list_of_repos:
            repo = org.get_repo(this_repo)
            reponame = repo.name

            # Iterate over each thread
            issues = repo.get_issues()
            for issue in issues:
                # Deal with original issue content
                # AND iterate over each comment
                c += self.add_issue(writer, issue, repo, config)

        # should store urls of all issues and comments
        writer.commit()
        print("Done, added %d documents to the index" % c)

    def update_index_incremental(self, gh_access_token, list_of_repos, which_org, config, create_new_index=False):
        """
        Update the index of issues of a given github repo.

        Takes as inputs:
        - github access token
        - list of github repos
        - github org/user owning these repos
        - location of the whoosh config file for configuring the search engine
        """
        if create_new_index:
            self.open_index(self.index_folder, create_new=True)

        # Using URL as the unique identifier
        # Start by getting a list of URLs that are indexed
        # Then walk all URLs
        g = Github(gh_access_token)
        org = g.get_organization(which_org)

        # Set of all URLs as existing on github
        to_index = set()

        writer = self.ix.writer()

        # fix this. the delete all in index
        # is not occurring in right place.

        # Iterate over each repo
        for this_repo in list_of_repos:
            repo = org.get_repo(this_repo)
            reponame = repo.name

            count = 0

            # Iterate over each thread
            issues = repo.get_issues()
            for issue in issues:
                # This approach is more work than is needed
                # but PoC||GTFO

                # For each issue/comment URL,
                # remove the corresponding item
                # and re-add it to the index
                to_index.add(issue.html_url)
                writer.delete_by_term('url', issue.html_url)

                comments = issue.get_comments()
                for comment in comments:
                    to_index.add(comment.html_url)
                    writer.delete_by_term('url', comment.html_url)

                # Now re-add this issue to the index
                count += self.add_issue(writer, issue, repo, config)

        writer.commit()
        print("Done, updated %d documents in the index" % count)

    def create_search_result(self, results):
        # Allow larger fragments
        results.fragmenter.maxchars = 300

        # Show more context before and after
        results.fragmenter.surround = 50

        search_results = []
        for r in results:
            # Note: this is where we package things up
            # for the Jinja template "search.html".
            # For example, the Jinja template
            # contains a {% for e in entries %}
            # and then an {{e.score}}
            sr = SearchResult()
            sr.score = r.score
            sr.url = r['url']
            sr.title = r['issue_title']
            sr.repo_name = r['repo_name']
            sr.repo_url = r['repo_url']
            sr.issue_title = r['issue_title']
            sr.issue_url = r['issue_url']
            sr.is_comment = r['is_comment']
            sr.content = r['content']

            highlights = r.highlights('content')
            if not highlights:
                # just use the first 1,000 words of the document
                highlights = self.cap(r['content'], 1000)

            highlights = self.html_parser.unescape(highlights)
            html = self.markdown(highlights)
            sr.content_highlight = html

            search_results.append(sr)

        return search_results

    def cap(self, s, l):
        return s if len(s) <= l else s[0:l - 3] + '...'

    def search(self, query_list, fields=None):
        with self.ix.searcher() as searcher:
            query_string = " ".join(query_list)
            query = None
            if "\"" in query_string or ":" in query_string:
                query = QueryParser("content", self.schema).parse(query_string)
            elif len(fields) == 1 and fields[0] == "filename":
                pass
            elif len(fields) == 2:
                pass
            else:
                fields = ["headlines", "content", "filename", "doubleemphasiswords", "emphasiswords"]

            if not query:
                query = MultifieldParser(fields, schema=self.ix.schema).parse(query_string)

            parsed_query = "%s" % query
            print("query: %s" % parsed_query)

            results = searcher.search(query, terms=False, scored=True, groupedby="url")
            search_result = self.create_search_result(results)

            return parsed_query, search_result

    def get_document_total_count(self):
        return self.ix.searcher().doc_count_all()
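As a rough usage sketch of the class above: the index folder, access token, organization, and repository names below are all placeholders.

# Hypothetical driver script for the Github-issues Search class above.
s = Search("issues_search_index")
s.add_all_issues(
    gh_access_token="<github-access-token>",
    list_of_repos=["<repo-one>", "<repo-two>"],
    which_org="<github-org>",
    config={},
    create_new_index=True,
)
print("indexed %d documents" % s.get_document_total_count())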
Flags
whoosh - a python search engine: notes on whoosh
Whoosh · Whoosh/How It Works · Whoosh/Search · Whoosh/Index · Whoosh/Storage