Source code for geowombat.detect.metrics

"""Accuracy assessment for object detection.

Computes IoU-matched precision/recall/F1/AP per class and produces a
review-ready GeoDataFrame suitable for stepping through in QGIS (e.g.
with the GoToNextFeature3+ plugin or the built-in Form view) to verify
TP / FP / FN judgments manually before reporting final metrics.

Example
-------
>>> from geowombat.detect import (
...     detection_accuracy, export_for_review, recompute_from_review,
... )
>>> results = detection_accuracy(pred_gdf, truth_gdf, class_col='species')
>>> results['metrics']               # per-class precision/recall/F1/AP
>>> export_for_review(results['matched'], 'review.gpkg')
>>> # ... edit reviewer_label in QGIS, then ...
>>> final = recompute_from_review('review.gpkg', class_col='species')
"""

from pathlib import Path

import geopandas as gpd
import numpy as np
import pandas as pd


# ---------------------------------------------------------------------------
# IoU + matching
# ---------------------------------------------------------------------------

def _iou(a, b):
    if a.is_empty or b.is_empty:
        return 0.0
    inter = a.intersection(b).area
    if inter <= 0:
        return 0.0
    union = a.area + b.area - inter
    return inter / union if union > 0 else 0.0


def _greedy_match(preds, truths, iou_threshold):
    """Greedy IoU matching of predictions to ground truth.

    Predictions are processed in descending score order. Each truth can
    only be matched once. Returns three lists of indices:
    ``(tp_pairs, fp_idx, fn_idx)`` where ``tp_pairs`` are
    ``(pred_idx, truth_idx, iou)``.
    """
    if preds.empty and truths.empty:
        return [], [], []
    if preds.empty:
        return [], [], list(truths.index)
    if truths.empty:
        return [], list(preds.index), []

    pred_order = preds.sort_values('score', ascending=False).index.tolist()
    truth_geoms = {i: g for i, g in truths.geometry.items()}
    matched_truth = set()

    tp = []
    fp = []
    for pi in pred_order:
        pg = preds.geometry.loc[pi]
        best_iou = 0.0
        best_ti = None
        for ti, tg in truth_geoms.items():
            if ti in matched_truth:
                continue
            i = _iou(pg, tg)
            if i > best_iou:
                best_iou = i
                best_ti = ti
        if best_ti is not None and best_iou >= iou_threshold:
            tp.append((pi, best_ti, best_iou))
            matched_truth.add(best_ti)
        else:
            fp.append(pi)

    fn = [ti for ti in truths.index if ti not in matched_truth]
    return tp, fp, fn


# ---------------------------------------------------------------------------
# Per-class AP
# ---------------------------------------------------------------------------

def _voc_ap(recall, precision):
    """11-point interpolated PASCAL VOC AP."""
    ap = 0.0
    for t in np.linspace(0, 1, 11):
        p = precision[recall >= t]
        ap += (p.max() if p.size else 0.0) / 11.0
    return float(ap)


def _ap_for_class(class_preds, class_truths, iou_threshold):
    if class_truths.empty:
        return {
            'ap': 0.0, 'precision': 0.0, 'recall': 0.0, 'f1': 0.0,
            'tp': 0, 'fp': len(class_preds), 'fn': 0,
            'support': 0,
        }
    if class_preds.empty:
        return {
            'ap': 0.0, 'precision': 0.0, 'recall': 0.0, 'f1': 0.0,
            'tp': 0, 'fp': 0, 'fn': len(class_truths),
            'support': len(class_truths),
        }

    order = class_preds.sort_values('score', ascending=False).index.tolist()
    matched = set()
    tps = np.zeros(len(order), dtype=np.int32)
    fps = np.zeros(len(order), dtype=np.int32)

    for i, pi in enumerate(order):
        pg = class_preds.geometry.loc[pi]
        best_iou = 0.0
        best_ti = None
        for ti, tg in class_truths.geometry.items():
            if ti in matched:
                continue
            iou = _iou(pg, tg)
            if iou > best_iou:
                best_iou = iou
                best_ti = ti
        if best_ti is not None and best_iou >= iou_threshold:
            tps[i] = 1
            matched.add(best_ti)
        else:
            fps[i] = 1

    cum_tp = np.cumsum(tps)
    cum_fp = np.cumsum(fps)
    n_truth = len(class_truths)
    recall = cum_tp / max(n_truth, 1)
    precision = cum_tp / np.maximum(cum_tp + cum_fp, 1)
    ap = _voc_ap(recall, precision)

    tp_total = int(cum_tp[-1])
    fp_total = int(cum_fp[-1])
    fn_total = n_truth - tp_total
    p = tp_total / max(tp_total + fp_total, 1)
    r = tp_total / max(n_truth, 1)
    f1 = 2 * p * r / max(p + r, 1e-12)

    return {
        'ap': float(ap),
        'precision': float(p),
        'recall': float(r),
        'f1': float(f1),
        'tp': tp_total,
        'fp': fp_total,
        'fn': fn_total,
        'support': n_truth,
    }


# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------

[docs]def detection_accuracy( predictions, truth, class_col='class_name', iou_thresholds=(0.5,), score_col='score', class_agnostic=False, ): """Compute detection accuracy metrics + a review-ready GeoDataFrame. Parameters ---------- predictions : geopandas.GeoDataFrame Detector output. Must include ``geometry``, ``score`` (or ``score_col``), and a class column. truth : geopandas.GeoDataFrame Ground-truth boxes/polygons in the same CRS. class_col : str Column with class labels in both inputs. iou_thresholds : sequence of float, or 'coco' IoU thresholds to evaluate. ``'coco'`` expands to 0.5..0.95 step 0.05 and returns mAP@[.5:.95]. score_col : str Score column in predictions. Default 'score'. class_agnostic : bool If True, ignore class labels (treat as a single class). Returns ------- dict Keys: - ``metrics``: DataFrame indexed by class with columns ``precision, recall, f1, ap, tp, fp, fn, support`` for each IoU threshold. - ``summary``: dict of overall mAP per threshold + mAP@[.5:.95]. - ``matched``: GeoDataFrame of TP/FP/FN rows ready for QGIS review (see ``export_for_review``). - ``confusion``: DataFrame of class confusion (per-class truth vs. matched-prediction class) at the lowest IoU. """ if predictions.crs is None or truth.crs is None: raise ValueError("Both predictions and truth must have a CRS set.") if predictions.crs.to_epsg() != truth.crs.to_epsg(): truth = truth.to_crs(predictions.crs) # Reset to a simple 0..N integer index — osmnx and other sources can # return MultiIndex GeoDataFrames whose keys aren't castable to int. preds = predictions.reset_index(drop=True).copy() truths = truth.reset_index(drop=True).copy() if score_col not in preds.columns: raise KeyError( f"predictions missing score column '{score_col}'." ) if class_agnostic: preds['_cls'] = '_all_' truths['_cls'] = '_all_' class_col_use = '_cls' else: if class_col not in preds.columns: raise KeyError( f"predictions missing class column '{class_col}'." ) if class_col not in truths.columns: raise KeyError( f"truth missing class column '{class_col}'." ) class_col_use = class_col preds = preds.rename(columns={score_col: 'score'}) if iou_thresholds == 'coco': thresholds = list(np.round(np.arange(0.5, 1.0, 0.05), 2)) else: thresholds = list(iou_thresholds) classes = sorted( set(preds[class_col_use].dropna().unique().tolist()) | set(truths[class_col_use].dropna().unique().tolist()) ) rows = [] for thr in thresholds: for cls in classes: cp = preds[preds[class_col_use] == cls] ct = truths[truths[class_col_use] == cls] stats = _ap_for_class(cp, ct, thr) stats.update({'class': cls, 'iou_threshold': thr}) rows.append(stats) metrics_df = pd.DataFrame(rows).set_index(['iou_threshold', 'class']) summary = {} for thr in thresholds: ap_vals = metrics_df.loc[thr, 'ap'].values summary[f'mAP@{thr}'] = float(np.mean(ap_vals)) if len(ap_vals) else 0.0 if iou_thresholds == 'coco': summary['mAP@[.5:.95]'] = float(np.mean(list(summary.values()))) # ------------------------------------------------------------------ # Build review GeoDataFrame at the lowest IoU threshold — that's the # one humans typically audit (most lenient pairing). # ------------------------------------------------------------------ primary_iou = min(thresholds) review_rows = [] confusion_rows = [] # Per-class greedy matching to fill TP/FP/FN with class context for cls in classes: cp = preds[preds[class_col_use] == cls] ct = truths[truths[class_col_use] == cls] tp_pairs, fp_idx, fn_idx = _greedy_match(cp, ct, primary_iou) for pi, ti, iou_val in tp_pairs: review_rows.append({ 'geometry': preds.geometry.loc[pi], 'status': 'TP', 'class_name': cls, 'truth_class': cls, 'score': float(preds['score'].loc[pi]), 'iou': float(iou_val), 'pred_idx': int(pi), 'truth_idx': int(ti), 'reviewer_label': '', 'notes': '', }) confusion_rows.append((cls, cls)) for pi in fp_idx: # Check whether the FP overlaps a truth of a different class pg = preds.geometry.loc[pi] best_other = None best_iou = 0.0 for ti, tg in truths.geometry.items(): if truths[class_col_use].loc[ti] == cls: continue v = _iou(pg, tg) if v > best_iou: best_iou = v best_other = ti if best_other is not None and best_iou >= primary_iou: truth_cls = truths[class_col_use].loc[best_other] review_rows.append({ 'geometry': pg, 'status': 'FP_class', 'class_name': cls, 'truth_class': truth_cls, 'score': float(preds['score'].loc[pi]), 'iou': float(best_iou), 'pred_idx': int(pi), 'truth_idx': int(best_other), 'reviewer_label': '', 'notes': '', }) confusion_rows.append((truth_cls, cls)) else: review_rows.append({ 'geometry': pg, 'status': 'FP', 'class_name': cls, 'truth_class': None, 'score': float(preds['score'].loc[pi]), 'iou': float(best_iou), 'pred_idx': int(pi), 'truth_idx': -1, 'reviewer_label': '', 'notes': '', }) for ti in fn_idx: review_rows.append({ 'geometry': truths.geometry.loc[ti], 'status': 'FN', 'class_name': None, 'truth_class': cls, 'score': np.nan, 'iou': 0.0, 'pred_idx': -1, 'truth_idx': int(ti), 'reviewer_label': '', 'notes': '', }) confusion_rows.append((cls, '_missed_')) matched_gdf = gpd.GeoDataFrame( review_rows, geometry='geometry', crs=preds.crs, ) # Confusion matrix (truth vs. predicted class label among matched) if confusion_rows: cm = pd.crosstab( pd.Series([r[0] for r in confusion_rows], name='truth'), pd.Series([r[1] for r in confusion_rows], name='predicted'), ) else: cm = pd.DataFrame() return { 'metrics': metrics_df, 'summary': summary, 'matched': matched_gdf, 'confusion': cm, }
[docs]def export_for_review(matched_gdf, out_path, layer='detections_review'): """Write a matched GeoDataFrame to GeoPackage for QGIS review. The output file is suitable for the QGIS attribute form workflow and feature-stepping plugins (e.g. GoToNextFeature3+). The ``reviewer_label`` field is empty for the user to fill in (values like ``'TP'``, ``'FP'``, ``'FN'``, ``'unclear'`` are recommended); ``notes`` is free text. Parameters ---------- matched_gdf : geopandas.GeoDataFrame Output from ``detection_accuracy``. out_path : str or Path Destination .gpkg path. layer : str GeoPackage layer name. """ out_path = Path(out_path) out_path.parent.mkdir(parents=True, exist_ok=True) matched_gdf.to_file(out_path, layer=layer, driver='GPKG') return out_path
[docs]def recompute_from_review(review_input, class_col='class_name', layer='detections_review'): """Recompute metrics after a human has edited ``reviewer_label``. The reviewer is expected to fill ``reviewer_label`` with one of: ``'TP'``, ``'FP'``, ``'FN'``, or ``'unclear'`` (case-insensitive). Rows where the label is blank are treated as the automatic status. ``'unclear'`` rows are excluded from the metrics entirely. Parameters ---------- review_input : str, Path, or GeoDataFrame Reviewed GeoPackage path or in-memory GeoDataFrame. class_col : str Column to use for per-class breakdown. layer : str GeoPackage layer if reading from disk. Returns ------- dict Per-class and overall counts/precision/recall/F1 after incorporating human review. """ if isinstance(review_input, (str, Path)): gdf = gpd.read_file(review_input, layer=layer) else: gdf = review_input.copy() label_col = 'reviewer_label' if label_col not in gdf.columns: raise KeyError( f"GeoDataFrame missing '{label_col}' column." ) final = gdf['status'].astype(str).str.upper().copy() reviewer = gdf[label_col].astype(str).str.strip().str.upper() has_review = reviewer.isin(['TP', 'FP', 'FN', 'UNCLEAR']) final[has_review] = reviewer[has_review] # Collapse FP_class into FP for top-line counts final = final.replace({'FP_CLASS': 'FP'}) gdf = gdf.assign(_final=final) gdf = gdf[gdf['_final'] != 'UNCLEAR'] # Decide which class column to bucket by for FN rows def _row_class(row): if row['_final'] == 'FN': return row.get('truth_class') or row.get(class_col) return row.get(class_col) or row.get('truth_class') gdf['_bucket_class'] = gdf.apply(_row_class, axis=1) rows = [] classes = sorted(c for c in gdf['_bucket_class'].dropna().unique()) for cls in classes: sub = gdf[gdf['_bucket_class'] == cls] tp = int((sub['_final'] == 'TP').sum()) fp = int((sub['_final'] == 'FP').sum()) fn = int((sub['_final'] == 'FN').sum()) p = tp / max(tp + fp, 1) r = tp / max(tp + fn, 1) f1 = 2 * p * r / max(p + r, 1e-12) rows.append({ 'class': cls, 'tp': tp, 'fp': fp, 'fn': fn, 'precision': p, 'recall': r, 'f1': f1, }) metrics_df = pd.DataFrame(rows).set_index('class') overall = { 'tp': int(metrics_df['tp'].sum()), 'fp': int(metrics_df['fp'].sum()), 'fn': int(metrics_df['fn'].sum()), } overall['precision'] = overall['tp'] / max( overall['tp'] + overall['fp'], 1 ) overall['recall'] = overall['tp'] / max( overall['tp'] + overall['fn'], 1 ) overall['f1'] = 2 * overall['precision'] * overall['recall'] / max( overall['precision'] + overall['recall'], 1e-12 ) return {'metrics': metrics_df, 'overall': overall}
[docs]def plot_detections(src, predictions=None, truth=None, ax=None, band_indices=None, scale=None, max_features=2000): """Plot a raster with detections color-coded by TP / FP / FN. Parameters ---------- src : xarray.DataArray Background raster opened with ``gw.open()``. predictions : geopandas.GeoDataFrame, optional Detector output. Plotted in solid lines. truth : geopandas.GeoDataFrame, optional Ground truth. Plotted as dashed lines. ax : matplotlib.axes.Axes, optional Existing axes. A new figure is created if None. band_indices : list of int, optional RGB bands for the background. scale : tuple of (lo, hi), optional Stretch range for background display. max_features : int Subsample to this many features before plotting to keep the figure responsive on large outputs. Default 2000. Returns ------- matplotlib.axes.Axes """ import matplotlib.pyplot as plt from ..ml._labels import resolve_band_indices if ax is None: _, ax = plt.subplots(figsize=(10, 10)) # Use geowombat's native imshow so band/y/x → y/x/band reorder and # the geographic extent on the axes happen automatically. indices = resolve_band_indices(src, band_indices) rgb_src = src.isel(band=indices) imshow_kwargs = {'ax': ax} if scale is not None: lo, hi = scale imshow_kwargs.update(vmin=lo, vmax=hi) else: imshow_kwargs['robust'] = True rgb_src.gw.imshow(**imshow_kwargs) def _maybe_sample(gdf): if gdf is not None and len(gdf) > max_features: return gdf.sample(max_features, random_state=0) return gdf predictions = _maybe_sample(predictions) truth = _maybe_sample(truth) if truth is not None and not truth.empty: truth.boundary.plot( ax=ax, color='yellow', linestyle='--', linewidth=1.0, label='Truth', ) if predictions is not None and not predictions.empty: if 'status' in predictions.columns: colors = { 'TP': 'lime', 'FP': 'red', 'FP_class': 'orange', 'FN': 'magenta', } for status, color in colors.items(): sub = predictions[predictions['status'] == status] if sub.empty: continue sub.boundary.plot( ax=ax, color=color, linewidth=1.2, label=status, ) else: predictions.boundary.plot( ax=ax, color='cyan', linewidth=1.0, label='Predictions', ) ax.legend(loc='upper right', fontsize=9) ax.set_xlabel('X') ax.set_ylabel('Y') return ax