// https://github.com/ggerganov/llama.cpp/blob/master/examples/server/utils.hpp

#pragma once

#include <cassert>
#include <cctype>
#include <condition_variable>
#include <cstdio>
#include <ctime>
#include <functional>
#include <mutex>
#include <set>
#include <sstream>
#include <string>
#include <vector>

#include "json.hpp"
#include "llama.h" // llama_token
#include "clip.h"  // clip_image_u8 (lives under examples/llava/ upstream)

using json = nlohmann::json;

extern bool server_verbose;

// LOG_VERBOSE is defined alongside the other logging macros in the full
// server sources; a minimal stand-in is provided here so this fragment is
// self-contained.
#ifndef LOG_VERBOSE
#define LOG_VERBOSE(MSG, ...)                                            \
    do {                                                                 \
        if (server_verbose) {                                            \
            server_log("VERBOSE", __func__, __LINE__, MSG, __VA_ARGS__); \
        }                                                                \
    } while (0)
#endif

//
// parallel
//
enum server_state {
    SERVER_STATE_LOADING_MODEL, // Server is starting up, model not fully loaded yet
    SERVER_STATE_READY,         // Server is ready and model is loaded
    SERVER_STATE_ERROR          // An error occurred, load_model failed
};
enum task_type {
    TASK_TYPE_COMPLETION,
    TASK_TYPE_CANCEL,
    TASK_TYPE_NEXT_RESPONSE
};
struct task_server {
    int id = -1;   // to be filled by llama_server_queue
    int target_id; // for TASK_TYPE_CANCEL: the id of the task to cancel
    task_type type;
    json data;
    bool infill_mode    = false;
    bool embedding_mode = false;
    int multitask_id = -1; // -1 if this task is not part of a multitask
};
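
// Illustrative sketch: a typical completion task as the HTTP layer might
// build it before posting it to the queue (the field values here are
// assumptions, not upstream defaults):
//
//     task_server task;
//     task.type = TASK_TYPE_COMPLETION;
//     task.data = json{{"prompt", "Hello"}, {"n_predict", 16}};
//     // task.id is left at -1 so llama_server_queue::post() assigns it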
struct task_result {
    int id;
    int multitask_id = -1;
    bool stop;
    bool error;
    json result_json;
};

struct task_multi {
    int id;
    std::set<int>            subtasks_remaining{};
    std::vector<task_result> results{};
};
// TODO: can become bool if we can't find use of more states
enum slot_state
{
    IDLE,
    PROCESSING,
};

enum slot_command
{
    NONE,
    LOAD_PROMPT,
    RELEASE,
};
struct slot_params
{
    bool stream       = true;
    bool cache_prompt = false; // remember the prompt to avoid reprocessing the entire prompt

    uint32_t seed      = -1; // RNG seed
    int32_t  n_keep    =  0; // number of tokens to keep from the initial prompt
    int32_t  n_predict = -1; // number of new tokens to predict

    std::vector<std::string> antiprompt;

    json input_prefix;
    json input_suffix;
};
struct slot_image
{
    int32_t id;

    bool request_encode_image = false;
    float * image_embedding   = nullptr;
    int32_t image_tokens      = 0;

    clip_image_u8 * img_data;

    std::string prefix_prompt; // the part of the prompt that comes before this image
};
// completion token output with probabilities
struct completion_token_output
{
    struct token_prob
    {
        llama_token tok;
        float       prob;
    };

    std::vector<token_prob> probs;
    llama_token tok;
    std::string text_to_send;
};
static inline void server_log(const char * level, const char * function, int line,
                              const char * message, const nlohmann::ordered_json & extra)
{
    nlohmann::ordered_json log
    {
        {"timestamp", time(nullptr)},
        {"level",     level},
        {"function",  function},
        {"line",      line},
        {"message",   message},
    };

    if (!extra.empty())
    {
        log.merge_patch(extra);
    }

    const std::string str = log.dump(-1, ' ', false, json::error_handler_t::replace);
    printf("%.*s\n", (int)str.size(), str.data());
    fflush(stdout);
}
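
// Each call emits one compact JSON object per line on stdout, e.g.
// (illustrative values):
//
//     {"timestamp":1700000000,"level":"INFO","function":"main","line":42,"message":"model loaded"}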
//
// server utils
//

template <typename T>
static T json_value(const json & body, const std::string & key, const T & default_value)
{
    // fall back to the default value if the key is missing or its value is null
    return body.contains(key) && !body.at(key).is_null()
        ? body.value(key, default_value)
        : default_value;
}
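
// Usage sketch (hypothetical request body): a null value falls back to the
// default, just like a missing key does.
//
//     json body = {{"n_predict", 128}, {"seed", nullptr}};
//     int      n_predict = json_value(body, "n_predict", -1); // 128
//     uint32_t seed      = json_value(body, "seed", 42u);     // 42 (null -> default)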
inline std::string format_chatml(std::vector<json> messages)
{
    std::ostringstream chatml_msgs;

    for (auto it = messages.begin(); it != messages.end(); ++it) {
        chatml_msgs << "<|im_start|>"
                    << json_value(*it, "role", std::string("user")) << '\n';
        chatml_msgs << json_value(*it, "content", std::string(""))
                    << "<|im_end|>\n";
    }

    chatml_msgs << "<|im_start|>assistant" << '\n';

    return chatml_msgs.str();
}
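
// For example, format_chatml({ {{"role", "user"}, {"content", "Hi"}} })
// produces:
//
//     <|im_start|>user
//     Hi<|im_end|>
//     <|im_start|>assistant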
//
// work queue utils
//

struct llama_server_queue {
    int id = 0;
    std::mutex mutex_tasks;

    // queues
    std::vector<task_server> queue_tasks;
    std::vector<task_server> queue_tasks_deferred;
    std::vector<task_multi>  queue_multitasks;

    std::condition_variable condition_tasks;

    // callback functions
    std::function<void(task_server &)> callback_new_task;
    std::function<void(task_multi &)>  callback_finish_multitask;
    std::function<void(void)>          callback_all_task_finished;

    // Add a new task to the end of the queue
    int post(task_server task) {
        std::unique_lock<std::mutex> lock(mutex_tasks);
        if (task.id == -1) {
            task.id = id++;
        }
        const int task_id = task.id; // read the id before moving the task
        queue_tasks.push_back(std::move(task));
        condition_tasks.notify_one();
        return task_id;
    }
    // Add a new task, but defer it until one slot is available
    void defer(task_server task) {
        std::unique_lock<std::mutex> lock(mutex_tasks);
        queue_tasks_deferred.push_back(std::move(task));
    }

    // Get the next id for creating a new task
    int get_new_id() {
        std::unique_lock<std::mutex> lock(mutex_tasks);
        return id++;
    }
    // Register function to process a new task
    void on_new_task(std::function<void(task_server &)> callback) {
        callback_new_task = callback;
    }

    // Register function to process a multitask
    void on_finish_multitask(std::function<void(task_multi &)> callback) {
        callback_finish_multitask = callback;
    }

    // Register the function to be called when the batch of tasks is finished
    void on_all_tasks_finished(std::function<void(void)> callback) {
        callback_all_task_finished = callback;
    }

    // Call when the state of one slot is changed
    void notify_slot_changed() {
        // move deferred tasks back to the main loop
        std::unique_lock<std::mutex> lock(mutex_tasks);
        for (auto & task : queue_tasks_deferred) {
            queue_tasks.push_back(std::move(task));
        }
        queue_tasks_deferred.clear();
    }
    // Start the main loop. This call is blocking
    [[noreturn]]
    void start_loop() {
        while (true) {
            // new task arrived
            LOG_VERBOSE("have new task", {});
            {
                while (true)
                {
                    std::unique_lock<std::mutex> lock(mutex_tasks);
                    if (queue_tasks.empty()) {
                        lock.unlock();
                        break;
                    }
                    task_server task = queue_tasks.front();
                    queue_tasks.erase(queue_tasks.begin());
                    lock.unlock();
                    LOG_VERBOSE("callback_new_task", {});
                    callback_new_task(task);
                }
                LOG_VERBOSE("callback_all_task_finished", {});
                // process and update all the multitasks
                auto queue_iterator = queue_multitasks.begin();
                while (queue_iterator != queue_multitasks.end())
                {
                    if (queue_iterator->subtasks_remaining.empty())
                    {
                        // all subtasks done == multitask is done
                        task_multi current_multitask = *queue_iterator;
                        callback_finish_multitask(current_multitask);
                        // remove this multitask
                        queue_iterator = queue_multitasks.erase(queue_iterator);
                    }
                    else
                    {
                        ++queue_iterator;
                    }
                }
                // all tasks in the current loop are finished
                callback_all_task_finished();
            }
            LOG_VERBOSE("wait for new task", {});
            // wait for new tasks
            {
                std::unique_lock<std::mutex> lock(mutex_tasks);
                if (queue_tasks.empty()) {
                    condition_tasks.wait(lock, [&]{
                        return !queue_tasks.empty();
                    });
                }
            }
        }
    }
    //
    // functions to manage multitasks
    //

    // add a multitask by specifying the ids of all its subtasks (each subtask is a task_server)
    void add_multitask(int multitask_id, std::vector<int> & sub_ids)
    {
        std::lock_guard<std::mutex> lock(mutex_tasks);
        task_multi multi;
        multi.id = multitask_id;
        std::copy(sub_ids.begin(), sub_ids.end(), std::inserter(multi.subtasks_remaining, multi.subtasks_remaining.end()));
        queue_multitasks.push_back(multi);
    }
    // update the remaining subtasks, while appending results to the multitask
    void update_multitask(int multitask_id, int subtask_id, task_result & result)
    {
        std::lock_guard<std::mutex> lock(mutex_tasks);
        for (auto & multitask : queue_multitasks)
        {
            if (multitask.id == multitask_id)
            {
                multitask.subtasks_remaining.erase(subtask_id);
                multitask.results.push_back(result);
            }
        }
    }
};
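
// Usage sketch: how a server might wire up and drive the queue. The callback
// bodies and the surrounding setup are assumptions for illustration; upstream,
// start_loop() runs on the main thread and never returns.
//
//     llama_server_queue queue;
//     queue.on_new_task([](task_server & task) { /* dispatch to a slot */ });
//     queue.on_finish_multitask([](task_multi & multi) { /* aggregate results */ });
//     queue.on_all_tasks_finished([]() { /* run one decode step */ });
//
//     task_server task;
//     task.type = TASK_TYPE_COMPLETION;
//     const int task_id = queue.post(task); // thread-safe; assigns task.id
//     queue.start_loop();                   // blocks forever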
struct llama_server_response {
    typedef std::function<void(int, int, task_result &)> callback_multitask_t;
    callback_multitask_t callback_update_multitask;

    // for keeping track of all tasks waiting for the result
    std::set<int> waiting_task_ids;

    // the main result queue
    std::vector<task_result> queue_results;

    std::mutex              mutex_results;
    std::condition_variable condition_results;

    void add_waiting_task_id(int task_id) {
        std::unique_lock<std::mutex> lock(mutex_results);
        waiting_task_ids.insert(task_id);
    }

    void remove_waiting_task_id(int task_id) {
        std::unique_lock<std::mutex> lock(mutex_results);
        waiting_task_ids.erase(task_id);
    }

    // This function blocks the thread until there is a response for this task_id
    task_result recv(int task_id) {
        while (true)
        {
            std::unique_lock<std::mutex> lock(mutex_results);
            condition_results.wait(lock, [&]{
                return !queue_results.empty();
            });
            LOG_VERBOSE("condition_results unblock", {});

            for (int i = 0; i < (int) queue_results.size(); i++)
            {
                if (queue_results[i].id == task_id)
                {
                    assert(queue_results[i].multitask_id == -1);
                    task_result res = queue_results[i];
                    queue_results.erase(queue_results.begin() + i);
                    return res;
                }
            }
        }

        // should never reach here
    }
    // Register the function to update a multitask
    void on_multitask_update(callback_multitask_t callback) {
        callback_update_multitask = callback;
    }

    // Send a new result to a waiting task_id
    void send(task_result result) {
        std::unique_lock<std::mutex> lock(mutex_results);
        LOG_VERBOSE("send new result", {});
        for (auto & task_id : waiting_task_ids) {
            // LOG_TEE("waiting task id %i \n", task_id);
            // for now, tasks that have associated parent multitasks just get erased once the multitask picks up the result
            if (result.multitask_id == task_id)
            {
                LOG_VERBOSE("callback_update_multitask", {});
                callback_update_multitask(task_id, result.id, result);
                continue;
            }

            if (result.id == task_id)
            {
                LOG_VERBOSE("queue_results.push_back", {});
                queue_results.push_back(result);
                condition_results.notify_one();
                return;
            }
        }
    }
};
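
// Usage sketch: the consumer side of the contract, as an HTTP handler might
// use it (the surrounding setup is an assumption for illustration). Register
// the task id before posting the task so its result is not dropped, then
// block on recv():
//
//     llama_server_response response;
//     response.add_waiting_task_id(task_id);
//     task_result result = response.recv(task_id); // blocks until send() matches
//     response.remove_waiting_task_id(task_id);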
//
// base64 utils (TODO: move to common in the future)
//

static const std::string base64_chars =
    "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
    "abcdefghijklmnopqrstuvwxyz"
    "0123456789+/";

static inline bool is_base64(uint8_t c)
{
    return (isalnum(c) || (c == '+') || (c == '/'));
}
static inline std::vector<uint8_t> base64_decode(const std::string & encoded_string)
{
    int i = 0;
    int j = 0;
    int in_ = 0;

    int in_len = encoded_string.size();

    uint8_t char_array_4[4];
    uint8_t char_array_3[3];

    std::vector<uint8_t> ret;

    while (in_len-- && (encoded_string[in_] != '=') && is_base64(encoded_string[in_]))
    {
        char_array_4[i++] = encoded_string[in_]; in_++;
        if (i == 4)
        {
            // map each character back to its 6-bit value
            for (i = 0; i < 4; i++)
            {
                char_array_4[i] = base64_chars.find(char_array_4[i]);
            }

            // repack four 6-bit groups into three 8-bit bytes
            char_array_3[0] = ((char_array_4[0]      ) << 2) + ((char_array_4[1] & 0x30) >> 4);
            char_array_3[1] = ((char_array_4[1] & 0xf) << 4) + ((char_array_4[2] & 0x3c) >> 2);
            char_array_3[2] = ((char_array_4[2] & 0x3) << 6) +   char_array_4[3];

            for (i = 0; (i < 3); i++)
            {
                ret.push_back(char_array_3[i]);
            }

            i = 0;
        }
    }

    if (i)
    {
        // handle the final, possibly padded, group
        for (j = i; j < 4; j++)
        {
            char_array_4[j] = 0;
        }

        for (j = 0; j < 4; j++)
        {
            char_array_4[j] = base64_chars.find(char_array_4[j]);
        }

        char_array_3[0] = ((char_array_4[0]      ) << 2) + ((char_array_4[1] & 0x30) >> 4);
        char_array_3[1] = ((char_array_4[1] & 0xf) << 4) + ((char_array_4[2] & 0x3c) >> 2);
        char_array_3[2] = ((char_array_4[2] & 0x3) << 6) +   char_array_4[3];

        for (j = 0; (j < i - 1); j++)
        {
            ret.push_back(char_array_3[j]);
        }
    }

    return ret;
}
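
// Example: base64_decode("SGVsbG8=") yields the bytes {'H','e','l','l','o'};
// the trailing '=' padding is treated as end-of-input by the loop condition.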