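# NOTE(review): the following appears to be residue from a web file viewer and
# is not part of the program: "khulnasoft's picture", "Upload 12 files",
# "5958f7e verified", "raw", "history blame", "7.11 kB".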
import os
import re
import shutil
import subprocess
from io import StringIO
from tempfile import mkdtemp, mkstemp

import pandas as pd
import tree_sitter
from IPython.display import HTML
from metakernel import MetaKernel

from .codeql import QueryClient
# Version string of this module.
__version__ = "0.0.1"
class CodeQLKernel(MetaKernel):
    """Jupyter kernel that evaluates CodeQL (QL) cells through a ``QueryClient``.

    Every executed cell is appended to a growing query "context".  A cell that
    contains exactly one ``query``-annotated predicate (and no ``select``)
    triggers a quick evaluation of that predicate; a cell that contains exactly
    one ``select`` statement triggers evaluation of the whole context; any
    other cell is only recorded.
    """

    implementation = "CodeQL Kernel"
    implementation_version = "1.0"
    language = "ql"
    language_version = "0.1"
    banner = "CodeQL Kernel - Experimental"
    language_info = {
        "mimetype": "text/x-codeql",
        "name": "codeql",
        "file_extension": ".ql",
        "help_links": MetaKernel.help_links,
    }

    # Header of a query predicate: ``query <return-type> <name>``; tokens may
    # be separated by any whitespace, including newlines.
    _QUERY_PREDICATE_HEADER = re.compile(
        r"query\s+\S+\s+(?P<name>[A-Za-z_]\w*)"
    )

    def __init__(self, **kwargs):
        """Load the QL grammar, compile the queries and create the client.

        Args:
            **kwargs: forwarded unchanged to ``MetaKernel.__init__``.
        """
        # The compiled tree-sitter grammar is expected next to this file.
        here = os.path.dirname(os.path.abspath(__file__))
        self.QL_LANGUAGE = tree_sitter.Language(
            os.path.join(here, "tree-sitter-ql.so"), "ql"
        )
        self._select_query = self.QL_LANGUAGE.query(
            "(moduleMember (select)) @select_statement"
        )
        self._predicate_query = self.QL_LANGUAGE.query(
            "(moduleMember\n"
            '(annotation name: (annotName) @aname (#eq? @aname "query")).\n'
            "(classlessPredicate name: (predicateName) @pname)\n"
            ") @annotated_query "
        )
        self._parser = tree_sitter.Parser()
        self._parser.set_language(self.QL_LANGUAGE)
        # Accumulated source of every cell executed so far.
        self._context = ""

        def on_progress(obj):
            self.Display(obj["message"], clear_output=True)

        def on_result(obj):
            self.Display(
                f"Query completed in {obj['evaluationTime']}!", clear_output=True
            )

        self._query_client: QueryClient = QueryClient(
            on_progress=on_progress, on_result=on_result
        )
        MetaKernel.__init__(self, **kwargs)

    def get_usage(self):
        """Return the short usage text for this kernel."""
        return "This is the CodeQL kernel."

    def parse_cell(self, cell):
        """Parse a cell with tree-sitter and locate its evaluable members.

        Args:
            cell: source text of the notebook cell.

        Returns:
            A tuple ``(select_statements, query_predicates)``.  Each element is
            a list of ``(start_point, end_point)`` pairs as reported by
            tree-sitter for the matching ``moduleMember`` nodes.
        """
        source = bytes(cell, "utf8")
        tree = self._parser.parse(source)

        select_statements = [
            (node.start_point, node.end_point)
            for node, name in self._select_query.captures(tree.root_node)
            if name == "select_statement"
        ]

        # tree-sitter columns are byte offsets, so the annotation text is
        # checked against the encoded source rather than the ``str`` lines.
        source_lines = source.split(b"\n")
        query_predicates = []
        for node, name in self._predicate_query.captures(tree.root_node):
            if name != "annotated_query":
                continue
            row, col = node.start_point
            # Only keep members whose leading annotation is literally `query`.
            if row < len(source_lines) and source_lines[row].startswith(
                b"query", col
            ):
                query_predicates.append((node.start_point, node.end_point))
        return (select_statements, query_predicates)

    @classmethod
    def _locate_predicate_name(cls, code, start_point):
        """Find the name of the query predicate starting at ``start_point``.

        Args:
            code: source text of the cell.
            start_point: ``(row, byte_column)`` of the ``query`` annotation.

        Returns:
            ``(name, row, column)`` with a 0-based row and 0-based character
            column of the first character of the predicate name, or ``None``
            when no ``query <type> <name>`` header starts at that position.
        """
        row, byte_col = start_point
        lines = code.split("\n")
        if row >= len(lines):
            return None
        # Convert the byte column reported by tree-sitter to a character index.
        encoded_prefix = lines[row].encode("utf8")[:byte_col]
        col = len(encoded_prefix.decode("utf8", errors="ignore"))
        tail = "\n".join(lines[row:])[col:]
        match = cls._QUERY_PREDICATE_HEADER.match(tail)
        if match is None:
            return None
        prefix = tail[: match.start("name")]
        line_delta = prefix.count("\n")
        if line_delta:
            # The name sits on a later line: column is relative to that line.
            name_col = len(prefix) - prefix.rfind("\n") - 1
        else:
            name_col = col + len(prefix)
        return match.group("name"), row + line_delta, name_col

    def evaluate(self, code, quick_eval=None):
        """Run ``code`` as a CodeQL query and display the result table.

        A throw-away query pack is created in a temporary directory, the query
        is written into it and executed through the query client.  Errors are
        reported with ``Error_display`` instead of being raised.

        Args:
            code: complete QL source to evaluate.
            quick_eval: optional position dict (``startLine``, ``endLine``,
                ``startColumn``, ``endColumn``) passed to the query client.
        """
        tmp_dir = None
        try:
            db_metadata = self._query_client._db_metadata
            if not db_metadata:
                self.Error_display("No database registered! Use %set_database to register a database.")
                return

            qlpack = "\n".join(
                [
                    "---",
                    "library: false",
                    "name: jupyter-kernel/temporary-qlpack",
                    "version: 0.0.1",
                    "dependencies:",
                    " codeql/{}-all: '*'",
                    "",
                ]
            ).format(db_metadata["languages"][0])

            # Use the platform temp directory instead of a hard-coded /tmp.
            tmp_dir = mkdtemp(prefix="codeql_kernel")
            with open(
                os.path.join(tmp_dir, "qlpack.yml"), "w", encoding="utf-8"
            ) as pack_file:
                pack_file.write(qlpack)

            # Best effort, as before: the exit status is not checked.
            subprocess.run(["codeql", "pack", "install"], cwd=tmp_dir)

            fd, query_path = mkstemp(suffix=".ql", dir=tmp_dir, text=True)
            # fdopen takes ownership of fd and closes it even if write fails.
            with os.fdopen(fd, "w", encoding="utf-8") as query_file:
                query_file.write(code)

            self.Display("Running query ...", clear_output=True)
            (err, resp) = self._query_client.run_query(
                query_path, quick_eval=quick_eval
            )
            if err:
                self.clear_output(wait=True)
                self.Error_display(
                    "Error running query: {}".format(err)
                )
            else:
                df = pd.read_csv(StringIO(resp))
                self.Display(HTML(df.to_html()), clear_output=True)
        except Exception as e:
            # Kernel boundary: surface any failure to the notebook user.
            self.Error_display("Error running query: {}".format(e))
        finally:
            # NOTE(review): assumes run_query has finished with the files by
            # the time it returns (it hands back the CSV text) - confirm.
            if tmp_dir is not None:
                shutil.rmtree(tmp_dir, ignore_errors=True)

    def do_execute_direct(self, code):
        """Record the cell in the context and evaluate it when applicable.

        Args:
            code: source text of the executed cell.
        """
        (select_statements, query_predicates) = self.parse_cell(code)

        # 1-based line number at which this cell starts inside the context.
        offset = len(self._context.split("\n"))
        self._context += code + "\n"

        if len(query_predicates) == 1 and not select_statements:
            # Exactly one query predicate: quick-evaluate it at its name.
            located = self._locate_predicate_name(code, query_predicates[0][0])
            if located is None:
                self.Error_display(
                    "Could not locate the name of the query predicate to evaluate."
                )
                return
            name, line, column = located
            position = {
                "startLine": offset + line,
                "endLine": offset + line,
                # Columns are sent 1-based, hence the + 1.
                "startColumn": column + 1,
                "endColumn": column + 1,
            }
            self.Display("Evaluating predicate '" + name + "'", clear_output=True)
            self.evaluate(self._context, quick_eval=position)
        elif len(select_statements) == 1:
            # Exactly one select statement: evaluate the whole context.
            self.Display("Evaluating select statement ...", clear_output=True)
            self.evaluate(self._context)

    def repr(self, data):
        """Return the ``repr`` of ``data``."""
        return repr(data)

    def do_shutdown(self, restart):
        """Stop the query client and shut down (or restart) the kernel.

        Args:
            restart: whether the kernel is being restarted.

        Returns:
            Whatever the parent ``do_shutdown`` returns, so the shutdown reply
            is passed back to the caller instead of being discarded.
        """
        # The client may be missing if __init__ failed part-way through.
        client = getattr(self, "_query_client", None)
        if client:
            client.stop()
        if restart:
            self.Print("Restarting kernel...")
            self.reload_magics()
            self.restart_kernel()
            self.Print("Done!")
        return super().do_shutdown(restart)
# Start the kernel when this module is executed as a script.
if __name__ == "__main__":
    CodeQLKernel.run_as_main()