From charlesreid1

Whoosh Search

About the Search class

The Search class in Whoosh defines key behavior like how to iterate over all documents (and how to access documents in the first place).

If a custom document type needs to be defined (e.g., a Google Drive spreadsheet), the Search class is where we define how to iterate over all the spreadsheets, process the contents, and add the document to the search index.

The Search class makes high-level calls to the Index class (like "add a new document to the search index, with the following values for each field").

Search class vs Index class

As mentioned above, the Whoosh/Index class provides a high-level interface, which is called by the Search class.

Files

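The Search class is defined in the application's own search module: search_md.py for the Markdown example and issues_search.py for the GitHub issues example (both linked in the Examples section below).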

Examples

whoosh Search class for folder of Markdown files

Example Search class for creating an index of a folder full of markdown files: https://git.charlesreid1.com/charlesreid1/markdown-search/src/branch/master/search_md.py

class Search:
    """Build, update and query a Whoosh index over a folder of Markdown files.

    Every Markdown file becomes one document with the fields ``path``,
    ``filename``, ``headlines``, ``content`` and ``time`` (the file's
    modification time at the moment it was indexed).
    """

    # Open index handle; assigned by open_index().
    ix = None
    # Directory that holds the on-disk index; assigned by open_index().
    index_folder = None
    # Renderer used to turn highlighted Markdown fragments into HTML.
    markdown = mistune.Markdown(renderer=DontEscapeHtmlInCodeRenderer(), escape=False)
    # Kept for backward compatibility only: HTMLParser.unescape() was removed
    # in Python 3.9, so this class now calls html.unescape() instead.
    html_parser = html.parser.HTMLParser()
    # Schema of the open index; assigned by open_index().
    schema = None

    # Candidate fields for plain-text queries. Names that are not present in
    # the open index's schema are dropped when the query is built.
    _DEFAULT_FIELDS = ("headlines", "content", "filename", "doubleemphasiswords", "emphasiswords")

    # Translation table that turns path punctuation into spaces, so that the
    # path can be indexed as separate words in the ``filename`` field.
    _PATH_SEPARATORS = str.maketrans("./\\_-", "     ")

    def __init__(self, index_folder):
        """Open the index stored in *index_folder*, creating it if needed."""
        self.open_index(index_folder)

    @staticmethod
    def _build_schema():
        """Return the schema used when a new index is created."""
        return Schema(
            path=ID(stored=True, unique=True),
            filename=TEXT(stored=True, field_boost=100.0),
            headlines=KEYWORD(stored=True, scorable=True, field_boost=60.0),
            content=TEXT(stored=True, analyzer=StemmingAnalyzer()),
            time=STORED,
        )

    def open_index(self, index_folder, create_new=False):
        """Open (or create) the index in *index_folder*.

        Args:
            index_folder: directory that holds the index files.
            create_new: when true, any existing index folder is deleted
                first so that an empty index is created.
        """
        self.index_folder = index_folder
        if create_new and os.path.exists(index_folder):
            shutil.rmtree(index_folder)
            print("deleted index folder: " + index_folder)

        # makedirs instead of mkdir so that a nested index path also works.
        os.makedirs(index_folder, exist_ok=True)

        if index.exists_in(index_folder):
            self.ix = index.open_dir(index_folder)
        else:
            self.ix = index.create_in(index_folder, self._build_schema())

        # Remember the schema of the index that is actually open; search()
        # needs it so that query terms are analyzed like the indexed text.
        self.schema = self.ix.schema

    def add_document(self, writer, file_path, config):
        """Read one Markdown file and add it to the index through *writer*.

        Args:
            writer: an open index writer; the caller commits it.
            file_path: path of the Markdown file to index.
            config: passed through unchanged to ``MarkdownParser.parse``.
        """
        file_name = file_path.translate(self._PATH_SEPARATORS)

        # read file content
        with codecs.open(file_path, 'r', encoding='utf-8') as f:
            content = f.read()

        # parse markdown fields
        parser = MarkdownParser()
        parser.parse(content, config)

        modtime = os.path.getmtime(file_path)
        print("adding to index: path: %s size:%d headlines:'%s' modtime=%d" % (
            file_path, len(content), parser.headlines, modtime))
        writer.add_document(
            path=file_path,
            filename=file_name,
            headlines=parser.headlines,
            content=content,
            time=modtime,
        )

    def add_all_files(self, file_dir, config, create_new_index=False):
        """Index every ``.md`` / ``.markdown`` file found under *file_dir*.

        Args:
            file_dir: root directory to walk (symlinks are followed).
            config: passed through to add_document().
            create_new_index: when true, the index is wiped and recreated
                before the files are added.
        """
        if create_new_index:
            self.open_index(self.index_folder, create_new=True)

        count = 0
        # The writer commits when the block succeeds and cancels (releasing
        # the index write lock) when an exception escapes.
        with self.ix.writer() as writer:
            for root, dirs, files in os.walk(file_dir, followlinks=True):
                for ff in files:
                    if ff.endswith((".md", ".markdown")):
                        self.add_document(writer, os.path.join(root, ff), config)
                        count += 1
        print("Done, added %d documents to the index" % count)

    def update_index_incremental(self, config, create_new_index=False):
        """Bring the index in line with the files under ``MARKDOWN_FILES_DIR``.

        Indexed files that no longer exist are removed, files whose
        modification time is newer than the stored ``time`` are re-indexed,
        and files that were never indexed are added.

        Args:
            config: mapping that provides ``MARKDOWN_FILES_DIR``; it is also
                passed through to add_document().
            create_new_index: when true, the index is wiped and recreated
                before the update runs.
        """
        file_dir = config["MARKDOWN_FILES_DIR"]
        if create_new_index:
            self.open_index(self.index_folder, create_new=True)

        all_files = []
        for root, dirs, files in os.walk(file_dir, followlinks=True):
            if root.endswith(".git"):
                continue
            for ff in files:
                if ff.endswith((".md", ".markdown")):
                    path = os.path.join(root, ff)
                    print('updating file %s'%(path))
                    all_files.append(path)

        # see: https://pythonhosted.org/Whoosh/indexing.html#incremental-indexing
        # The set of all paths in the index
        indexed_paths = set()
        # The set of all paths we need to re-index
        to_index = set()

        count = 0
        # The writer commits on success and cancels on error, so a failure
        # part-way through does not leave the index locked.
        with self.ix.searcher() as searcher, self.ix.writer() as writer:

            # Loop over the stored fields in the index
            for fields in searcher.all_stored_fields():
                indexed_path = fields['path']
                indexed_paths.add(indexed_path)

                if not os.path.exists(indexed_path):
                    # This file was deleted since it was indexed
                    writer.delete_by_term('path', indexed_path)
                    print("removed from index: %s" % indexed_path)
                elif os.path.getmtime(indexed_path) > fields['time']:
                    # The file has changed since it was indexed: delete it
                    # and remember to re-index it below.
                    writer.delete_by_term('path', indexed_path)
                    to_index.add(indexed_path)

            # Loop over the files in the filesystem
            for path in all_files:
                if path in to_index or path not in indexed_paths:
                    # This is either a file that's changed, or a new file
                    # that wasn't indexed before. So index it!
                    self.add_document(writer, path, config)
                    count += 1

        print("Done, updated %d documents in the index" % count)

    def create_search_result(self, results):
        """Convert Whoosh *results* into a list of ``SearchResult`` objects.

        Must be called while the searcher that produced *results* is still
        open, because stored fields and highlights are read from each hit.
        """
        # Allow larger fragments
        results.fragmenter.maxchars = 300

        # Show more context before and after
        results.fragmenter.surround = 50

        search_results = []
        for hit in results:
            sr = SearchResult()
            sr.score = hit.score
            sr.path = hit["path"]
            sr.content = hit["content"]

            highlights = hit.highlights("content")
            if not highlights:
                # No highlighted fragment: fall back to the start of the
                # document, capped at 1000 characters.
                highlights = self.cap(hit["content"], 1000)

            # html.unescape() replaces HTMLParser.unescape(), which no longer
            # exists on Python 3.9+.
            highlights = html.unescape(highlights)
            sr.content_highlight = self.markdown(highlights)

            if "headlines" in hit:
                sr.headlines = hit["headlines"]
            search_results.append(sr)

        return search_results

    def cap(self, s, l):
        """Return *s* cut to at most *l* characters, ending in '...' if cut."""
        return s if len(s) <= l else s[0:l - 3] + '...'

    def _select_fields(self, fields):
        """Return the field names a plain-text query is matched against.

        A caller-supplied list is honoured only when it is exactly
        ``["filename"]`` or has two entries; in every other case (including
        ``None``) the default fields that exist in the index are used.
        """
        if fields and (len(fields) == 2 or (len(fields) == 1 and fields[0] == "filename")):
            return fields
        schema = self.ix.schema
        return [name for name in self._DEFAULT_FIELDS if name in schema]

    def search(self, query_list, fields=None):
        """Run a query and return ``(parsed_query, search_results)``.

        Args:
            query_list: list of query words; they are joined with spaces.
            fields: optional list of field names to search, see
                _select_fields(). Ignored when the query contains a double
                quote or a colon, in which case the query is parsed with
                ``content`` as the default field.

        Returns:
            A tuple of the parsed query rendered as a string and the list
            built by create_search_result().
        """
        query_string = " ".join(query_list)
        # Use the schema of the open index so that query terms go through
        # the same analyzers as the indexed text.
        schema = self.ix.schema
        search_fields = self._select_fields(fields)

        with self.ix.searcher() as searcher:
            query = None
            if "\"" in query_string or ":" in query_string:
                query = QueryParser("content", schema).parse(query_string)
            if not query:
                query = MultifieldParser(search_fields, schema=schema).parse(query_string)
            parsed_query = "%s" % query
            print("query: %s" % parsed_query)
            results = searcher.search(query, terms=False, scored=True, groupedby="path")
            search_result = self.create_search_result(results)

        return parsed_query, search_result

    def get_document_total_count(self):
        """Return the document count reported by the searcher's ``doc_count_all()``."""
        # Close the searcher when done instead of leaking it.
        with self.ix.searcher() as searcher:
            return searcher.doc_count_all()


whoosh Search class for Github issues

Example Search class for creating an index of Github issues: https://git.charlesreid1.com/charlesreid1/issues-search/src/branch/master/issues_search.py

(The trick here is to re-use the Markdown Search class above, and to turn each GitHub issue and its comment thread into a Markdown document.)

class Search:
    """Build, update and query a Whoosh index of GitHub issues and comments.

    Every issue and every issue comment becomes one document with the
    fields ``url``, ``is_comment``, ``timestamp``, ``repo_name``,
    ``repo_url``, ``issue_title``, ``issue_url``, ``user`` and ``content``.
    """

    # Open index handle; assigned by open_index().
    ix = None
    # Directory that holds the on-disk index; assigned by open_index().
    index_folder = None
    # Renderer used to turn highlighted Markdown fragments into HTML.
    markdown = mistune.Markdown(renderer=DontEscapeHtmlInCodeRenderer(), escape=False)
    # Kept for backward compatibility only: HTMLParser.unescape() was removed
    # in Python 3.9, so this class now calls html.unescape() instead.
    html_parser = html.parser.HTMLParser()
    # Schema of the open index; assigned by open_index().
    schema = None

    # Fields searched by plain-text queries. The previous defaults were field
    # names of the Markdown-file index that do not exist in this schema.
    _DEFAULT_FIELDS = ("issue_title", "content")

    def __init__(self, index_folder):
        """Open the index stored in *index_folder*, creating it if needed."""
        self.open_index(index_folder)

    @staticmethod
    def _build_schema():
        """Return the schema used when a new index is created."""
        return Schema(
            url=ID(stored=True, unique=True),
            is_comment=BOOLEAN(stored=True),
            timestamp=STORED,
            repo_name=TEXT(stored=True),
            repo_url=ID(stored=True),
            issue_title=TEXT(stored=True, field_boost=100.0),
            issue_url=ID(stored=True),
            user=TEXT(stored=True),
            content=TEXT(stored=True, analyzer=StemmingAnalyzer()),
        )

    def open_index(self, index_folder, create_new=False):
        """Open (or create) the index in *index_folder*.

        Args:
            index_folder: directory that holds the index files.
            create_new: when true, any existing index folder is deleted
                first so that an empty index is created.
        """
        self.index_folder = index_folder
        if create_new and os.path.exists(index_folder):
            shutil.rmtree(index_folder)
            print("deleted index folder: " + index_folder)

        # makedirs instead of mkdir so that a nested index path also works.
        os.makedirs(index_folder, exist_ok=True)

        if index.exists_in(index_folder):
            self.ix = index.open_dir(index_folder)
        else:
            self.ix = index.create_in(index_folder, self._build_schema())

        # Remember the schema of the index that is actually open; search()
        # needs it so that query terms are analyzed like the indexed text.
        self.schema = self.ix.schema

    def add_issue(self, writer, issue, repo, config):
        """
        Add a Github issue and all of its comments to the search index.

        Replaces add_document

        The issue itself is added as one document (``is_comment=False``);
        each comment is added as a further document (``is_comment=True``)
        that carries the title and URL of its parent issue.

        Args:
            writer: an open index writer; the caller commits it.
            issue: the issue object to index.
            repo: the repository object the issue belongs to.
            config: unused here; kept so the signature matches the callers.

        Returns:
            The number of documents added: 1 for the issue plus 1 per
            comment.
        """
        repo_name = repo.name
        repo_url = repo.html_url

        # Handle the issue content
        print("Indexing issue %s"%(issue.html_url))
        writer.add_document(
            url=issue.html_url,
            is_comment=False,
            timestamp=issue.created_at,
            repo_name=repo_name,
            repo_url=repo_url,
            issue_title=issue.title,
            issue_url=issue.html_url,
            user=issue.user.login,
            # An issue without a description has body None.
            content=(issue.body or "").rstrip(),
        )
        count = 1

        # Handle the comments content. The comment list is only requested
        # when the issue reports at least one comment.
        if issue.comments > 0:
            for comment in issue.get_comments():
                print(" > Indexing comment %s"%(comment.html_url))
                writer.add_document(
                    url=comment.html_url,
                    is_comment=True,
                    timestamp=comment.created_at,
                    repo_name=repo_name,
                    repo_url=repo_url,
                    issue_title=issue.title,
                    issue_url=issue.html_url,
                    user=comment.user.login,
                    content=(comment.body or "").strip(),
                )
                # Count every comment (this used to sit outside the loop, so
                # the method always returned 2).
                count += 1

        return count

    def add_all_issues(self, gh_access_token, list_of_repos, which_org, config, create_new_index=False):
        """
        Add all issues in the given github repos to the search index.

        Replaces add_all_files

        Takes as inputs:
        - github access token
        - list of github repos
        - github org/user owning these repos
        - config, passed through to add_issue()
        - create_new_index: when true, wipe and recreate the index first
        """
        if create_new_index:
            self.open_index(self.index_folder, create_new=True)

        g = Github(gh_access_token)
        org = g.get_organization(which_org)

        c = 0

        # The writer commits when the block succeeds and cancels (releasing
        # the index write lock) when an exception escapes.
        with self.ix.writer() as writer:
            # Iterate over each repo
            for this_repo in list_of_repos:
                repo = org.get_repo(this_repo)

                # Iterate over each thread (github issue); add_issue() deals
                # with the issue content AND with each of its comments.
                for issue in repo.get_issues():
                    c += self.add_issue(writer, issue, repo, config)

        print("Done, added %d documents to the index" % c)

    def update_index_incremental(self, gh_access_token, list_of_repos, which_org, config, create_new_index=False):
        """
        Update the index of issues of the given github repos.

        Every issue and comment returned by Github is deleted from the
        index (by URL) and added again.

        NOTE: documents whose issue or comment is no longer returned by
        Github are never visited, so they are not removed from the index.

        Takes as inputs:
        - github access token
        - list of github repos
        - github org/user owning these repos
        - config, passed through to add_issue()
        - create_new_index: when true, wipe and recreate the index first
        """
        if create_new_index:
            self.open_index(self.index_folder, create_new=True)

        g = Github(gh_access_token)
        org = g.get_organization(which_org)

        # Total across all repos (it used to be reset for every repo, and
        # was undefined when list_of_repos was empty).
        count = 0

        # The writer commits on success and cancels on error, so a failure
        # part-way through does not leave the index locked.
        with self.ix.writer() as writer:
            # Iterate over each repo
            for this_repo in list_of_repos:
                repo = org.get_repo(this_repo)

                # Iterate over each thread
                for issue in repo.get_issues():

                    # This approach is more work than is needed
                    # but PoC||GTFO

                    # Using URL as the unique identifier: remove the item
                    # for each issue/comment URL, then re-add the issue.
                    writer.delete_by_term('url', issue.html_url)
                    if issue.comments > 0:
                        for comment in issue.get_comments():
                            writer.delete_by_term('url', comment.html_url)

                    # Now re-add this issue to the index
                    count += self.add_issue(writer, issue, repo, config)

        print("Done, updated %d documents in the index" % count)

    def create_search_result(self, results):
        """Convert Whoosh *results* into a list of ``SearchResult`` objects.

        Must be called while the searcher that produced *results* is still
        open, because stored fields and highlights are read from each hit.
        """
        # Allow larger fragments
        results.fragmenter.maxchars = 300

        # Show more context before and after
        results.fragmenter.surround = 50

        search_results = []
        for hit in results:

            # Note: this is where we package things up
            # for the Jinja template "search.html".
            # For example, the Jinja template
            # contains a {% for e in entries %}
            # and then an {{e.score}}

            sr = SearchResult()
            sr.score = hit.score
            sr.url = hit['url']
            sr.title = hit['issue_title']

            sr.repo_name = hit['repo_name']
            sr.repo_url = hit['repo_url']

            sr.issue_title = hit['issue_title']
            sr.issue_url = hit['issue_url']

            sr.is_comment = hit['is_comment']

            sr.content = hit['content']
            highlights = hit.highlights('content')
            if not highlights:
                # No highlighted fragment: fall back to the start of the
                # document, capped at 1000 characters.
                highlights = self.cap(hit['content'], 1000)

            # html.unescape() replaces HTMLParser.unescape(), which no longer
            # exists on Python 3.9+.
            highlights = html.unescape(highlights)
            sr.content_highlight = self.markdown(highlights)
            search_results.append(sr)

        return search_results

    def cap(self, s, l):
        """Return *s* cut to at most *l* characters, ending in '...' if cut."""
        return s if len(s) <= l else s[0:l - 3] + '...'

    def _select_fields(self, fields):
        """Return the field names a plain-text query is matched against.

        A caller-supplied list is honoured only when it is exactly
        ``["filename"]`` or has two entries; in every other case (including
        ``None``) the default fields that exist in the index are used.
        """
        if fields and (len(fields) == 2 or (len(fields) == 1 and fields[0] == "filename")):
            return fields
        schema = self.ix.schema
        return [name for name in self._DEFAULT_FIELDS if name in schema]

    def search(self, query_list, fields=None):
        """Run a query and return ``(parsed_query, search_results)``.

        Args:
            query_list: list of query words; they are joined with spaces.
            fields: optional list of field names to search, see
                _select_fields(). Ignored when the query contains a double
                quote or a colon, in which case the query is parsed with
                ``content`` as the default field.

        Returns:
            A tuple of the parsed query rendered as a string and the list
            built by create_search_result().
        """
        query_string = " ".join(query_list)
        # Use the schema of the open index so that query terms go through
        # the same analyzers as the indexed text.
        schema = self.ix.schema
        search_fields = self._select_fields(fields)

        with self.ix.searcher() as searcher:
            query = None
            if "\"" in query_string or ":" in query_string:
                query = QueryParser("content", schema).parse(query_string)
            if not query:
                query = MultifieldParser(search_fields, schema=schema).parse(query_string)
            parsed_query = "%s" % query
            print("query: %s" % parsed_query)
            results = searcher.search(query, terms=False, scored=True, groupedby="url")
            search_result = self.create_search_result(results)

        return parsed_query, search_result

    def get_document_total_count(self):
        """Return the document count reported by the searcher's ``doc_count_all()``."""
        # Close the searcher when done instead of leaking it.
        with self.ix.searcher() as searcher:
            return searcher.doc_count_all()

Flags