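# NOTE: the following page text was captured together with this file and is
# not part of the program: "Spaces: Runtime error / Runtime error"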
import os
import shutil
import subprocess
from io import StringIO
from tempfile import mkdtemp, mkstemp

import pandas as pd
import tree_sitter
from IPython.display import HTML
from metakernel import MetaKernel

from .codeql import QueryClient
# Version of this kernel package.
__version__ = "0.0.1"
class CodeQLKernel(MetaKernel):
    """Jupyter kernel, built on MetaKernel, that evaluates CodeQL cells.

    Every executed cell is appended to an accumulated query context:

    * a cell with exactly one ``query``-annotated predicate and no ``select``
      statement evaluates only that predicate (a quick-eval position is
      passed to the query client);
    * a cell with exactly one ``select`` statement evaluates the whole
      accumulated context;
    * any other cell is only appended to the context.
    """

    implementation = "CodeQL Kernel"
    implementation_version = "1.0"
    language = "ql"
    language_version = "0.1"
    banner = "CodeQL Kernel - Experimental"
    language_info = {
        "mimetype": "text/x-codeql",
        "name": "codeql",
        "file_extension": ".ql",
        "help_links": MetaKernel.help_links,
    }

    def __init__(self, **kwargs):
        """Load the QL grammar, build the parser/queries and the query client.

        The compiled grammar ``tree-sitter-ql.so`` is loaded from the
        directory that contains this module. ``kwargs`` are forwarded to
        ``MetaKernel.__init__`` unchanged.
        """
        # The grammar library is looked up next to this file.
        here = os.path.dirname(os.path.abspath(__file__))
        self.QL_LANGUAGE = tree_sitter.Language(
            os.path.join(here, "tree-sitter-ql.so"), "ql"
        )
        # Matches top-level select statements.
        self._select_query = self.QL_LANGUAGE.query(
            "(moduleMember (select)) @select_statement"
        )
        # Matches an annotation named "query" immediately followed by a
        # classless predicate.
        self._predicate_query = self.QL_LANGUAGE.query(
            """(moduleMember
            (annotation name: (annotName) @aname (#eq? @aname "query")).
            (classlessPredicate name: (predicateName) @pname)
            ) @annotated_query """
        )
        self._parser = tree_sitter.Parser()
        self._parser.set_language(self.QL_LANGUAGE)
        # QL source accumulated from all cells executed so far.
        self._context = ""

        def on_progress(obj):
            # Replace the cell output with the latest progress message.
            self.Display(obj["message"], clear_output=True)

        def on_result(obj):
            self.Display(
                f"Query completed in {obj['evaluationTime']}!", clear_output=True
            )

        self._query_client: QueryClient = QueryClient(
            on_progress=on_progress, on_result=on_result
        )
        super().__init__(**kwargs)

    def get_usage(self):
        """Return the usage text for this kernel."""
        return "This is the CodeQL kernel."

    def parse_cell(self, cell):
        """Locate select statements and query predicates in a cell.

        Args:
            cell: The cell source as a ``str``.

        Returns:
            A tuple ``(select_statements, query_predicates)``. Each is a list
            of ``(start_point, end_point)`` pairs as reported by tree-sitter
            for the matching node.
        """
        source = cell.encode("utf8")
        tree = self._parser.parse(source)
        # Work on the encoded lines so the tree-sitter columns (assumed to be
        # UTF-8 byte counts) index the text correctly even when non-ASCII
        # characters precede them.
        source_lines = source.split(b"\n")
        marker = b"query"

        select_statements = [
            (node.start_point, node.end_point)
            for node, capture_name in self._select_query.captures(tree.root_node)
            if capture_name == "select_statement"
        ]

        query_predicates = []
        for node, capture_name in self._predicate_query.captures(tree.root_node):
            if capture_name != "annotated_query":
                continue
            start_point = node.start_point
            end_point = node.end_point
            row = start_point[0]
            col = start_point[1]
            # Re-check in Python that the matched member really starts with
            # the "query" annotation before accepting it.
            if row < len(source_lines) and (
                source_lines[row][col:col + len(marker)] == marker
            ):
                query_predicates.append((start_point, end_point))
        return (select_statements, query_predicates)

    def evaluate(self, code, quick_eval=None):
        """Run ``code`` as a CodeQL query and display the result.

        A temporary query pack is created (``qlpack.yml`` plus a ``.ql`` file
        holding ``code``), ``codeql pack install`` is run in it, and the query
        is handed to the query client. A CSV response is rendered as an HTML
        table. Any failure is reported with ``Error_display``; nothing is
        raised to the caller.

        Args:
            code: Full QL source to evaluate.
            quick_eval: Optional position dict forwarded to
                ``QueryClient.run_query``.
        """
        tmp_dir = None
        try:
            db_metadata = self._query_client._db_metadata
            if not db_metadata:
                self.Error_display("No database registered! Use %set_database to register a database.")
                return
            # The pack depends on the standard library pack of the database's
            # first language.
            qlpack = "\n".join(
                [
                    "---",
                    "library: false",
                    "name: jupyter-kernel/temporary-qlpack",
                    "version: 0.0.1",
                    "dependencies:",
                    "  codeql/{}-all: '*'",
                    "",
                ]
            ).format(db_metadata["languages"][0])
            # Use the platform's default temp location instead of a
            # hard-coded "/tmp".
            tmp_dir = mkdtemp(prefix="codeql_kernel")
            with open(os.path.join(tmp_dir, "qlpack.yml"), "w") as f:
                f.write(qlpack)
            # Best effort: a failed install does not abort the run, but its
            # exit status is remembered so a later query error can mention it.
            install = subprocess.run(["codeql", "pack", "install"], cwd=tmp_dir)
            install_note = ""
            if install.returncode != 0:
                install_note = (
                    " (note: `codeql pack install` exited with status {})".format(
                        install.returncode
                    )
                )
            fd, query_path = mkstemp(suffix=".ql", dir=tmp_dir, text=True)
            # The context manager closes the descriptor even if the write
            # fails.
            with os.fdopen(fd, "wb") as query_file:
                query_file.write(code.encode("utf-8"))
            self.Display("Running query ...", clear_output=True)
            (err, resp) = self._query_client.run_query(
                query_path, quick_eval=quick_eval
            )
            if err:
                self.clear_output(wait=True)
                self.Error_display(
                    "Error running query: {}".format(err) + install_note
                )
            elif not resp or not resp.strip():
                # Nothing to parse; avoid a confusing pandas parse error.
                self.Display("Query returned no results.", clear_output=True)
            else:
                # Parse the CSV in chunks and stitch them back together.
                frames = pd.read_csv(StringIO(resp), chunksize=5000)
                df = pd.concat(frames)
                self.Display(HTML(df.to_html()), clear_output=True)
        except Exception as e:
            self.Error_display("Error running query: {}".format(e))
        finally:
            # Remove the temporary query pack; cleanup problems are ignored.
            if tmp_dir is not None:
                shutil.rmtree(tmp_dir, ignore_errors=True)

    @staticmethod
    def _locate_predicate_name(line, byte_col):
        """Find the predicate name on a ``query``-annotated header line.

        Args:
            line: The source line (``str``) on which the annotation starts.
            byte_col: Column of the annotation as reported by tree-sitter.

        Returns:
            ``(name, index)`` where ``index`` is the zero-based character
            index of the name in ``line``, or ``None`` when the line does not
            hold at least three whitespace-separated tokens.
        """
        # Convert the byte-based column to a character index.
        cursor = len(line.encode("utf8")[:byte_col].decode("utf8", "ignore"))
        tokens = line[cursor:].split(None, 2)
        if len(tokens) < 3:
            return None
        # Step over the first two tokens (the annotation and the token after
        # it), whatever whitespace separates them.
        for token in tokens[:2]:
            cursor = line.index(token, cursor) + len(token)
        rest = line[cursor:]
        cursor += len(rest) - len(rest.lstrip())
        name_parts = tokens[2].split("(", 1)[0].split()
        if not name_parts:
            return None
        return name_parts[0], cursor

    def do_execute_direct(self, code):
        """Add a cell to the context and evaluate it when appropriate.

        Args:
            code: The cell source.
        """
        (select_statements, query_predicates) = self.parse_cell(code)
        if len(query_predicates) == 1 and not select_statements:
            # Exactly one query predicate: add the cell to the context and
            # evaluate only that predicate.
            # The context is either empty or ends with a newline, so this is
            # the 1-based line number at which the new cell starts.
            offset = len(self._context.split("\n"))
            self._context += code + "\n"
            predicate_start = query_predicates[0][0]
            pred_line = predicate_start[0]
            pred_col = predicate_start[1]
            cell_lines = code.split("\n")
            located = self._locate_predicate_name(cell_lines[pred_line], pred_col)
            if located is None:
                self.Error_display(
                    "Could not locate the name of the query predicate; "
                    "keep the annotation, type and name on one line."
                )
                return
            name, name_index = located
            # Start and end coincide: a 1-based position at the first
            # character of the predicate name.
            position = {
                "startLine": offset + pred_line,
                "endLine": offset + pred_line,
                "startColumn": name_index + 1,
                "endColumn": name_index + 1,
            }
            self.Display("Evaluating predicate '" + name + "'", clear_output=True)
            self.evaluate(self._context, quick_eval=position)
        elif len(select_statements) == 1:
            # Exactly one select statement: add the cell to the context and
            # evaluate the whole context.
            # NOTE(review): the select clause stays in the accumulated
            # context, so a later select cell is evaluated together with it;
            # confirm this is intended.
            self._context += code + "\n"
            self.Display("Evaluating select statement ...", clear_output=True)
            self.evaluate(self._context)
        else:
            # Definitions only (or an ambiguous cell): just remember it.
            self._context += code + "\n"

    def repr(self, data):
        """Return the built-in ``repr`` of ``data``."""
        return repr(data)

    def do_shutdown(self, restart):
        """Stop the query client, then defer to the parent shutdown.

        Args:
            restart: Whether the kernel is being restarted.

        Returns:
            Whatever the parent class's ``do_shutdown`` returns.
        """
        # Tolerate a kernel whose __init__ failed before the client existed.
        client = getattr(self, "_query_client", None)
        if client:
            client.stop()
        if restart:
            # NOTE(review): the parent do_shutdown may already perform these
            # restart steps; confirm whether this block is redundant.
            self.Print("Restarting kernel...")
            self.reload_magics()
            self.restart_kernel()
            self.Print("Done!")
        return super().do_shutdown(restart)
if __name__ == "__main__":
    # Start the kernel when this module is executed as a script.
    CodeQLKernel.run_as_main()