Source code for iguanas.rule_combination

import heapq
import itertools

import polars as pl

from .metrics import compute_metrics, compute_single_metric






def combine_rules_cumulative(
    R: pl.DataFrame, output_names: list[str] | None = None, operator: str = "or"
) -> pl.DataFrame:
    """Apply a running boolean OR / AND from left to right across the columns of ``R``.

    Output column ``i`` summarises input columns ``0..i`` (inclusive), in the
    order given by ``R.columns``.

    Parameters
    ----------
    R : pl.DataFrame
        Input DataFrame. Every column takes part in the cumulative operation.
    output_names : list[str] | None, default=None
        Names for the output columns; must contain one name per column of
        ``R``. When None, names are generated by joining the parenthesised
        input column names seen so far with ``" | "`` (operator='or') or
        ``" & "`` (operator='and').
    operator : str, default='or'
        Boolean operator to apply:

        - 'or': cumulative OR (any True)
        - 'and': cumulative AND (all True)

    Returns
    -------
    pl.DataFrame
        One boolean column per input column:

        - operator='or': True where the horizontal running sum up to that
          column is greater than 0
        - operator='and': True where the horizontal running sum up to that
          column equals the number of columns included so far

    Raises
    ------
    ValueError
        If operator is not 'or' or 'and', or if the length of output_names
        does not match the number of columns.

    Examples
    --------
    >>> import polars as pl
    >>> R = pl.DataFrame({
    ...     "rule_A": [True, False, True],
    ...     "rule_B": [False, True, True],
    ...     "rule_C": [True, True, False],
    ... })
    >>> combine_rules_cumulative(R, operator="or")
    # Column 1: rule_A | ...; Column 2: rule_A | rule_B | ...; Column 3: all three
    >>> combine_rules_cumulative(R, operator="and", output_names=["step1", "step2", "step3"])
    # Named columns, each True only if all rules up to that position are True
    """
    if operator not in ("or", "and"):
        raise ValueError(f"operator must be 'or' or 'and', got '{operator}'")

    rule_cols = R.columns
    n_rules = len(rule_cols)

    if output_names is None:
        glue = " | " if operator == "or" else " & "
        wrapped = [f"({name})" for name in rule_cols]
        # The name of output i is the join of the first i + 1 wrapped input names.
        output_names = [glue.join(wrapped[:stop]) for stop in range(1, n_rules + 1)]
    elif len(output_names) != n_rules:
        raise ValueError(
            f"Length of output_names ({len(output_names)}) must match length of columns ({len(rule_cols)})"
        )

    # The horizontal cumulative sum is produced as a struct column named
    # "cum_sum"; unnesting it yields one running-total column per input column.
    running_totals = R.select(pl.cum_sum_horizontal(*rule_cols)).unnest("cum_sum")

    renames = list(zip(rule_cols, output_names, strict=False))
    if operator == "and":
        # Cumulative AND: every rule so far is True, i.e. the running total
        # equals the 1-based position of the column.
        exprs = [
            pl.col(src).eq(position).alias(dst)
            for position, (src, dst) in enumerate(renames, start=1)
        ]
    else:
        # Cumulative OR: at least one rule so far is True (running total > 0).
        exprs = [pl.col(src).gt(0).alias(dst) for src, dst in renames]

    return running_totals.select(exprs)
