Whoosh/Search
Whoosh Search
About the Search class
The Search class (built on top of Whoosh) defines key behavior such as how to iterate over all documents (and how to access each document in the first place).
If a custom document type needs to be supported (e.g., a Google Drive spreadsheet), the Search class is where we define how to iterate over all the spreadsheets, process their contents, and add each document to the search index.
The Search class makes high-level calls to the Index class (for example, "add a new document to the search index, with the following values for each field").
Search class vs Index class
As mentioned above, the Whoosh/Index class provides a high-level interface to the underlying Whoosh index, and it is this interface that the Search class calls.
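To make this division of labor concrete, here is a minimal sketch (not taken from either repository below; the class name MinimalSearch and its arguments are placeholders) of a Search-style class that only knows how to walk documents and hand each one to a Whoosh index writer:

import os
from whoosh import index
from whoosh.fields import Schema, ID, TEXT

class MinimalSearch:
    def __init__(self, index_folder):
        # open an existing Whoosh index, or create a new one with a simple schema
        schema = Schema(path=ID(stored=True, unique=True),
                        content=TEXT(stored=True))
        if not os.path.exists(index_folder):
            os.mkdir(index_folder)
        if index.exists_in(index_folder):
            self.ix = index.open_dir(index_folder)
        else:
            self.ix = index.create_in(index_folder, schema)

    def add_all_files(self, file_dir):
        # iterate over all documents and add each one to the search index
        writer = self.ix.writer()
        for root, dirs, files in os.walk(file_dir):
            for ff in files:
                path = os.path.join(root, ff)
                with open(path, encoding="utf-8") as f:
                    writer.add_document(path=path, content=f.read())
        writer.commit()

Everything specific to a document type (which files to pick up, how to parse them, what fields to store) lives in methods like add_all_files, while the Whoosh index itself only ever sees add_document calls.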
Files
The Search class is defined in each of the example projects linked below: in search_md.py for the markdown file search example, and in issues_search.py for the Github issues search example.
Examples
whoosh Search class for a folder of Markdown files
Example Search class for creating an index of a folder full of markdown files: https://git.charlesreid1.com/charlesreid1/markdown-search/src/branch/master/search_md.py
class Search:
    # Note: this excerpt assumes the imports and helper classes
    # (DontEscapeHtmlInCodeRenderer, MarkdownParser, SearchResult)
    # defined elsewhere in the markdown-search repository.
    ix = None
    index_folder = None
    markdown = mistune.Markdown(renderer=DontEscapeHtmlInCodeRenderer(), escape=False)
    html_parser = html.parser.HTMLParser()
    schema = None

    def __init__(self, index_folder):
        self.open_index(index_folder)

    def open_index(self, index_folder, create_new=False):
        self.index_folder = index_folder
        if create_new:
            if os.path.exists(index_folder):
                shutil.rmtree(index_folder)
                print("deleted index folder: " + index_folder)
        if not os.path.exists(index_folder):
            os.mkdir(index_folder)

        exists = index.exists_in(index_folder)
        stemming_analyzer = StemmingAnalyzer()

        # store the schema on self so that search() can use it when parsing queries
        self.schema = Schema(
            path=ID(stored=True, unique=True)
            , filename=TEXT(stored=True, field_boost=100.0)
            , headlines=KEYWORD(stored=True, scorable=True, field_boost=60.0)
            , content=TEXT(stored=True, analyzer=stemming_analyzer)
            , time=STORED
        )

        if not exists:
            self.ix = index.create_in(index_folder, self.schema)
        else:
            self.ix = index.open_dir(index_folder)
    def add_document(self, writer, file_path, config):
        file_name = file_path.replace(".", " ").replace("/", " ").replace("\\", " ").replace("_", " ").replace("-", " ")

        # read file content
        with codecs.open(file_path, 'r', encoding='utf-8') as f:
            content = f.read()
        path = file_path

        # parse markdown fields
        parser = MarkdownParser()
        parser.parse(content, config)

        modtime = os.path.getmtime(path)
        print("adding to index: path: %s size:%d headlines:'%s' modtime=%d" % (
            path, len(content), parser.headlines, modtime))
        writer.add_document(
            path=path
            , filename=file_name
            , headlines=parser.headlines
            , content=content
            , time=modtime
        )
    def add_all_files(self, file_dir, config, create_new_index=False):
        if create_new_index:
            self.open_index(self.index_folder, create_new=True)

        count = 0
        writer = self.ix.writer()
        for root, dirs, files in os.walk(file_dir, followlinks=True):
            for ff in files:
                if ff.endswith(".md") or ff.endswith(".markdown"):
                    path = os.path.join(root, ff)
                    self.add_document(writer, path, config)
                    count += 1
        writer.commit()
        print("Done, added %d documents to the index" % count)
    def update_index_incremental(self, config, create_new_index=False):
        file_dir = config["MARKDOWN_FILES_DIR"]
        if create_new_index:
            self.open_index(self.index_folder, create_new=True)

        all_files = []
        for root, dirs, files in os.walk(file_dir, followlinks=True):
            if not root.endswith(".git"):
                for ff in files:
                    if ff.endswith(".md") or ff.endswith(".markdown"):
                        path = os.path.join(root, ff)
                        print('updating file %s' % (path))
                        all_files.append(path)

        # see: https://pythonhosted.org/Whoosh/indexing.html#incremental-indexing

        # The set of all paths in the index
        indexed_paths = set()
        # The set of all paths we need to re-index
        to_index = set()

        count = 0
        with self.ix.searcher() as searcher:
            writer = self.ix.writer()

            # Loop over the stored fields in the index
            for fields in searcher.all_stored_fields():
                indexed_path = fields['path']
                indexed_paths.add(indexed_path)

                if not os.path.exists(indexed_path):
                    # This file was deleted since it was indexed
                    writer.delete_by_term('path', indexed_path)
                    print("removed from index: %s" % indexed_path)
                else:
                    # Check if this file was changed since it was indexed
                    indexed_time = fields['time']
                    mtime = os.path.getmtime(indexed_path)
                    if mtime > indexed_time:
                        # The file has changed: delete it and add it to the list of
                        # files to reindex
                        writer.delete_by_term('path', indexed_path)
                        to_index.add(indexed_path)

            # Loop over the files in the filesystem
            for path in all_files:
                if path in to_index or path not in indexed_paths:
                    # This is either a file that's changed, or a new file
                    # that wasn't indexed before. So index it!
                    self.add_document(writer, path, config)
                    count += 1

            writer.commit()
            print("Done, updated %d documents in the index" % count)
    def create_search_result(self, results):
        # Allow larger fragments
        results.fragmenter.maxchars = 300

        # Show more context before and after
        results.fragmenter.surround = 50

        search_results = []
        for r in results:
            sr = SearchResult()
            sr.score = r.score
            sr.path = r["path"]
            sr.content = r["content"]

            highlights = r.highlights("content")
            if not highlights:
                # no highlight fragments: just use the start of the document
                highlights = self.cap(r["content"], 1000)

            # unescape HTML entities before rendering the highlight as markdown
            highlights = self.html_parser.unescape(highlights)
            html = self.markdown(highlights)
            sr.content_highlight = html

            if "headlines" in r:
                sr.headlines = r["headlines"]

            search_results.append(sr)

        return search_results

    def cap(self, s, l):
        return s if len(s) <= l else s[0:l - 3] + '...'
    def search(self, query_list, fields=None):
        with self.ix.searcher() as searcher:
            query_string = " ".join(query_list)
            query = None
            if "\"" in query_string or ":" in query_string:
                # quoted phrases and field:value queries go through the plain QueryParser
                query = QueryParser("content", self.schema).parse(query_string)
            elif fields is not None and len(fields) == 1 and fields[0] == "filename":
                pass
            elif fields is not None and len(fields) == 2:
                pass
            else:
                # default set of fields to search
                fields = ["headlines", "content", "filename", "doubleemphasiswords", "emphasiswords"]

            if not query:
                query = MultifieldParser(fields, schema=self.ix.schema).parse(query_string)

            parsed_query = "%s" % query
            print("query: %s" % parsed_query)
            results = searcher.search(query, terms=False, scored=True, groupedby="path")
            search_result = self.create_search_result(results)
            return parsed_query, search_result

    def get_document_total_count(self):
        return self.ix.searcher().doc_count_all()
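A usage sketch for the class above (the index folder name, markdown directory, and config contents are assumptions; the config dictionary is passed through to the repository's MarkdownParser, so its required keys depend on that class):

# hypothetical usage; folder names and config contents are placeholders
config = {"MARKDOWN_FILES_DIR": "/path/to/markdown/notes"}

s = Search("search_index")
s.add_all_files(config["MARKDOWN_FILES_DIR"], config, create_new_index=True)

parsed_query, results = s.search(["whoosh", "index"], fields=["content", "filename"])
for r in results:
    print(r.score, r.path)

# later: pick up new/changed/deleted markdown files without rebuilding from scratch
s.update_index_incremental(config)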
whoosh Search class for Github issues
Example Search class for creating an index of Github issues: https://git.charlesreid1.com/charlesreid1/issues-search/src/branch/master/issues_search.py
(The trick here is to re-use the Markdown class above, and turn each Github issue + comments thread into a Markdown file.)
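The class below actually indexes each issue body and each comment as its own document, but since issue bodies and comments are already Markdown text, a whole thread can also be flattened into one Markdown document, which is what makes the markdown machinery above reusable. A rough sketch of that flattening, using the same PyGithub issue/comment attributes that appear in the code below (the helper name issue_to_markdown is hypothetical):

def issue_to_markdown(issue):
    # flatten a Github issue plus its comment thread into a single Markdown string
    parts = ["# %s" % issue.title, issue.body or ""]
    for comment in issue.get_comments():
        parts.append("## Comment by %s" % comment.user.login)
        parts.append(comment.body or "")
    return "\n\n".join(parts)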
class Search:
    # Note: this excerpt assumes the imports and helper classes
    # (DontEscapeHtmlInCodeRenderer, SearchResult, PyGithub's Github class)
    # used elsewhere in the issues-search repository.
    ix = None
    index_folder = None
    markdown = mistune.Markdown(renderer=DontEscapeHtmlInCodeRenderer(), escape=False)
    html_parser = html.parser.HTMLParser()
    schema = None

    def __init__(self, index_folder):
        self.open_index(index_folder)

    def open_index(self, index_folder, create_new=False):
        self.index_folder = index_folder
        if create_new:
            if os.path.exists(index_folder):
                shutil.rmtree(index_folder)
                print("deleted index folder: " + index_folder)
        if not os.path.exists(index_folder):
            os.mkdir(index_folder)

        exists = index.exists_in(index_folder)
        stemming_analyzer = StemmingAnalyzer()

        # store the schema on self so that search() can use it when parsing queries
        self.schema = Schema(
            url=ID(stored=True, unique=True),
            is_comment=BOOLEAN(stored=True),
            timestamp=STORED,
            repo_name=TEXT(stored=True),
            repo_url=ID(stored=True),
            issue_title=TEXT(stored=True, field_boost=100.0),
            issue_url=ID(stored=True),
            user=TEXT(stored=True),
            content=TEXT(stored=True, analyzer=stemming_analyzer)
        )

        if not exists:
            self.ix = index.create_in(index_folder, self.schema)
        else:
            self.ix = index.open_dir(index_folder)
    def add_issue(self, writer, issue, repo, config):
        """
        Add Github issue to search index.
        Replaces add_document.

        This must:
        - deal with original issue content
        - iterate over each comment
        - deal with comment content

        Schema:
        - url
        - is_comment
        - timestamp
        - repo_name
        - repo_url
        - issue_title
        - issue_url
        - user
        - content
        """
        # should store urls of all issues and comments
        repo_name = repo.name
        repo_url = repo.html_url

        count = 0

        # Handle the issue content
        print("Indexing issue %s" % (issue.html_url))
        writer.add_document(
            url=issue.html_url,
            is_comment=False,
            timestamp=issue.created_at,
            repo_name=repo_name,
            repo_url=repo_url,
            issue_title=issue.title,
            issue_url=issue.html_url,
            user=issue.user.login,
            content=issue.body.rstrip()
        )
        count += 1

        # Handle the comments content
        if issue.comments > 0:
            comments = issue.get_comments()
            for comment in comments:
                print(" > Indexing comment %s" % (comment.html_url))
                writer.add_document(
                    url=comment.html_url,
                    is_comment=True,
                    timestamp=comment.created_at,
                    repo_name=repo_name,
                    repo_url=repo_url,
                    issue_title=issue.title,
                    issue_url=issue.html_url,
                    user=comment.user.login,
                    content=comment.body.strip()
                )
                count += 1

        return count
    def add_all_issues(self, gh_access_token, list_of_repos, which_org, config, create_new_index=False):
        """
        Add all issues in a given github repo to the search index.
        Replaces add_all_files.

        Takes as inputs:
        - github access token
        - list of github repos
        - github org/user owning these repos
        - location of the whoosh config file for configuring the search engine
        """
        if create_new_index:
            self.open_index(self.index_folder, create_new=True)

        writer = self.ix.writer()

        # ------------
        # Iterate over each repo
        #   Iterate over each thread (github issue)
        #     Iterate over each comment

        g = Github(gh_access_token)
        org = g.get_organization(which_org)

        c = 0

        # Iterate over each repo
        for this_repo in list_of_repos:
            repo = org.get_repo(this_repo)
            reponame = repo.name

            # Iterate over each thread
            issues = repo.get_issues()
            for issue in issues:
                # Deal with original issue content
                # AND iterate over each comment
                c += self.add_issue(writer, issue, repo, config)

        # should store urls of all issues and comments
        writer.commit()
        print("Done, added %d documents to the index" % c)
    def update_index_incremental(self, gh_access_token, list_of_repos, which_org, config, create_new_index=False):
        """
        Update the index of issues of a given github repo.

        Takes as inputs:
        - github access token
        - list of github repos
        - github org/user owning these repos
        - location of the whoosh config file for configuring the search engine
        """
        if create_new_index:
            self.open_index(self.index_folder, create_new=True)

        # Using URL as the unique identifier:
        # start by getting a list of URLs that are indexed,
        # then walk all URLs.
        g = Github(gh_access_token)
        org = g.get_organization(which_org)

        # Set of all URLs as existing on github
        to_index = set()

        writer = self.ix.writer()

        # fix this: the delete-all-in-index step
        # is not occurring in the right place.

        count = 0

        # Iterate over each repo
        for this_repo in list_of_repos:
            repo = org.get_repo(this_repo)
            reponame = repo.name

            # Iterate over each thread
            issues = repo.get_issues()
            for issue in issues:
                # This approach is more work than is needed,
                # but PoC||GTFO.
                # For each issue/comment URL,
                # remove the corresponding item
                # and re-add it to the index.
                to_index.add(issue.html_url)
                writer.delete_by_term('url', issue.html_url)

                comments = issue.get_comments()
                for comment in comments:
                    to_index.add(comment.html_url)
                    writer.delete_by_term('url', comment.html_url)

                # Now re-add this issue to the index
                count += self.add_issue(writer, issue, repo, config)

        writer.commit()
        print("Done, updated %d documents in the index" % count)
    def create_search_result(self, results):
        # Allow larger fragments
        results.fragmenter.maxchars = 300

        # Show more context before and after
        results.fragmenter.surround = 50

        search_results = []
        for r in results:
            # Note: this is where we package things up
            # for the Jinja template "search.html".
            # For example, the Jinja template
            # contains a {% for e in entries %}
            # and then an {{e.score}}
            sr = SearchResult()
            sr.score = r.score
            sr.url = r['url']
            sr.title = r['issue_title']
            sr.repo_name = r['repo_name']
            sr.repo_url = r['repo_url']
            sr.issue_title = r['issue_title']
            sr.issue_url = r['issue_url']
            sr.is_comment = r['is_comment']
            sr.content = r['content']

            highlights = r.highlights('content')
            if not highlights:
                # no highlight fragments: just use the first 1,000 characters of the document
                highlights = self.cap(r['content'], 1000)

            # unescape HTML entities before rendering the highlight as markdown
            highlights = self.html_parser.unescape(highlights)
            html = self.markdown(highlights)
            sr.content_highlight = html

            search_results.append(sr)

        return search_results

    def cap(self, s, l):
        return s if len(s) <= l else s[0:l - 3] + '...'
    def search(self, query_list, fields=None):
        with self.ix.searcher() as searcher:
            query_string = " ".join(query_list)
            query = None
            if "\"" in query_string or ":" in query_string:
                # quoted phrases and field:value queries go through the plain QueryParser
                query = QueryParser("content", self.schema).parse(query_string)
            elif fields is not None and len(fields) == 1 and fields[0] == "filename":
                pass
            elif fields is not None and len(fields) == 2:
                pass
            else:
                # default set of fields to search
                # (restricted to fields that actually exist in the issues schema)
                fields = ["issue_title", "content", "repo_name", "user"]

            if not query:
                query = MultifieldParser(fields, schema=self.ix.schema).parse(query_string)

            parsed_query = "%s" % query
            print("query: %s" % parsed_query)
            results = searcher.search(query, terms=False, scored=True, groupedby="url")
            search_result = self.create_search_result(results)
            return parsed_query, search_result

    def get_document_total_count(self):
        return self.ix.searcher().doc_count_all()
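Usage is similar to the markdown case, except the indexing calls need a Github access token, a list of repository names, and the owning organization (all values shown here are placeholders; config is passed through but not otherwise used by add_issue):

# hypothetical usage; token, org, and repo names are placeholders
s = Search("issues_index")

s.add_all_issues(gh_access_token="<github-token>",
                 list_of_repos=["example-repo"],
                 which_org="example-org",
                 config={},
                 create_new_index=True)

parsed_query, results = s.search(["traceback"], fields=["content", "issue_title"])
for r in results:
    print(r.score, r.url, r.issue_title)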
Flags
whoosh - a Python search engine · notes on whoosh
Related: Whoosh · Whoosh/How It Works · Whoosh/Search · Whoosh/Index · Whoosh/Storage