import os from threading import Thread from typing import Iterator import gradio as gr import spaces import torch from transformers import AutoModelForCausalLM, AutoTokenizer, TextIteratorStreamer MAX_MAX_NEW_TOKENS = 1024 DEFAULT_MAX_NEW_TOKENS = 512 MAX_INPUT_TOKEN_LENGTH = int(os.getenv("MAX_INPUT_TOKEN_LENGTH", "4096")) DESCRIPTION = """\ # Chat with Patched Mixture of Experts (MoE) Model """ LICENSE = """\ --- This space is powered by the patched-mix-4x7B model which was created by [patched](https://patched.codes). """ if not torch.cuda.is_available(): DESCRIPTION += "\n

Running on CPU 🥶 This demo does not work on CPU.

" if torch.cuda.is_available(): model_id = "patched-codes/patched-mix-4x7B" model = AutoModelForCausalLM.from_pretrained(model_id, device_map="auto", load_in_4bit=True) tokenizer = AutoTokenizer.from_pretrained(model_id) tokenizer.use_default_system_prompt = False @spaces.GPU(duration=120) def generate( message: str, chat_history: list[tuple[str, str]], system_prompt: str, max_new_tokens: int = 1024, temperature: float = 0.2, top_p: float = 0.95, # top_k: int = 50, # repetition_penalty: float = 1.2, ) -> Iterator[str]: conversation = [] if system_prompt: conversation.append({"role": "system", "content": system_prompt}) for user, assistant in chat_history: conversation.extend([{"role": "user", "content": user}, {"role": "assistant", "content": assistant}]) conversation.append({"role": "user", "content": message}) input_ids = tokenizer.apply_chat_template(conversation, return_tensors="pt") if input_ids.shape[1] > MAX_INPUT_TOKEN_LENGTH: input_ids = input_ids[:, -MAX_INPUT_TOKEN_LENGTH:] gr.Warning(f"Trimmed input from conversation as it was longer than {MAX_INPUT_TOKEN_LENGTH} tokens.") input_ids = input_ids.to(model.device) streamer = TextIteratorStreamer(tokenizer, timeout=10.0, skip_prompt=True, skip_special_tokens=True) generate_kwargs = dict( {"input_ids": input_ids}, streamer=streamer, max_new_tokens=max_new_tokens, do_sample=True, top_p=top_p, #top_k=top_k, temperature=temperature, #num_beams=1, repetition_penalty=1.2, ) t = Thread(target=model.generate, kwargs=generate_kwargs) t.start() outputs = [] for text in streamer: outputs.append(text) yield "".join(outputs) example1='''You are a senior software engineer who is best in the world at fixing vulnerabilities. Users will give you vulnerable code and you will generate a fix based on the provided INSTRUCTION. INSTRUCTION: Detected MD5 hash algorithm which is considered insecure. MD5 is not collision resistant and is therefore not suitable as a cryptographic signature. Use SHA256 or SHA3 instead. Fix vulnerablity CWE-327: Use of a Broken or Risky Cryptographic Algorithm at return hashlib.md5(content).hexdigest() def md5_hash(path): with open(path, "rb") as f: content = f.read() return hashlib.md5(content).hexdigest() ''' example2='''You are a software engineer who is best in the world at summarizing code changes. Carefullly analyze the given old code and new code and generate a summary of the changes. Old Code: #include #include typedef struct Node { int data; struct Node *next; } Node; void processList() { Node *head = (Node*)malloc(sizeof(Node)); head->data = 1; head->next = (Node*)malloc(sizeof(Node)); head->next->data = 2; printf("First element: %d\n", head->data); free(head->next); free(head); printf("Accessing freed list: %d\n", head->next->data); } New Code: #include #include typedef struct Node { int data; struct Node *next; } Node; void processList() { Node *head = (Node*)malloc(sizeof(Node)); if (head == NULL) { perror("Failed to allocate memory for head"); return; } head->data = 1; head->next = (Node*)malloc(sizeof(Node)); if (head->next == NULL) { free(head); perror("Failed to allocate memory for next node"); return; } head->next->data = 2; printf("First element: %d\n", head->data); free(head->next); head->next = NULL; free(head); head = NULL; if (head != NULL && head->next != NULL) { printf("Accessing freed list: %d\n", head->next->data); } } ''' example3='''You are a senior security researcher who is best in the world at detecting vulnerabilities. Carefully analyze the given code snippet and track the data flows from various sources to sinks. Assume that any call to an unknown external API is not sanitized. Respond only with either "YES" or "NO". Is the following code prone to CWE-117: Improper Output Neutralization for Logs at logger.info("Received request with API Key: %s", api_key) with source logging.getLogger(__name__) and sink logger.info("Received request with API Key: %s", api_key) from flask import Flask, request, jsonify import logging app = Flask(__name__) logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) @app.route('/api/data', methods=['GET']) def get_data(): api_key = request.args.get('api_key') logger.info("Received request with API Key: %s", api_key) data = {"message": "Data processed"} return jsonify(data) ''' example4='''You are a senior software engineer who is best in the world at fixing vulnerabilities. Users will give you vulnerable code and you will generate a fix based on the provided INSTRUCTION. INSTRUCTION: Detected subprocess function 'run' with user controlled data. A malicious actor could leverage this to perform command injection. You may consider using 'shlex.escape()'. Fix vulnerability CWE-78: Improper Neutralization of Special Elements used in an OS Command ('OS Command Injection') at result = subprocess.run(**run_kwargs) def run(command, desc=None, errdesc=None, custom_env=None, live: bool = default_command_live) -> str: if desc is not None: print(desc) run_kwargs = {{ "args": command, "shell": True, "env": os.environ if custom_env is None else custom_env, "encoding": 'utf8', "errors": 'ignore', }} if not live: run_kwargs["stdout"] = run_kwargs["stderr"] = subprocess.PIPE result = subprocess.run(**run_kwargs) ##here if result.returncode != 0: error_bits = [ f"{{errdesc or 'Error running command'}}.", f"Command: {{command}}", f"Error code: {{result.returncode}}", ] if result.stdout: error_bits.append(f"stdout: {{result.stdout}}") if result.stderr: error_bits.append(f"stderr: {{result.stderr}}") raise RuntimeError("\n".join(error_bits)) return (result.stdout or "") ''' chat_interface = gr.ChatInterface( fn=generate, chatbot=gr.Chatbot(height="480px"), additional_inputs=[ gr.Textbox(label="System prompt", lines=4), gr.Slider( label="Max new tokens", minimum=1, maximum=MAX_MAX_NEW_TOKENS, step=1, value=DEFAULT_MAX_NEW_TOKENS, ), gr.Slider( label="Temperature", minimum=0.1, maximum=4.0, step=0.1, value=0.2, ), gr.Slider( label="Top-p (nucleus sampling)", minimum=0.05, maximum=1.0, step=0.05, value=0.95, ), ], stop_btn=None, examples=[ [example1], [example2], [example3], [example4], ["You are a coding assitant, who is best in the world at debugging. Create a snake game in Python."], ], ) with gr.Blocks(css="style.css",) as demo: gr.Markdown(DESCRIPTION) chat_interface.render() gr.Markdown(LICENSE, elem_classes="contain") if __name__ == "__main__": demo.queue(max_size=20).launch()