File size: 1,910 Bytes
cf7b765
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
import os
import re

from orq_ai_sdk import OrqAI

import config

# Orq client shared by the deployment helpers below. The API key is read from
# the ORQ_API_KEY environment variable at import time, so importing this module
# raises KeyError when that variable is not set.
client = OrqAI(api_key=os.environ["ORQ_API_KEY"], environment="develop")

# Maps the `level` argument of run_judge to the "step" context value that is
# sent when invoking the deployment for that judge call.
secondary_llm_call = {
    "llm_judge_input": "llm_judge_input_JUDGE",
    "llm_judge_output": "llm_judge_output_JUDGE",
    "preflight_prompt": "preflight_prompt_JUDGE",
}


def stream_request(variant: str, secret: str, user_input: str):
    """Yield the model's reply piece by piece as it is streamed back.

    Invokes the configured deployment in streaming mode, passing ``variant``
    as the ``step`` context value and ``secret`` / ``user_input`` as the
    deployment inputs. The content of the first choice of every non-final
    chunk is yielded; the chunk flagged as final is skipped.
    """
    request_context = {"step": variant}
    request_inputs = {"secret": secret, "user_input": user_input}
    chunks = client.deployments.invoke_with_stream(
        key=config.ORQ_DEPLOYMENT_NAME,
        context=request_context,
        inputs=request_inputs,
    )

    for piece in chunks:
        if piece.is_final:
            continue
        yield piece.choices[0].message.content


def get_full_prompt(variant: str, inputs: dict):
    """Return the deployment's prompts with inputs filled into the user prompt.

    Fetches the deployment configuration for the given ``variant`` (sent as
    the ``step`` context value) and builds a dict keyed ``"<role>_prompt"``
    from its messages. Every ``{{name}}`` placeholder in the ``user_prompt``
    entry is then replaced by the matching value from ``inputs``; inputs whose
    value is ``None`` are left unsubstituted. Only ``user_prompt`` is
    substituted, other prompts are returned as fetched.
    """
    config_dict = client.deployments.get_config(
        key=config.ORQ_DEPLOYMENT_NAME,
        context={"step": variant},
    ).to_dict()

    # One entry per message role; a later message with the same role
    # overwrites an earlier one.
    prompts = {}
    for message in config_dict["messages"]:
        slot = message["role"] + "_prompt"
        prompts[slot] = message["content"]

    for name, replacement in inputs.items():
        if replacement is None:
            continue
        user_prompt = prompts["user_prompt"]
        # Placeholders are written as {{name}} in the prompt template.
        placeholder = f"{{{{{name}}}}}"
        prompts["user_prompt"] = user_prompt.replace(placeholder, replacement)
    return prompts


def run_judge(level: str, inputs: dict, expected_output: str = "yes"):
    """Ask the judge deployment for a verdict and compare it to the expected one.

    Args:
        level: Key into ``secondary_llm_call`` selecting which judge step to
            invoke; an unknown level raises ``KeyError``.
        inputs: Variables forwarded to the deployment as its inputs.
        expected_output: Verdict that counts as a pass. It is compared against
            the lower-cased, letters-only verdict, so it should be lower-case.

    Returns:
        A ``(passed, answer)`` tuple: whether the normalised verdict equals
        ``expected_output``, and the judge's raw, unmodified answer.
    """
    generation = client.deployments.invoke(
        key=config.ORQ_DEPLOYMENT_NAME,
        context={"step": secondary_llm_call[level]},
        inputs=inputs,
    )
    answer = generation.choices[0].message.content

    # The verdict is the last word of the answer. Split on any whitespace so a
    # trailing space or a verdict placed on its own line is still found;
    # splitting on " " alone yields "" for "yes " and glues the previous line
    # onto the verdict for "reasoning\nyes".
    words = (answer or "").split()
    verdict = words[-1].lower() if words else ""
    # Drop punctuation and digits, e.g. "yes." -> "yes".
    verdict = re.sub(r"[^a-zA-Z ]+", "", verdict)
    return verdict == expected_output, answer