def combine_rules_greedy(
    R: pl.DataFrame,
    y: pl.Series,
    metric: str = "f1",
    max_rules: int = 5,
    operator: str = "or",
    weights: pl.Series | None = None,
    min_improvement: float = 0.0,
) -> pl.DataFrame:
    """Greedily select rules that maximize a performance metric.

    Starts with the best single rule, then iteratively adds the rule whose
    combination with the current selection yields the highest metric. Stops
    when no remaining rule strictly improves the metric, when the best
    improvement is below ``min_improvement``, when no rules remain, or when
    ``max_rules`` rules have been selected.

    Parameters
    ----------
    R : pl.DataFrame
        DataFrame containing boolean rule columns. All columns are used as
        candidate rules.
    y : pl.Series
        Boolean target series indicating true labels.
    metric : str, default="f1"
        Performance metric to optimize. Must be a column name produced by
        compute_metrics (e.g., "f1", "accuracy", "precision", "recall").
        When ``weights`` is given, the column ``f"{metric}_weight"`` is used
        instead.
    max_rules : int, default=5
        Maximum number of rules to select. The best single rule is always
        selected, so values below 1 behave like 1.
    operator : str, default="or"
        Boolean operator for combining rules: 'or' or 'and'.
    weights : pl.Series | None, default=None
        Optional sample weights, forwarded to compute_metrics.
    min_improvement : float, default=0.0
        Minimum metric improvement required to add a new rule.

    Returns
    -------
    pl.DataFrame
        DataFrame with a single column containing the combined rule. The
        column name lists the selected rules, each in parentheses, joined by
        the operator symbol in selection order.

    Raises
    ------
    ValueError
        If operator is not 'or' or 'and', if ``R`` has no columns, if the
        metric column is not found, or if no best single rule can be
        determined (e.g. all metric values are null).

    Examples
    --------
    >>> import polars as pl
    >>> R = pl.DataFrame({"rule_A": [True, False, True],
    ...                   "rule_B": [False, True, True],
    ...                   "rule_C": [True, True, False]})
    >>> y = pl.Series([True, True, False])
    >>> result_R = combine_rules_greedy(
    ...     R, y, metric="f1", max_rules=2
    ... )
    >>> print(result_R.columns)
    # e.g., ['(rule_B) | (rule_A)']
    """
    if operator not in ["or", "and"]:
        raise ValueError(f"operator must be 'or' or 'and', got '{operator}'")

    rules = R.columns
    if not rules:
        raise ValueError("rules list cannot be empty")

    use_or = operator == "or"

    def _merge(left: pl.Series, right: pl.Series) -> pl.Series:
        """Combine two rule series with the configured boolean operator."""
        return left | right if use_or else left & right

    # Evaluate all single rules to find the best starting point.
    metrics_R = compute_metrics(R.select(rules), y, weights)

    # Use the weighted metric column if weights are provided.
    metric_to_use = f"{metric}_weight" if weights is not None else metric
    if metric_to_use not in metrics_R.columns:
        raise ValueError(
            f"Metric '{metric_to_use}' not found in computed metrics. "
            f"Available metrics: {list(metrics_R.columns)}"
        )

    best_idx = metrics_R[metric_to_use].arg_max()
    if best_idx is None:
        raise ValueError("Cannot find best rule - all metrics may be null or equal")

    # The "rule" column of the metrics frame is expected to hold the rule
    # (column) name of each row; `remove` below raises if it does not.
    best_rule = metrics_R["rule"].item(best_idx)
    current_best_metric = metrics_R[metric_to_use].item(best_idx)

    selected_rules = [best_rule]
    remaining_rules = list(rules)
    remaining_rules.remove(best_rule)

    # Running combination of the selected rules, extended as rules are added
    # so it never has to be rebuilt from scratch.
    current_combined = R[best_rule]

    # Iteratively add the rule that improves the metric the most.
    for _ in range(1, max_rules):
        if not remaining_rules:
            break

        best_candidate = None
        best_candidate_metric = current_best_metric
        best_candidate_combined = None

        for candidate_rule in remaining_rules:
            test_combined = _merge(current_combined, R[candidate_rule])
            test_R = pl.DataFrame({"test_rule": test_combined})
            test_metrics = compute_metrics(test_R, y, weights)
            candidate_metric = test_metrics[metric_to_use].item(0)

            # A null metric cannot be ranked (comparing None with a float
            # raises TypeError), so such a candidate is never selected.
            if candidate_metric is None:
                continue

            if candidate_metric > best_candidate_metric:
                best_candidate = candidate_rule
                best_candidate_metric = candidate_metric
                best_candidate_combined = test_combined

        # Stop when nothing strictly improves or the gain is below threshold.
        improvement = best_candidate_metric - current_best_metric
        if best_candidate is None or improvement < min_improvement:
            break

        selected_rules.append(best_candidate)
        remaining_rules.remove(best_candidate)
        current_best_metric = best_candidate_metric
        current_combined = best_candidate_combined

    separator = " | " if use_or else " & "
    combined_rule_name = separator.join(f"({rule})" for rule in selected_rules)
    return pl.DataFrame({combined_rule_name: current_combined})
