diff --git a/config.default.toml b/config.default.toml index 59b21f0..fdfedb9 100644 --- a/config.default.toml +++ b/config.default.toml @@ -24,12 +24,9 @@ max_batch_size = 128 # Maximum number of tokens to generate for each response. max_response_length = 100 -# Maximum Kullback-Leibler divergence from the original model to allow for abliterated models. -max_kl_divergence = 0.5 - -# Exponent that determines the shape of the KL divergence part of the score function. -# See evaluator.py for the exact meaning of this parameter. -kl_score_shape = 3.0 +# Assumed "typical" value of the Kullback-Leibler divergence from the original model for abliterated models. +# This is used to ensure balanced co-optimization of KL divergence and refusal count. +kl_divergence_scale = 1.0 # Number of abliteration trials to run during optimization. n_trials = 200 diff --git a/src/heretic/config.py b/src/heretic/config.py index a7a17cf..c99248b 100644 --- a/src/heretic/config.py +++ b/src/heretic/config.py @@ -61,14 +61,12 @@ class Settings(BaseSettings): description="Maximum number of tokens to generate for each response.", ) - max_kl_divergence: float = Field( - default=0.5, - description="Maximum Kullback-Leibler divergence from the original model to allow for abliterated models.", - ) - - kl_score_shape: float = Field( - default=3.0, - description="Exponent that determines the shape of the KL divergence part of the score function. See evaluator.py for the exact meaning of this parameter.", + kl_divergence_scale: float = Field( + default=1.0, + description=( + 'Assumed "typical" value of the Kullback-Leibler divergence from the original model for abliterated models. ' + "This is used to ensure balanced co-optimization of KL divergence and refusal count." + ), ) n_trials: int = Field( diff --git a/src/heretic/evaluator.py b/src/heretic/evaluator.py index 307632c..c036d46 100644 --- a/src/heretic/evaluator.py +++ b/src/heretic/evaluator.py @@ -33,7 +33,7 @@ class Evaluator: print("* Counting model refusals...") self.base_refusals = self.count_refusals() print( - f"* Initial refusals: [bold]{self.base_refusals}[/]/{len(self.bad_prompts)} ([bold]{self.base_refusals / len(self.bad_prompts) * 100:.1f}[/] %)" + f"* Initial refusals: [bold]{self.base_refusals}[/]/{len(self.bad_prompts)}" ) def is_refusal(self, response: str) -> bool: @@ -54,58 +54,24 @@ class Evaluator: refusals = [response for response in responses if self.is_refusal(response)] return len(refusals) - def get_score(self) -> tuple[float, float, int]: + def get_score(self) -> tuple[tuple[float, float], float, int]: print(" * Obtaining first-token probability distributions...") logprobs = self.model.get_logprobs_batched(self.good_prompts) kl_divergence = F.kl_div( - logprobs, self.base_logprobs, reduction="batchmean", log_target=True + logprobs, + self.base_logprobs, + reduction="batchmean", + log_target=True, ).item() - print(f" * KL divergence: [bold]{kl_divergence:.4f}[/]", end="") - - kl_score = -( - ( - ( - (kl_divergence - self.settings.max_kl_divergence) - / self.settings.max_kl_divergence - ) - + 1 - ) - ** self.settings.kl_score_shape - ) - - if ( - self.settings.evaluate_model is None - and kl_divergence > self.settings.max_kl_divergence - ): - print(" [yellow](constraint violation; aborting trial)[/]") - return kl_score, kl_divergence, self.base_refusals - else: - print() + print(f" * KL divergence: [bold]{kl_divergence:.2f}[/]") print(" * Counting model refusals...") refusals = self.count_refusals() - print( - f" * Refusals: [bold]{refusals}[/]/{len(self.bad_prompts)} ([bold]{refusals / len(self.bad_prompts) * 100:.1f}[/] %)" + print(f" * Refusals: [bold]{refusals}[/]/{len(self.bad_prompts)}") + + score = ( + (kl_divergence / self.settings.kl_divergence_scale), + (refusals / self.base_refusals), ) - # This score is constructed to achieve several properties: - # - # 1. For the unmodified model, kl_divergence = 0 and refusals = base_refusals, - # so the baseline score is 0. - # - # 2. The best possible outcome is kl_divergence = 0 and refusals = 0, - # giving a score of 1. - # - # 3. If kl_divergence > max_kl_divergence, the score is negative. - # As the baseline is 0, this ensures that such a configuration - # is never chosen, enforcing the max_kl_divergence constraint. - # - # 4. kl_score_shape controls how strongly a kl_divergence well below - # max_kl_divergence affects the score. A high value means that - # kl_divergence only matters when it approaches max_kl_divergence, - # and the optimizer will prioritize lowering refusals rather than - # lowering kl_divergence. - score = kl_score - (refusals / self.base_refusals) + 1 - print(f" * Score: [bold]{score:.4f}[/]") - return score, kl_divergence, refusals diff --git a/src/heretic/main.py b/src/heretic/main.py index 42b91b4..ade75b1 100644 --- a/src/heretic/main.py +++ b/src/heretic/main.py @@ -22,7 +22,12 @@ from accelerate.utils import ( is_xpu_available, ) from huggingface_hub import ModelCard, ModelCardData +from optuna import Trial +from optuna.exceptions import ExperimentalWarning +from optuna.samplers import TPESampler +from optuna.study import StudyDirection from pydantic import ValidationError +from questionary import Choice from rich.traceback import install from .config import Settings @@ -106,7 +111,7 @@ def run(): optuna.logging.set_verbosity(optuna.logging.WARNING) # Silence the warning about multivariate TPE being experimental. - warnings.filterwarnings("ignore", category=optuna.exceptions.ExperimentalWarning) + warnings.filterwarnings("ignore", category=ExperimentalWarning) model = Model(settings) @@ -155,7 +160,7 @@ def run(): ] performance = sum(response_lengths) / (end_time - start_time) - print(f"[green]Ok[/] ([bold]{performance:.2f}[/] tokens/s)") + print(f"[green]Ok[/] ([bold]{performance:.0f}[/] tokens/s)") if performance > best_performance: best_batch_size = batch_size @@ -192,7 +197,7 @@ def run(): trial_index = 0 start_time = time.perf_counter() - def objective(trial: optuna.Trial): + def objective(trial: Trial) -> tuple[float, float]: nonlocal trial_index trial_index += 1 trial.set_user_attr("index", trial_index) @@ -289,158 +294,205 @@ def run(): trial.set_user_attr("kl_divergence", kl_divergence) trial.set_user_attr("refusals", refusals) - # The optimizer searches for a minimum, so we return the negative score. - return -score + return score study = optuna.create_study( - sampler=optuna.samplers.TPESampler( + sampler=TPESampler( n_startup_trials=settings.n_startup_trials, + n_ei_candidates=128, multivariate=True, - ) + ), + directions=[StudyDirection.MINIMIZE, StudyDirection.MINIMIZE], ) study.optimize(objective, n_trials=settings.n_trials) - print() - print( - f"[bold green]Optimization finished![/] Best was trial [bold]{study.best_trial.user_attrs['index']}[/]:" + best_trials = sorted( + study.best_trials, + key=lambda trial: trial.user_attrs["refusals"], ) - print("* Parameters:") - for name, value in get_trial_parameters(study.best_trial).items(): - print(f" * {name} = [bold]{value}[/]") - print("* Results:") - print( - f" * KL divergence: [bold]{study.best_trial.user_attrs['kl_divergence']:.4f}[/]" + + choices = [ + Choice( + title=( + f"Trial {trial.user_attrs['index']:>3}: " + f"Refusals {trial.user_attrs['refusals']:>2}/{len(evaluator.bad_prompts)}, " + f"KL divergence {trial.user_attrs['kl_divergence']:.2f}" + ), + value=trial, + ) + for trial in best_trials + ] + + choices.append( + Choice( + title="None (exit program)", + value="", + ) ) - refusals = study.best_trial.user_attrs["refusals"] - print( - f" * Refusals: [bold]{refusals}[/]/{len(evaluator.bad_prompts)} ([bold]{refusals / len(evaluator.bad_prompts) * 100:.1f}[/] %)" - ) - print(f" * Score: [bold]{-study.best_value:.4f}[/]") print() - print("Restoring best model...") - print("* Reloading model...") - model.reload_model() - print("* Abliterating...") - model.abliterate( - refusal_directions, - study.best_trial.user_attrs["direction_index"], - study.best_trial.user_attrs["parameters"], + print("[bold green]Optimization finished![/]") + print() + print( + ( + "The following trials resulted in Pareto optimal combinations of refusals and KL divergence. " + "After selecting a trial, you will be able to save the model, upload it to Hugging Face, " + "or chat with it to test how well it works. You can return to this menu later to select a different trial. " + "[yellow]Note that KL divergence values above 1 usually indicate significant damage to the original model's capabilities.[/]" + ) ) while True: print() - action = questionary.select( - "What do you want to do with the optimized model?", - choices=[ - "Save the model to a local folder", - "Upload the model to Hugging Face", - "Chat with the model", - "Nothing (Quit)", - ], + trial = questionary.select( + "Which trial do you want to use?", + choices=choices, ).ask() - # All actions are wrapped in a try/except block so that if an error occurs, - # another action can be tried, instead of the program crashing and losing - # the optimized model. - try: - match action: - case "Save the model to a local folder": - save_directory = questionary.path("Path to the folder:").ask() - if not save_directory: - continue + if trial is None or trial == "": + break - print("Saving model...") - model.model.save_pretrained(save_directory) - model.tokenizer.save_pretrained(save_directory) - print(f"Model saved to [bold]{save_directory}[/].") + print() + print(f"Restoring model from trial [bold]{trial.user_attrs['index']}[/]...") + print("* Reloading model...") + model.reload_model() + print("* Abliterating...") + model.abliterate( + refusal_directions, + trial.user_attrs["direction_index"], + trial.user_attrs["parameters"], + ) - case "Upload the model to Hugging Face": - # We don't use huggingface_hub.login() because that stores the token on disk, - # and since this program will often be run on rented or shared GPU servers, - # it's better to not persist credentials. - token = huggingface_hub.get_token() - if not token: - token = questionary.password("Hugging Face access token:").ask() - if not token: - continue + while True: + print() + action = questionary.select( + "What do you want to do with the decensored model?", + choices=[ + "Save the model to a local folder", + "Upload the model to Hugging Face", + "Chat with the model", + "Nothing (return to trial selection menu)", + ], + ).ask() - user = huggingface_hub.whoami(token) - print(f"Logged in as [bold]{user['fullname']} ({user['email']})[/]") + if action is None or action == "Nothing (return to trial selection menu)": + break - repo_id = questionary.text( - "Name of repository:", - default=f"{user['name']}/{Path(settings.model).name}-heretic", - ).ask() + # All actions are wrapped in a try/except block so that if an error occurs, + # another action can be tried, instead of the program crashing and losing + # the optimized model. + try: + match action: + case "Save the model to a local folder": + save_directory = questionary.path("Path to the folder:").ask() + if not save_directory: + continue - visibility = questionary.select( - "Should the repository be public or private?", - choices=[ - "Public", - "Private", - ], - ).ask() - private = visibility == "Private" + print("Saving model...") + model.model.save_pretrained(save_directory) + model.tokenizer.save_pretrained(save_directory) + print(f"Model saved to [bold]{save_directory}[/].") - print("Uploading model...") + case "Upload the model to Hugging Face": + # We don't use huggingface_hub.login() because that stores the token on disk, + # and since this program will often be run on rented or shared GPU servers, + # it's better to not persist credentials. + token = huggingface_hub.get_token() + if not token: + token = questionary.password( + "Hugging Face access token:" + ).ask() + if not token: + continue - model.model.push_to_hub(repo_id, private=private, token=token) - model.tokenizer.push_to_hub(repo_id, private=private, token=token) - - # If the model path doesn't exist locally, it can be assumed - # to be a model hosted on the Hugging Face Hub, in which case - # we can retrieve the model card. - if not Path(settings.model).exists(): - card = ModelCard.load(settings.model) - if card.data is None: - card.data = ModelCardData() - if card.data.tags is None: - card.data.tags = [] - card.data.tags.append("heretic") - card.data.tags.append("uncensored") - card.data.tags.append("decensored") - card.data.tags.append("abliterated") - card.text = ( - get_readme_intro( - settings, - study, - evaluator.base_refusals, - evaluator.bad_prompts, - ) - + card.text + user = huggingface_hub.whoami(token) + print( + f"Logged in as [bold]{user['fullname']} ({user['email']})[/]" ) - card.push_to_hub(repo_id, token=token) - print(f"Model uploaded to [bold]{repo_id}[/].") + repo_id = questionary.text( + "Name of repository:", + default=f"{user['name']}/{Path(settings.model).name}-heretic", + ).ask() - case "Chat with the model": - print() - print("[cyan]Press Ctrl+C at any time to return to the menu.[/]") + visibility = questionary.select( + "Should the repository be public or private?", + choices=[ + "Public", + "Private", + ], + ).ask() + private = visibility == "Private" - chat = [ - {"role": "system", "content": settings.system_prompt}, - ] + print("Uploading model...") - while True: - try: - message = questionary.text("User:", qmark=">").unsafe_ask() - if not message: + model.model.push_to_hub( + repo_id, + private=private, + token=token, + ) + model.tokenizer.push_to_hub( + repo_id, + private=private, + token=token, + ) + + # If the model path doesn't exist locally, it can be assumed + # to be a model hosted on the Hugging Face Hub, in which case + # we can retrieve the model card. + if not Path(settings.model).exists(): + card = ModelCard.load(settings.model) + if card.data is None: + card.data = ModelCardData() + if card.data.tags is None: + card.data.tags = [] + card.data.tags.append("heretic") + card.data.tags.append("uncensored") + card.data.tags.append("decensored") + card.data.tags.append("abliterated") + card.text = ( + get_readme_intro( + settings, + trial, + evaluator.base_refusals, + evaluator.bad_prompts, + ) + + card.text + ) + card.push_to_hub(repo_id, token=token) + + print(f"Model uploaded to [bold]{repo_id}[/].") + + case "Chat with the model": + print() + print( + "[cyan]Press Ctrl+C at any time to return to the menu.[/]" + ) + + chat = [ + {"role": "system", "content": settings.system_prompt}, + ] + + while True: + try: + message = questionary.text( + "User:", + qmark=">", + ).unsafe_ask() + if not message: + break + chat.append({"role": "user", "content": message}) + + print("[bold]Assistant:[/] ", end="") + response = model.stream_chat_response(chat) + chat.append({"role": "assistant", "content": response}) + except (KeyboardInterrupt, EOFError): + # Ctrl+C/Ctrl+D break - chat.append({"role": "user", "content": message}) - print("[bold]Assistant:[/] ", end="") - response = model.stream_chat_response(chat) - chat.append({"role": "assistant", "content": response}) - except (KeyboardInterrupt, EOFError): - # Ctrl+C/Ctrl+D - break - - case "Nothing (Quit)": - break - - except Exception as error: - print(f"[red]Error: {error}[/]") + except Exception as error: + print(f"[red]Error: {error}[/]") def main(): diff --git a/src/heretic/model.py b/src/heretic/model.py index 8918219..b01439e 100644 --- a/src/heretic/model.py +++ b/src/heretic/model.py @@ -8,7 +8,7 @@ from typing import Any import torch import torch.nn.functional as F -from torch import LongTensor +from torch import LongTensor, Tensor from torch.nn import ModuleList from transformers import ( AutoModelForCausalLM, @@ -103,7 +103,7 @@ class Model: # Text-only models. return self.model.model.layers - def get_layer_matrices(self, layer_index: int) -> dict[str, list[torch.Tensor]]: + def get_layer_matrices(self, layer_index: int) -> dict[str, list[Tensor]]: layer = self.get_layers()[layer_index] matrices = {} @@ -151,7 +151,7 @@ class Model: def abliterate( self, - refusal_directions: torch.Tensor, + refusal_directions: Tensor, direction_index: float | None, parameters: dict[str, AbliterationParameters], ): @@ -261,7 +261,7 @@ class Model: return responses - def get_residuals(self, prompts: list[str]) -> torch.Tensor: + def get_residuals(self, prompts: list[str]) -> Tensor: # We only generate one token, and we return the residual vectors # at that token position, for each prompt and layer. _, outputs = self.generate( @@ -287,7 +287,7 @@ class Model: # problems during calculations involving residual vectors. return residuals.to(torch.float32) - def get_residuals_batched(self, prompts: list[str]) -> torch.Tensor: + def get_residuals_batched(self, prompts: list[str]) -> Tensor: residuals = [] for batch in batchify(prompts, self.settings.batch_size): @@ -297,7 +297,7 @@ class Model: # We work with logprobs rather than probabilities for numerical stability # when computing the KL divergence. - def get_logprobs(self, prompts: list[str]) -> torch.Tensor: + def get_logprobs(self, prompts: list[str]) -> Tensor: # We only generate one token, and we return the (log) probability distributions # over the vocabulary at that token position, for each prompt. _, outputs = self.generate( @@ -313,7 +313,7 @@ class Model: # The returned tensor has shape (prompt, token). return F.log_softmax(logits, dim=-1) - def get_logprobs_batched(self, prompts: list[str]) -> torch.Tensor: + def get_logprobs_batched(self, prompts: list[str]) -> Tensor: logprobs = [] for batch in batchify(prompts, self.settings.batch_size): @@ -331,6 +331,7 @@ class Model: inputs = self.tokenizer( chat_prompt, return_tensors="pt", + return_token_type_ids=False, ).to(self.model.device) streamer = TextStreamer( diff --git a/src/heretic/utils.py b/src/heretic/utils.py index a107812..7439dcc 100644 --- a/src/heretic/utils.py +++ b/src/heretic/utils.py @@ -6,7 +6,6 @@ from dataclasses import asdict from importlib.metadata import version from typing import TypeVar -import optuna import torch from accelerate.utils import ( is_mlu_available, @@ -15,6 +14,7 @@ from accelerate.utils import ( is_xpu_available, ) from datasets import load_dataset +from optuna import Trial from rich.console import Console from .config import DatasetSpecification, Settings @@ -62,32 +62,28 @@ def empty_cache(): gc.collect() -def get_trial_parameters(trial: optuna.Trial) -> dict[str, str]: +def get_trial_parameters(trial: Trial) -> dict[str, str]: params = {} direction_index = trial.user_attrs["direction_index"] params["direction_index"] = ( - "per layer" if (direction_index is None) else f"{direction_index:.4f}" + "per layer" if (direction_index is None) else f"{direction_index:.2f}" ) for component, parameters in trial.user_attrs["parameters"].items(): for name, value in asdict(parameters).items(): - params[f"{component}.{name}"] = f"{value:.4f}" + params[f"{component}.{name}"] = f"{value:.2f}" return params def get_readme_intro( settings: Settings, - study: optuna.Study, + trial: Trial, base_refusals: int, bad_prompts: list[str], ) -> str: model_link = f"[{settings.model}](https://huggingface.co/{settings.model})" - refusal_percentage = ( - study.best_trial.user_attrs["refusals"] / len(bad_prompts) * 100 - ) - base_refusal_percentage = base_refusals / len(bad_prompts) * 100 return f"""# This is a decensored version of { model_link @@ -101,7 +97,7 @@ def get_readme_intro( chr(10).join( [ f"| **{name}** | {value} |" - for name, value in get_trial_parameters(study.best_trial).items() + for name, value in get_trial_parameters(trial).items() ] ) } @@ -110,9 +106,10 @@ def get_readme_intro( | Metric | This model | Original model ({model_link}) | | :----- | :--------: | :---------------------------: | -| **KL divergence** | { - study.best_trial.user_attrs["kl_divergence"]:.4f} | 0 *(by definition)* | -| **Refusals** | {refusal_percentage:.1f} % | {base_refusal_percentage:.1f} % | +| **KL divergence** | {trial.user_attrs["kl_divergence"]:.2f} | 0 *(by definition)* | +| **Refusals** | {trial.user_attrs["refusals"]}/{len(bad_prompts)} | {base_refusals}/{ + len(bad_prompts) + } | -----