File size: 7,111 Bytes
5958f7e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
import os
import re
import shutil
import subprocess
from io import StringIO
from tempfile import mkdtemp, mkstemp

import pandas as pd
import tree_sitter
from IPython.display import HTML
from metakernel import MetaKernel

from .codeql import QueryClient

# Version string of this module.
__version__ = "0.0.1"


class CodeQLKernel(MetaKernel):
    """Jupyter kernel that runs CodeQL (QL) cells through a ``QueryClient``.

    Every executed cell is appended to an accumulated context. A cell that
    contains exactly one ``query`` predicate (and no ``select``) triggers a
    quick evaluation of that predicate; a cell with exactly one ``select``
    statement triggers evaluation of the whole context; any other cell is
    only recorded in the context.
    """

    implementation = "CodeQL Kernel"
    implementation_version = "1.0"
    language = "ql"
    language_version = "0.1"
    banner = "CodeQL Kernel - Experimental"
    language_info = {
        "mimetype": "text/x-codeql",
        "name": "codeql",
        "file_extension": ".ql",
        "help_links": MetaKernel.help_links,
    }

    # Header of a query predicate: ``query <return type> <name>``. Group 1 is
    # the predicate name.
    _QUERY_PREDICATE_HEADER = re.compile(r"query\s+\S+\s+([A-Za-z_]\w*)")

    def __init__(self, **kwargs):
        """Load the QL grammar, build the tree-sitter queries and the client.

        Args:
            **kwargs: Forwarded unchanged to ``MetaKernel.__init__``.
        """
        # The compiled grammar is expected next to this module.
        here = os.path.dirname(os.path.abspath(__file__))
        self.QL_LANGUAGE = tree_sitter.Language(
            os.path.join(here, "tree-sitter-ql.so"), "ql"
        )
        self._select_query = self.QL_LANGUAGE.query(
            "(moduleMember (select)) @select_statement"
        )
        self._predicate_query = self.QL_LANGUAGE.query(
            """(moduleMember
                (annotation name: (annotName) @aname (#eq? @aname "query")).
                (classlessPredicate name: (predicateName) @pname)
               ) @annotated_query """
        )
        self._parser = tree_sitter.Parser()
        self._parser.set_language(self.QL_LANGUAGE)
        # All cell sources executed so far, each terminated by a newline.
        self._context = ""

        def on_progress(obj):
            self.Display(obj["message"], clear_output=True)

        def on_result(obj):
            self.Display(
                f"Query completed in {obj['evaluationTime']}!", clear_output=True
            )

        self._query_client: QueryClient = QueryClient(
            on_progress=on_progress, on_result=on_result
        )
        super().__init__(**kwargs)

    def get_usage(self):
        """Return the usage text for this kernel."""
        return "This is the CodeQL kernel."

    def parse_cell(self, cell):
        """Locate select statements and query predicates in a cell.

        Args:
            cell: Source text of the cell.

        Returns:
            A tuple ``(select_statements, query_predicates)``. Each element is
            a list of ``(start_point, end_point)`` pairs taken from the
            matching tree-sitter nodes.
        """
        source = bytes(cell, "utf8")
        root = self._parser.parse(source).root_node

        # Each capture is a (node, capture name) pair.
        select_statements = [
            (node.start_point, node.end_point)
            for node, capture_name in self._select_query.captures(root)
            if capture_name == "select_statement"
        ]

        # The node columns are compared against the encoded source, so the
        # "query" check below operates on bytes rather than on ``str`` lines.
        source_lines = source.split(b"\n")
        query_predicates = []
        for node, capture_name in self._predicate_query.captures(root):
            if capture_name != "annotated_query":
                continue
            row, col = node.start_point
            # Keep the member only when it literally starts with the `query`
            # annotation.
            if row < len(source_lines) and source_lines[row].startswith(
                b"query", col
            ):
                query_predicates.append((node.start_point, node.end_point))
        return (select_statements, query_predicates)

    def _write_query_pack(self, pack_dir, code):
        """Write a temporary qlpack plus the query file and return its path.

        Args:
            pack_dir: Existing directory that receives ``qlpack.yml`` and the
                query file.
            code: QL source to store in the query file.

        Returns:
            Path of the generated ``.ql`` file inside ``pack_dir``.
        """
        language = self._query_client._db_metadata["languages"][0]
        qlpack = "\n".join(
            [
                "---",
                "library: false",
                "name: jupyter-kernel/temporary-qlpack",
                "version: 0.0.1",
                "dependencies:",
                f"  codeql/{language}-all: '*'",
                "",
            ]
        )
        with open(
            os.path.join(pack_dir, "qlpack.yml"), "w", encoding="utf-8"
        ) as pack_file:
            pack_file.write(qlpack)

        # Best effort, as before: a failed install does not abort the run; the
        # outcome of the query itself is reported by run_query.
        try:
            completed = subprocess.run(["codeql", "pack", "install"], cwd=pack_dir)
        except OSError as exc:
            self.log.warning("Could not launch 'codeql pack install': %s", exc)
        else:
            if completed.returncode != 0:
                self.log.warning(
                    "'codeql pack install' exited with status %d",
                    completed.returncode,
                )

        fd, query_path = mkstemp(suffix=".ql", dir=pack_dir, text=True)
        # fdopen takes ownership of the descriptor, so it is closed even when
        # the write fails, and the whole source is written.
        with os.fdopen(fd, "w", encoding="utf-8", newline="") as query_file:
            query_file.write(code)
        return query_path

    def evaluate(self, code, quick_eval=None):
        """Run ``code`` as a query and display the result as an HTML table.

        Args:
            code: Complete QL source to run.
            quick_eval: Optional position dict (``startLine``, ``endLine``,
                ``startColumn``, ``endColumn``) passed on to
                ``QueryClient.run_query``.

        Errors are reported through ``Error_display``; nothing is returned.
        """
        pack_dir = None
        try:
            if not self._query_client._db_metadata:
                self.Error_display(
                    "No database registered! Use %set_database to register a database."
                )
                return

            # Temporary directory holding the query pack and the query.
            pack_dir = mkdtemp(prefix="codeql_kernel")
            query_path = self._write_query_pack(pack_dir, code)
            self.Display("Running query ...", clear_output=True)
            err, resp = self._query_client.run_query(
                query_path, quick_eval=quick_eval
            )
            if err:
                self.clear_output(wait=True)
                self.Error_display("Error running query: {}".format(err))
            else:
                # resp is parsed as CSV text, in chunks of 5000 rows.
                df = pd.concat(pd.read_csv(StringIO(resp), chunksize=5000))
                self.Display(HTML(df.to_html()), clear_output=True)
        except Exception as e:
            self.Error_display("Error running query: {}".format(e))
        finally:
            # The result is already in memory (or the run failed), so the
            # per-execution pack directory is no longer needed.
            if pack_dir is not None:
                shutil.rmtree(pack_dir, ignore_errors=True)

    def _locate_predicate_name(self, line, col):
        """Find the predicate name in a ``query <type> <name>`` header.

        Args:
            line: Source line containing the header.
            col: Offset in ``line`` at which ``query`` starts.

        Returns:
            ``(name, column)`` where ``column`` is the 1-based column of the
            first character of the name, or ``None`` when the header is not
            fully present on this line.
        """
        match = self._QUERY_PREDICATE_HEADER.match(line, col)
        if match is None:
            return None
        return match.group(1), match.start(1) + 1

    def do_execute_direct(self, code):
        """Record the cell in the context and evaluate it when applicable.

        Args:
            code: Source text of the cell.
        """
        select_statements, query_predicates = self.parse_cell(code)
        # 1-based line number the first line of this cell has in the context.
        first_line = len(self._context.split("\n"))
        self._context += code + "\n"

        if len(query_predicates) == 1 and not select_statements:
            # Exactly one query predicate: quick-evaluate it at its name.
            pred_line, pred_col = query_predicates[0][0]
            located = self._locate_predicate_name(
                code.split("\n")[pred_line], pred_col
            )
            if located is None:
                self.Error_display(
                    "Could not locate the name of the query predicate to evaluate."
                )
                return
            name, column = located
            position = {
                "startLine": first_line + pred_line,
                "endLine": first_line + pred_line,
                "startColumn": column,
                "endColumn": column,
            }
            self.Display("Evaluating predicate '" + name + "'", clear_output=True)
            self.evaluate(self._context, quick_eval=position)
        elif len(select_statements) == 1:
            # Exactly one select statement: evaluate the whole context.
            self.Display("Evaluating select statement ...", clear_output=True)
            self.evaluate(self._context)

    def repr(self, data):
        """Return the ``repr`` of ``data``."""
        return repr(data)

    def do_shutdown(self, restart):
        """Stop the query client and delegate the shutdown to ``MetaKernel``.

        Args:
            restart: Whether the kernel is being restarted.

        Returns:
            Whatever ``MetaKernel.do_shutdown`` returns.
        """
        if self._query_client:
            self._query_client.stop()
        # MetaKernel.do_shutdown already runs the restart sequence
        # (reload_magics / restart_kernel); repeating it here ran it twice.
        return super().do_shutdown(restart)


# Script entry point: start the kernel through the run_as_main classmethod,
# which CodeQLKernel inherits (it is not defined in this file).
if __name__ == "__main__":
    CodeQLKernel.run_as_main()