---
title: "Resumable Streaming Jobs"
output: rmarkdown::html_vignette
vignette: >
  %\VignetteIndexEntry{Resumable Streaming Jobs}
  %\VignetteEngine{knitr::rmarkdown}
  %\VignetteEncoding{UTF-8}
---

`bigKNN` includes checkpointed streaming APIs for exact jobs that are large
enough, long enough, or operationally important enough that restarting from
scratch would be painful. These jobs stream results into destination
`big.matrix` objects and record their progress in an `.rds` checkpoint file, so
an interrupted job can be resumed later.

This vignette covers:

- directed kNN graph streaming with `knn_graph_stream_bigmatrix()`
- checkpointed radius streaming with `radius_stream_job_bigmatrix()`
- resuming interrupted work with `resume_knn_job()`
- what the destination and checkpoint contracts look like in practice

```{r setup, include=FALSE}
knitr::opts_chunk$set(collapse = TRUE, comment = "#>")

if (!requireNamespace("bigmemory", quietly = TRUE)) {
  cat("This vignette requires the 'bigmemory' package.\n")
  knitr::knit_exit()
}

library(bigKNN)
library(bigmemory)
```

```{r helpers, include=FALSE}
job_summary <- function(job) {
  data.frame(
    status = job$status,
    metric = job$metric,
    queries = job$n_query,
    edges = job$n_edge,
    checkpoint = basename(job$checkpoint_path),
    row.names = NULL
  )
}

checkpoint_summary <- function(spec, fields) {
  data.frame(
    field = fields,
    value = vapply(fields, function(field) {
      value <- spec[[field]]
      if (is.null(value)) {
        "NULL"
      } else {
        paste(value, collapse = ", ")
      }
    }, character(1)),
    row.names = NULL
  )
}

read_graph_store <- function(xp_from, xp_to, xp_value = NULL) {
  out <- data.frame(
    from = as.integer(as.vector(bigmemory::as.matrix(xp_from))),
    to = as.integer(as.vector(bigmemory::as.matrix(xp_to))),
    row.names = NULL
  )
  if (!is.null(xp_value)) {
    out$value <- as.numeric(as.vector(bigmemory::as.matrix(xp_value)))
  }
  out
}

read_radius_store <- function(xp_index, xp_distance, xp_offset, query_ids) {
  index <- as.integer(as.vector(bigmemory::as.matrix(xp_index)))
  distance <- as.numeric(as.vector(bigmemory::as.matrix(xp_distance)))
  offset <- as.integer(as.vector(bigmemory::as.matrix(xp_offset)))
  counts <- diff(offset)

  data.frame(
    query = rep(query_ids, times = counts),
    neighbor = index,
    distance = signif(distance, 5),
    row.names = NULL
  )
}
```

# When resumable jobs are worth using

Resumable jobs are especially useful when:

- the result is too large to keep comfortably in ordinary R objects
- the exact job may run long enough that interruptions are realistic
- you want durable progress on disk rather than all-or-nothing execution
- the destination should already live in a file-backed `big.matrix`

These APIs are about operational safety, not approximate computation. The
results are still exact. The difference is that progress is streamed into
destination matrices and checkpointed along the way.

# Build a durable working directory

To make resume realistic, this vignette uses file-backed references and
file-backed destinations. That mirrors the usual production setup, where the
reference, destination stores, and checkpoint file all live in a stable working
directory.

```{r create-workspace}
scratch_dir <- file.path(tempdir(), "bigknn-resumable-jobs")
dir.create(scratch_dir, recursive = TRUE, showWarnings = FALSE)
```

# Checkpointed graph streaming with `knn_graph_stream_bigmatrix()`

The graph streaming API writes a directed exact kNN graph. In other words, it
streams the same edge set you would get from
`knn_graph_bigmatrix(..., symmetrize = "none")`.

For a directed graph job, the destination size is known in advance:

`n_edge = n_ref * k`
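
Because the edge count is fixed up front, the disk footprint of the destinations
can be estimated before anything is allocated. The following is a minimal
sketch of that arithmetic. The job dimensions are hypothetical, and it assumes
the layout used later in this vignette: integer `xpFrom` and `xpTo` stores
(4 bytes per element) and a double `xpValue` store (8 bytes per element).

```{r graph-size-sketch}
# Hypothetical job dimensions, chosen only for illustration.
n_ref_example <- 1e6
k_example <- 10

# One edge per (reference row, neighbor) pair.
n_edge_example <- n_ref_example * k_example

# Assumed element sizes: 4 bytes for "integer", 8 bytes for "double".
bytes_needed <- c(
  from = n_edge_example * 4,
  to = n_edge_example * 4,
  value = n_edge_example * 8
)

# Storage per destination and in total, in megabytes (10^6 bytes).
round(c(bytes_needed, total = sum(bytes_needed)) / 1e6)
```
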

Here is a small file-backed reference with four rows and one outgoing edge per
row.

```{r graph-reference}
graph_points <- data.frame(
  id = paste0("g", 1:4),
  x1 = c(0, 1, 5, 6),
  x2 = c(0, 0, 0, 0)
)

graph_ref <- filebacked.big.matrix(
  nrow = nrow(graph_points),
  ncol = 2,
  type = "double",
  backingfile = "graph-ref.bin",
  descriptorfile = "graph-ref.desc",
  backingpath = scratch_dir
)

graph_ref[,] <- as.matrix(graph_points[c("x1", "x2")])
graph_points
```

```{r graph-job}
k <- 1L
n_edge <- nrow(graph_ref) * k

graph_from <- filebacked.big.matrix(
  nrow = n_edge,
  ncol = 1,
  type = "integer",
  backingfile = "graph-from.bin",
  descriptorfile = "graph-from.desc",
  backingpath = scratch_dir
)

graph_to <- filebacked.big.matrix(
  nrow = n_edge,
  ncol = 1,
  type = "integer",
  backingfile = "graph-to.bin",
  descriptorfile = "graph-to.desc",
  backingpath = scratch_dir
)

graph_value <- filebacked.big.matrix(
  nrow = n_edge,
  ncol = 1,
  type = "double",
  backingfile = "graph-value.bin",
  descriptorfile = "graph-value.desc",
  backingpath = scratch_dir
)

graph_checkpoint <- file.path(scratch_dir, "graph-job.rds")

graph_job <- knn_graph_stream_bigmatrix(
  graph_ref,
  k = k,
  xpFrom = graph_from,
  xpTo = graph_to,
  xpValue = graph_value,
  checkpoint_path = graph_checkpoint
)

job_summary(graph_job)
read_graph_store(graph_from, graph_to, graph_value)
```

Because the output is written directly to the destination matrices, the returned
job object is mostly a status record. The actual graph data lives in
`graph_from`, `graph_to`, and `graph_value`.
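
For a reference this small, the streamed edges can be sanity-checked against a
brute-force computation in base R. The sketch below is not how `bigKNN` computes
neighbors. It assumes Euclidean distance and assumes that a point is never its
own neighbor, and it builds a dense distance matrix, which is only feasible for
tiny inputs. The `toy_*` names are introduced here for illustration.

```{r graph-bruteforce-sketch}
# Same coordinates as the four-row reference used in this section.
toy_points <- cbind(x1 = c(0, 1, 5, 6), x2 = c(0, 0, 0, 0))

# Dense Euclidean distances; only sensible for tiny inputs.
toy_dist <- as.matrix(dist(toy_points))

# Assumption: a point is not its own neighbor.
diag(toy_dist) <- Inf

# Column index of the smallest distance in each row (k = 1).
nearest <- apply(toy_dist, 1, which.min)

data.frame(
  from = seq_len(nrow(toy_points)),
  to = unname(nearest),
  distance = toy_dist[cbind(seq_len(nrow(toy_points)), nearest)],
  row.names = NULL
)
```

If those assumptions match the job's settings, this table should agree with the
contents of the destination stores shown above.
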

# Checkpointed radius streaming with `radius_stream_job_bigmatrix()`

Radius output is more complicated because the total number of matches is not
known in advance: it depends on the data and the radius, not on a fixed `k`. In
practice, the easiest pattern is:

1. run `count_within_radius_bigmatrix()` to size the destinations
2. allocate file-backed stores of the right length
3. run `radius_stream_job_bigmatrix()`

The resumable radius job itself still has an internal count phase, so its
checkpoint knows the per-query counts and can rebuild the offset vector during
resume.
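
The relationship between per-query counts, destination length, and offsets can
be sketched in base R. The counts below are hypothetical stand-ins for the
output of `count_within_radius_bigmatrix()`, and the sketch assumes 1-based
offsets, which is what the slicing rule later in this section implies.

```{r offset-sketch}
# Hypothetical per-query match counts (stand-ins for the count step).
toy_counts <- c(1L, 1L, 3L, 1L)

# Required length of the flattened index and distance stores.
toy_total <- sum(toy_counts)

# Assumed 1-based offsets: entry i is the first slot for query i,
# and the final entry is one past the last slot.
toy_offset <- cumsum(c(1L, toy_counts))

toy_total
toy_offset
```

The offset vector has one more entry than there are queries, which is why the
offset store needs `n_query + 1` rows.
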

```{r radius-reference}
radius_points <- data.frame(
  id = paste0("r", 1:4),
  x1 = c(1, 0, 1, 2),
  x2 = c(0, 1, 1, 1)
)

radius_ref <- filebacked.big.matrix(
  nrow = nrow(radius_points),
  ncol = 2,
  type = "double",
  backingfile = "radius-ref.bin",
  descriptorfile = "radius-ref.desc",
  backingpath = scratch_dir
)

radius_ref[,] <- as.matrix(radius_points[c("x1", "x2")])
radius_points
```

```{r radius-job}
radius_counts <- count_within_radius_bigmatrix(radius_ref, radius = 1.1)
total_matches <- sum(radius_counts)

radius_index <- filebacked.big.matrix(
  nrow = total_matches,
  ncol = 1,
  type = "integer",
  backingfile = "radius-index.bin",
  descriptorfile = "radius-index.desc",
  backingpath = scratch_dir
)

radius_distance <- filebacked.big.matrix(
  nrow = total_matches,
  ncol = 1,
  type = "double",
  backingfile = "radius-distance.bin",
  descriptorfile = "radius-distance.desc",
  backingpath = scratch_dir
)

radius_offset <- filebacked.big.matrix(
  nrow = length(radius_counts) + 1L,
  ncol = 1,
  type = "double",
  backingfile = "radius-offset.bin",
  descriptorfile = "radius-offset.desc",
  backingpath = scratch_dir
)

radius_checkpoint <- file.path(scratch_dir, "radius-job.rds")

radius_job <- radius_stream_job_bigmatrix(
  radius_ref,
  xpIndex = radius_index,
  xpDistance = radius_distance,
  xpOffset = radius_offset,
  radius = 1.1,
  checkpoint_path = radius_checkpoint
)

radius_counts
job_summary(radius_job)
as.integer(as.vector(bigmemory::as.matrix(radius_offset)))
read_radius_store(radius_index, radius_distance, radius_offset, radius_points$id)
```

The flattened `index` and `distance` vectors are interpreted through
`xpOffset`. For query `i`, the relevant slice runs from `offset[i]` to
`offset[i + 1] - 1`.
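
That slicing rule can be sketched as a small helper. `slice_for_query()` and the
`flat_*` vectors are hypothetical names used only for this example; the data are
made up rather than read from the stores above.

```{r slice-sketch}
# Made-up flattened neighbor indices and matching 1-based offsets.
flat_index <- c(3L, 3L, 1L, 2L, 4L, 3L)
flat_offset <- c(1, 2, 3, 6, 7)

# Hypothetical helper: return the entries that belong to query i.
slice_for_query <- function(values, offset, i) {
  start <- offset[i]
  end <- offset[i + 1] - 1
  # A query with no matches has end < start; start:end would count
  # backwards in R, so return an empty vector explicitly.
  if (end < start) {
    return(values[0])
  }
  values[start:end]
}

slice_for_query(flat_index, flat_offset, 3)
```

The explicit guard matters in R because `start:end` counts downwards when
`end < start`, which would silently return entries that belong to a
neighboring query.
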

# Restarting work with `resume_knn_job()`

In a real interruption, the interrupted session would leave the checkpoint in a
partially completed state. A vignette build cannot crash the R process on
purpose, so the next chunks mimic that situation by rewinding the checkpoint and
clearing only the unfinished part of the output.

## Graph job resume

```{r resume-graph}
graph_expected <- as.data.frame(
  knn_graph_bigmatrix(graph_ref, k = 1, format = "edge_list", symmetrize = "none")
)
attr(graph_expected, "bigknn_graph") <- NULL

graph_spec <- readRDS(graph_checkpoint)
graph_spec$status <- "running"
graph_spec$next_row <- 3L
graph_spec$next_edge <- 3L
saveRDS(graph_spec, graph_checkpoint)

graph_from[, 1] <- 0L
graph_to[, 1] <- 0L
graph_value[, 1] <- 0
graph_from[1:2, 1] <- as.integer(graph_expected$from[1:2])
graph_to[1:2, 1] <- as.integer(graph_expected$to[1:2])
graph_value[1:2, 1] <- graph_expected$distance[1:2]

resumed_graph_job <- resume_knn_job(graph_checkpoint)

job_summary(resumed_graph_job)
read_graph_store(graph_from, graph_to, graph_value)
```

The resumed job finishes the remaining rows, starting at the saved row cursor,
and fills in the missing tail of the streamed edge list.
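
The general idea of a row cursor can be illustrated with a toy loop. This is a
conceptual sketch only, not `bigKNN`'s implementation: the "work" is squaring a
number, and `toy_state` is a plain list that borrows the `status` and `next_row`
field names from the checkpoint for readability.

```{r cursor-sketch}
# Toy work: square each input. The real jobs compute exact neighbors.
toy_input <- c(2, 4, 6, 8)
toy_output <- rep(NA_real_, length(toy_input))

# Pretend an earlier session finished rows 1-2 and saved a cursor of 3.
toy_output[1:2] <- toy_input[1:2]^2
toy_state <- list(status = "running", next_row = 3L)

# Resume: start at the cursor and advance it after each finished row.
while (toy_state$next_row <= length(toy_input)) {
  i <- toy_state$next_row
  toy_output[i] <- toy_input[i]^2
  toy_state$next_row <- i + 1L
}
toy_state$status <- "completed"

toy_output
toy_state$next_row
```
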

## Radius job resume

Radius jobs checkpoint both the current phase and the next row cursor. Here we
simulate a restart after the count phase has already completed but the collect
phase stopped partway through.

```{r resume-radius}
radius_expected <- radius_bigmatrix(radius_ref, radius = 1.1)

radius_spec <- readRDS(radius_checkpoint)
radius_spec$status <- "running"
radius_spec$phase <- "collect"
radius_spec$next_row <- 3L
saveRDS(radius_spec, radius_checkpoint)

radius_index[, 1] <- 0L
radius_distance[, 1] <- 0
radius_offset[, 1] <- 0

prefix_end <- radius_expected$offset[3L] - 1L
radius_index[seq_len(prefix_end), 1] <- as.integer(radius_expected$index[seq_len(prefix_end)])
radius_distance[seq_len(prefix_end), 1] <- radius_expected$distance[seq_len(prefix_end)]

resumed_radius_job <- resume_knn_job(radius_checkpoint)

job_summary(resumed_radius_job)
as.integer(as.vector(bigmemory::as.matrix(radius_offset)))
read_radius_store(radius_index, radius_distance, radius_offset, radius_points$id)
```

Notice that the offset vector is restored as part of the resumed run. That is
why the radius checkpoint tracks both the phase and the per-query counts.

# Destination matrix sizing and storage types

The destination contracts are strict, and it helps to keep them explicit:

- For `knn_graph_stream_bigmatrix()`, `xpFrom` and `xpTo` must be writable
  single-column `big.matrix` objects with `n_ref * k` rows.
- For graph jobs, `xpValue` is optional, but when supplied it must be a
  writable single-column double-backed `big.matrix`.
- For `radius_stream_job_bigmatrix()`, `xpIndex` must be a writable
  single-column `big.matrix` with enough rows to hold the flattened match
  vector.
- For radius jobs, `xpDistance` must be double-backed when supplied.
- For radius jobs, `xpOffset` must be single-column and have `n_query + 1`
  rows.

In practice, file-backed destinations are often the most natural choice because
they match the persistence model of checkpointed jobs.
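
Checking shapes before launching a long job gives a clearer failure than
discovering a mismatch partway through. The helper below is a hypothetical
sketch, not part of `bigKNN`. It relies only on `nrow()` and `ncol()`, and the
example uses ordinary base R matrices as stand-ins for the destination stores.

```{r destination-check-sketch}
# Hypothetical helper: stop with a readable message on a shape mismatch.
check_destination <- function(x, expected_rows, name) {
  if (ncol(x) != 1L) {
    stop(name, " must have exactly one column", call. = FALSE)
  }
  if (nrow(x) != expected_rows) {
    stop(name, " must have ", expected_rows, " rows, not ", nrow(x),
         call. = FALSE)
  }
  invisible(TRUE)
}

n_ref_toy <- 4L
k_toy <- 1L

# A stand-in with the right shape passes silently.
from_stand_in <- matrix(0L, nrow = n_ref_toy * k_toy, ncol = 1)
check_destination(from_stand_in, n_ref_toy * k_toy, "xpFrom")

# A stand-in that is one row short is rejected.
too_short <- matrix(0L, nrow = 3, ncol = 1)
tryCatch(
  check_destination(too_short, n_ref_toy * k_toy, "xpFrom"),
  error = function(e) conditionMessage(e)
)
```
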

# What checkpoint files contain

Checkpoint files are ordinary `.rds` files, but they are not meant to be edited
in normal use. At a high level they store:

- prepared-reference metadata
- query metadata
- descriptors for the destination matrices
- the current phase and row cursor
- enough state to finish the remaining work

Here are the most informative scalar fields for the graph and radius checkpoints
created above:

```{r checkpoint-fields}
graph_spec <- readRDS(graph_checkpoint)
radius_spec <- readRDS(radius_checkpoint)

checkpoint_summary(graph_spec, c("type", "status", "k", "next_row", "next_edge"))
checkpoint_summary(radius_spec, c("type", "status", "phase", "next_row", "total_matches"))
```

The non-scalar fields in those checkpoint objects hold the serialized prepared
reference, query description, and destination descriptors needed for resume.

# Failure recovery and restart patterns

A few practical patterns make checkpointed workflows more reliable:

- Keep the checkpoint file, the reference backing files, and the destination
  backing files together inside a stable workflow directory.
- After an interruption, call `resume_knn_job()` with the same checkpoint path
  rather than starting from scratch.
- If a checkpoint already shows `status = "completed"`, calling
  `resume_knn_job()` again simply returns a completed job object.
- If the backing files for the reference or destinations are missing, resume
  cannot reattach the descriptors and the job cannot continue.

The key idea is that the checkpoint is only one part of the state. The backing
files for the referenced `big.matrix` objects matter just as much.
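
One way to act on this is a pre-flight check that lists missing files before a
resume is attempted. The sketch below is an assumption about how such a check
might look. `missing_job_files()` and the file names are hypothetical, and the
files are empty placeholders created only for the example.

```{r preflight-sketch}
# Hypothetical helper: which of the files a resume depends on are missing?
missing_job_files <- function(paths) {
  paths[!file.exists(paths)]
}

# Stand-in workflow directory holding a checkpoint and one of two backing files.
toy_dir <- file.path(tempdir(), "bigknn-preflight-sketch")
dir.create(toy_dir, recursive = TRUE, showWarnings = FALSE)
toy_files <- file.path(toy_dir, c("job.rds", "ref.bin", "dest.bin"))
unlink(toy_files)
invisible(file.create(toy_files[1:2]))

# Report anything that would prevent a resume.
basename(missing_job_files(toy_files))
```

In a real workflow the list of paths would also include the descriptor files
that accompany each file-backed `big.matrix`.
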

# Operational tips for long-running jobs

- Pair resumable jobs with `knn_plan_bigmatrix()` when you want a repeatable
  block-size and thread policy across runs.
- Prefer file-backed destinations for long jobs, especially when the outputs are
  large enough that you would not want to recompute them.
- Put checkpoint files in a predictable location so restart logic can find them
  easily.
- Use graph streaming when the directed edge list is the real output, and use
  radius streaming when you want flattened match vectors plus offsets.
- Treat checkpoint files as internal state. Reading them for diagnostics is
  fine; hand-editing them should be reserved for controlled recovery scenarios
  like the simulated example in this vignette.
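
With a predictable checkpoint location, restart logic can be reduced to a small
decision. The sketch below shows one possible shape for it. `next_action()` is a
hypothetical helper, and the stand-in checkpoints are plain lists with only a
`status` field, far simpler than a real checkpoint file.

```{r restart-sketch}
# Hypothetical helper: decide what a restart script should do next.
next_action <- function(checkpoint_path) {
  if (!file.exists(checkpoint_path)) {
    return("start")
  }
  # Read-only use of the checkpoint, for diagnostics.
  status <- readRDS(checkpoint_path)$status
  if (identical(status, "completed")) "done" else "resume"
}

toy_checkpoint <- file.path(tempdir(), "restart-sketch.rds")
unlink(toy_checkpoint)

# No checkpoint yet: launch a fresh job.
next_action(toy_checkpoint)

# Stand-in checkpoint for an interrupted job.
saveRDS(list(status = "running"), toy_checkpoint)
next_action(toy_checkpoint)

# Stand-in checkpoint for a finished job.
saveRDS(list(status = "completed"), toy_checkpoint)
next_action(toy_checkpoint)
```

In a real script, the `"resume"` branch would call `resume_knn_job()` with the
checkpoint path and the `"start"` branch would launch the streaming job. The
`"done"` branch is optional, since resuming a completed checkpoint simply
returns a completed job object.
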

Used this way, resumable jobs give `bigKNN` a more production-friendly exact
workflow: results are streamed to durable storage, progress survives
interruptions, and recovery is a normal part of the API instead of an external
hack.