def combine_rules_a_star(
    R: pl.DataFrame,
    y: pl.Series,
    metric: str = "f1",
    max_rules: int = 5,
    operator: str = "or",
    weights: pl.Series | None = None,
    min_improvement: float = 0.0,
    return_top_k: int = 10,
) -> pl.DataFrame:
    """Find top rule combinations using a best-first (A*-style) search.

    Nodes are rule combinations. Every single rule is a start node; a node is
    expanded by adding one further rule. Nodes are expanded in order of
    increasing f-score, where:

    - g(n): negative metric value of the combination (better metric = lower cost)
    - h(n): negative mean of the best single-rule metrics among the rules not
      yet in the combination, taking at most as many as there are free slots
    - f(n): g(n) + h(n)

    Parameters
    ----------
    R : pl.DataFrame
        DataFrame containing boolean rule columns. All columns are used as
        candidate rules.
    y : pl.Series
        Boolean target series indicating true labels.
    metric : str, default="f1"
        Performance metric to optimize. Must be a column name produced by
        compute_metrics (e.g., "f1", "accuracy", "precision", "recall").
        When ``weights`` is given, the column ``f"{metric}_weight"`` is used
        instead.
    max_rules : int, default=5
        Maximum number of rules in a combination.
    operator : str, default="or"
        Boolean operator for combining rules: 'or' or 'and'.
    weights : pl.Series | None, default=None
        Optional sample weights, forwarded to compute_metrics.
    min_improvement : float, default=0.0
        Minimum metric improvement over the parent combination required for a
        child combination to be queued. Acts as a pruning criterion.
    return_top_k : int, default=10
        Maximum number of combinations to return. Set to 1 for the single
        best. Values <= 0 yield an empty DataFrame.

    Returns
    -------
    pl.DataFrame
        One column per returned combination, ordered by metric value (best
        first). A single-rule combination keeps the rule's own name; larger
        combinations are named by joining the parenthesised rule names with
        the operator symbol.

    Raises
    ------
    ValueError
        If operator is not 'or' or 'and', if ``R`` has no columns, if the
        metric column is not found, or if every single-rule metric is null.

    Examples
    --------
    >>> import polars as pl
    >>> R = pl.DataFrame({"rule_A": [True, False, True],
    ...                   "rule_B": [False, True, True],
    ...                   "rule_C": [True, True, False]})
    >>> y = pl.Series([True, True, False])
    >>> # Find single best combination
    >>> best = combine_rules_a_star(R, y, metric="f1", return_top_k=1)
    >>> # Find top 5 combinations
    >>> top_5 = combine_rules_a_star(R, y, metric="f1", return_top_k=5)

    Notes
    -----
    The search loop runs until the open set is empty; it does not stop at the
    first goal node. Every combination that is reachable within ``max_rules``
    without being pruned by ``min_improvement`` is therefore evaluated, and
    the ranking of the result comes from the final sort on the metric. The
    f-score only controls the order in which combinations are expanded.

    Combinations whose metric is null are not queued, because a null value
    cannot be ordered against the others.
    """
    if operator not in ["or", "and"]:
        raise ValueError(f"operator must be 'or' or 'and', got '{operator}'")

    rules = R.columns
    if not rules:
        raise ValueError("rules list cannot be empty")

    use_or = operator == "or"
    separator = " | " if use_or else " & "
    metric_to_use = f"{metric}_weight" if weights is not None else metric

    # Precompute metrics for all single rules (start nodes and heuristic).
    metrics_R = compute_metrics(R, y, weights)
    if metric_to_use not in metrics_R.columns:
        raise ValueError(
            f"Metric '{metric_to_use}' not found in computed metrics. "
            f"Available metrics: {list(metrics_R.columns)}"
        )

    # Key the single-rule metrics by the "rule" column rather than by row
    # position, so the lookup does not depend on compute_metrics returning
    # rows in column order. combine_rules_greedy in this module relies on the
    # same "rule" column.
    single_rule_metrics = dict(
        zip(
            metrics_R["rule"].to_list(),
            metrics_R[metric_to_use].to_list(),
            strict=False,
        )
    )

    def combine(rule_list: list[str]) -> pl.Series:
        """Fold the listed rule columns with the configured operator."""
        combined = R[rule_list[0]]
        for name in rule_list[1:]:
            combined = combined | R[name] if use_or else combined & R[name]
        return combined

    # Metric cache keyed by the sorted rule tuple: the metric of a
    # combination does not depend on the order in which rules were added.
    combination_cache: dict[tuple[str, ...], float | None] = {}

    def compute_combination_metric(rule_list: list[str]) -> float | None:
        """Compute the metric for a combination, using the cache if possible."""
        rule_key = tuple(sorted(rule_list))
        if rule_key not in combination_cache:
            test_R = pl.DataFrame({"test_rule": combine(rule_list)})
            test_metrics = compute_metrics(test_R, y, weights)
            combination_cache[rule_key] = test_metrics[metric_to_use].item(0)
        return combination_cache[rule_key]

    def heuristic(rule_list: list[str]) -> float:
        """Return the negative mean of the best unused single-rule metrics.

        At most ``max_rules - len(rule_list)`` unused rules are considered.
        Rules with a null single-rule metric are ignored. Returns 0.0 when no
        slot or no usable rule is left.
        """
        slots_left = max_rules - len(rule_list)
        if slots_left <= 0:
            return 0.0
        unused_metrics = [
            single_rule_metrics[r]
            for r in rules
            if r not in rule_list and single_rule_metrics[r] is not None
        ]
        best_remaining = heapq.nlargest(slots_left, unused_metrics)
        if not best_remaining:
            return 0.0
        # Negative because the search minimizes cost = -metric.
        return -(sum(best_remaining) / len(best_remaining))

    # Priority queue entries: (f_score, tie_breaker, g_score, rule_list, rule_expr).
    # The unique tie breaker gives FIFO order for equal f-scores and keeps
    # the heap from ever comparing the list / str payload.
    tie_breaker = itertools.count()
    open_set: list[tuple[float, int, float, list[str], str]] = []

    # Initialize with all single rules that have a usable metric.
    for rule in rules:
        metric_value = single_rule_metrics[rule]
        if metric_value is None:
            continue
        g_score = -metric_value
        f_score = g_score + heuristic([rule])
        heapq.heappush(open_set, (f_score, next(tie_breaker), g_score, [rule], rule))

    if not open_set:
        raise ValueError("Cannot evaluate any rule - all metrics may be null")

    # Every expanded combination is a candidate result.
    completed_combinations: list[tuple[float, list[str], str]] = []
    # Rule sets already popped; the same set can be queued by several parents.
    explored: set[tuple[str, ...]] = set()

    while open_set:
        _, _, g_score, rule_list, rule_expr = heapq.heappop(open_set)

        rule_key = tuple(sorted(rule_list))
        if rule_key in explored:
            continue
        explored.add(rule_key)

        current_metric = -g_score  # Convert cost back to the metric value.
        completed_combinations.append((current_metric, rule_list, rule_expr))

        # At max depth the node is a leaf: record it but do not expand.
        if len(rule_list) >= max_rules:
            continue

        for candidate_rule in rules:
            if candidate_rule in rule_list:
                continue

            new_rule_list = [*rule_list, candidate_rule]
            if tuple(sorted(new_rule_list)) in explored:
                continue

            new_metric = compute_combination_metric(new_rule_list)
            # Prune children that are not evaluable or do not improve enough.
            if new_metric is None or new_metric - current_metric < min_improvement:
                continue

            new_g_score = -new_metric
            new_f_score = new_g_score + heuristic(new_rule_list)
            new_expr = separator.join(f"({r})" for r in new_rule_list)
            heapq.heappush(
                open_set,
                (new_f_score, next(tie_breaker), new_g_score, new_rule_list, new_expr),
            )

    # Best metric first; the sort is stable, so ties keep expansion order.
    completed_combinations.sort(key=lambda entry: entry[0], reverse=True)

    # `explored` guarantees each rule set was recorded at most once, so no
    # further de-duplication is needed before truncating to the top k.
    top_combinations = completed_combinations[: max(return_top_k, 0)]

    return pl.DataFrame(
        {rule_expr: combine(rule_list) for _, rule_list, rule_expr in top_combinations}
    )