Skip to main content

bootc_lib/
podman_client.rs

1//! Async podman client using the native libpod API.
2//!
3//! Provides a high-level interface for pulling container images through
4//! podman's native libpod HTTP API, enabling streaming per-blob byte-level
5//! progress display. The transient `podman system service` is started
6//! against bootc's custom storage root and automatically torn down.
7
8use std::collections::HashMap;
9use std::process::{Command, Stdio};
10
11use anyhow::{Context, Result};
12use bootc_utils::AsyncCommandRunExt;
13use cap_std_ext::cap_std::fs::Dir;
14use cap_std_ext::cmdext::{CapStdExtCommandExt, CmdFds};
15use fn_error_context::context;
16use futures_util::StreamExt;
17use http_body_util::BodyExt;
18use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
19use tokio::io::AsyncBufReadExt;
20
/// Podman libpod API version to use.
///
/// Interpolated as the leading path segment of the pull endpoint URI
/// (`/{LIBPOD_API_VERSION}/libpod/images/pull`).
const LIBPOD_API_VERSION: &str = "v5.0.0";
23
/// A report object from podman's native image pull endpoint.
///
/// The pull response body is consumed line by line and every non-empty
/// line is deserialized into one of these. All fields are optional; a
/// field that is missing from the JSON object deserializes to `None`.
#[derive(Debug, serde::Deserialize)]
// `status`, `images` and `id` are only read by the unit tests in this file,
// so without this the non-test build would warn about unused fields.
#[allow(dead_code)]
struct ImagePullReport {
    /// Overall state reported for this event; the unit tests exercise the
    /// values `pulling` and `success`.
    #[serde(default)]
    status: Option<String>,
    /// Human-readable status text (e.g. "Copying blob ..."), shown as the
    /// message of the top-level status spinner.
    #[serde(default)]
    stream: Option<String>,
    /// Error text; when present the pull is aborted with this message.
    #[serde(default)]
    error: Option<String>,
    /// Presumably the IDs of the pulled images, sent with the final
    /// report — only exercised by the unit tests here; confirm against
    /// the podman API documentation.
    #[serde(default)]
    images: Option<Vec<String>>,
    /// Presumably the ID of the pulled image, sent with the final report —
    /// only exercised by the unit tests here; confirm against the podman
    /// API documentation.
    #[serde(default)]
    id: Option<String>,
    /// Per-blob download progress, carried in the `pullProgress` JSON key.
    #[serde(rename = "pullProgress", default)]
    pull_progress: Option<ArtifactPullProgress>,
}
41
/// Per-blob download progress from podman's native pull API.
///
/// Every field falls back to its type's default when absent from the JSON
/// object (`None`, `0`, or the empty string).
#[derive(Debug, serde::Deserialize)]
struct ArtifactPullProgress {
    /// State of this blob. The display code reacts to `pulling`,
    /// `success` and `skipped`; any other value is ignored.
    #[serde(default)]
    status: Option<String>,
    /// Progress made so far; used as the progress bar position (rendered
    /// as a byte count by the bar template).
    #[serde(default)]
    current: u64,
    /// Expected size of the blob. Signed on the wire; the display code
    /// only uses it as the bar length when it is greater than zero.
    #[serde(default)]
    total: i64,
    /// Identifier of the blob this event refers to, carried in the
    /// `progressComponentID` JSON key. Events with an empty identifier are
    /// skipped by the display code.
    #[serde(rename = "progressComponentID", default)]
    progress_component_id: String,
}
54
/// Manages a transient podman service, providing HTTP access to the
/// native libpod API via a Unix socket.
///
/// Dropping the client asks the service process to terminate and removes
/// the socket file (both best-effort).
pub(crate) struct PodmanClient {
    /// Handle to the spawned `podman system service` process.
    service_child: tokio::process::Child,
    /// Filesystem path to the socket.
    socket_path: String,
    /// Stored for subprocess fallback when the image transport is not
    /// supported by the libpod HTTP API (which only handles `docker:`).
    /// Used to configure registry auth for the fallback `podman pull`.
    sysroot: Dir,
    /// Storage root handed to the fallback `podman pull` subprocess
    /// (same directory the API service was started against).
    storage_root: Dir,
    /// Run root handed to the fallback `podman pull` subprocess
    /// (same directory the API service was started against).
    run_root: Dir,
}
67
68impl PodmanClient {
69    /// Start a transient `podman system service` pointing at the given
70    /// storage root and connect to it.
71    ///
72    /// Registry auth is configured via `REGISTRY_AUTH_FILE` on the
73    /// podman service process, using the same bootc/ostree auth as
74    /// existing podman CLI invocations.
75    //
76    // TODO: Eliminate the socket-path polling by passing a pre-created
77    // listener via the systemd LISTEN_FDS protocol (podman supports
78    // this when a URI is provided and LISTEN_FDS is set — it calls
79    // `os.NewFile(3)` + `net.FileListener` instead of `net.Listen`).
80    //
81    // The blocker is that `bind_storage_roots()` hardcodes the runroot
82    // fd at STORAGE_RUN_FD=3, which conflicts with LISTEN_FDS's
83    // requirement that the listener be at fd 3.  Podman also stores
84    // the runroot path (`/proc/self/fd/3`) in its on-disk database
85    // (bolt_state.db), so changing the fd number for the API service
86    // causes a "database configuration mismatch" error.
87    //
88    // Fixing this requires either:
89    //   (a) Changing STORAGE_RUN_FD to a higher number globally (breaks
90    //       existing installed systems whose DB has /proc/self/fd/3),
91    //   (b) Using a separate runroot path for the transient API service
92    //       (e.g. /run/bootc/api-run) that doesn't conflict, or
93    //   (c) Upstream podman change to support LISTEN_FDS at arbitrary
94    //       fd numbers (not just fd 3).
95    #[context("Connecting to podman API")]
96    pub(crate) async fn connect(sysroot: &Dir, storage_root: &Dir, run_root: &Dir) -> Result<Self> {
97        use crate::podstorage::STORAGE_ALIAS_DIR;
98
99        let socket_path = "/run/bootc/podman-api.sock".to_owned();
100        std::fs::create_dir_all("/run/bootc/").ok();
101        let _ = std::fs::remove_file(&socket_path);
102
103        let mut cmd = std::process::Command::new(bootc_utils::podman_bin());
104        let mut fds = CmdFds::new();
105        crate::podstorage::bind_storage_roots(&mut cmd, &mut fds, storage_root, run_root)?;
106        crate::podstorage::setup_auth(&mut cmd, &mut fds, sysroot)?;
107
108        let run_root_arg = format!("/proc/self/fd/{}", crate::podstorage::STORAGE_RUN_FD);
109        let socket_uri = format!("unix://{socket_path}");
110        cmd.args([
111            "--root",
112            STORAGE_ALIAS_DIR,
113            "--runroot",
114            &run_root_arg,
115            "system",
116            "service",
117            "--time=0",
118            &socket_uri,
119        ]);
120        cmd.stdin(Stdio::null());
121        cmd.stdout(Stdio::null());
122        cmd.stderr(Stdio::piped());
123        cmd.take_fds(fds);
124
125        tracing::debug!("Starting podman API service at {socket_path}");
126        let mut child = tokio::process::Command::from(cmd)
127            .spawn()
128            .context("Spawning podman system service")?;
129
130        // Poll for the socket to appear, checking for early exit.
131        // 900 * 100ms = 90s, matching the systemd unit startup timeout.
132        for _ in 0..900 {
133            if let Some(status) = child.try_wait()? {
134                let mut stderr_msg = String::new();
135                if let Some(mut stderr) = child.stderr.take() {
136                    use tokio::io::AsyncReadExt;
137                    stderr.read_to_string(&mut stderr_msg).await.ok();
138                }
139                anyhow::bail!("Podman API service exited with {status}: {stderr_msg}");
140            }
141            if std::path::Path::new(&socket_path).exists() {
142                break;
143            }
144            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
145        }
146        if !std::path::Path::new(&socket_path).exists() {
147            anyhow::bail!("Podman API socket did not appear at {socket_path}");
148        }
149
150        Ok(Self {
151            service_child: child,
152            socket_path,
153            sysroot: sysroot.try_clone().context("Cloning sysroot")?,
154            storage_root: storage_root.try_clone().context("Cloning storage root")?,
155            run_root: run_root.try_clone().context("Cloning run root")?,
156        })
157    }
158
159    /// Pull a container image with streaming progress display.
160    ///
161    /// Uses the native podman libpod API (`/libpod/images/pull`) which
162    /// provides real download progress (bytes transferred) on podman
163    /// 5.9+ (see containers/podman#28224). On older podman, status
164    /// messages ("Copying blob ...", "Writing manifest ...") are shown
165    /// as a spinner.
166    ///
167    /// The libpod HTTP API only supports the `docker:` transport.  When
168    /// the image reference uses a different transport (`oci:`, `dir:`,
169    /// `containers-storage:`, etc.) we fall back to invoking `podman pull`
170    /// as a subprocess with the same storage and auth configuration.
171    ///
172    /// Registry authentication is handled by the podman service process
173    /// via `REGISTRY_AUTH_FILE`, configured at connect() time.
174    #[context("Pulling image via podman API: {image}")]
175    pub(crate) async fn pull_with_progress(&self, image: &str) -> Result<()> {
176        if uses_non_docker_transport(image) {
177            return self.pull_via_subprocess(image).await;
178        }
179        self.pull_via_api(image).await
180    }
181
182    /// Pull using the libpod HTTP API (docker transport only).
183    async fn pull_via_api(&self, image: &str) -> Result<()> {
184        let stream = tokio::net::UnixStream::connect(&self.socket_path)
185            .await
186            .context("Connecting to podman API socket")?;
187        let io = hyper_util::rt::TokioIo::new(stream);
188
189        let (mut sender, conn) = hyper::client::conn::http1::handshake(io)
190            .await
191            .context("HTTP/1.1 handshake with podman")?;
192
193        tokio::spawn(async move {
194            if let Err(e) = conn.await {
195                tracing::warn!("Podman HTTP connection error: {e}");
196            }
197        });
198
199        let encoded_ref =
200            percent_encoding::utf8_percent_encode(image, percent_encoding::NON_ALPHANUMERIC);
201        let uri = format!(
202            "/{LIBPOD_API_VERSION}/libpod/images/pull?reference={encoded_ref}&pullProgress=true&policy=always"
203        );
204
205        tracing::debug!("POST {uri}");
206        let response = sender
207            .send_request(
208                hyper::Request::builder()
209                    .method(hyper::Method::POST)
210                    .uri(&uri)
211                    .header(hyper::header::HOST, "d")
212                    .body(http_body_util::Empty::<hyper::body::Bytes>::new())
213                    .context("Building pull request")?,
214            )
215            .await
216            .context("Sending pull request to podman")?;
217
218        let status = response.status();
219        if !status.is_success() {
220            let body = response
221                .into_body()
222                .collect()
223                .await
224                .context("Reading error response body")?
225                .to_bytes();
226            anyhow::bail!(
227                "Podman libpod pull failed with HTTP {status}: {}",
228                String::from_utf8_lossy(&body)
229            );
230        }
231
232        // Turn the HTTP body into an AsyncBufRead so we can use read_line().
233        let body_stream =
234            http_body_util::BodyStream::new(response.into_body()).filter_map(|r| async {
235                match r {
236                    Ok(frame) => frame.into_data().ok().map(|b| Ok::<_, std::io::Error>(b)),
237                    Err(e) => Some(Err(std::io::Error::other(e))),
238                }
239            });
240        let reader = tokio_util::io::StreamReader::new(body_stream);
241        let mut reader = Box::pin(tokio::io::BufReader::new(reader));
242        display_pull_progress(&mut reader).await
243    }
244
245    /// Fallback: pull via `podman pull` subprocess for non-docker transports.
246    ///
247    /// The libpod HTTP API only supports `docker:` transport, so transports
248    /// like `oci:`, `dir:`, `containers-storage:`, etc. are handled by
249    /// shelling out to `podman pull` with the same storage and auth
250    /// configuration used by the API service.
251    async fn pull_via_subprocess(&self, image: &str) -> Result<()> {
252        tracing::debug!(
253            "Image uses non-docker transport, falling back to podman pull subprocess: {image}"
254        );
255        let mut cmd = Command::new(bootc_utils::podman_bin());
256        let mut fds = CmdFds::new();
257        crate::podstorage::bind_storage_roots(
258            &mut cmd,
259            &mut fds,
260            &self.storage_root,
261            &self.run_root,
262        )?;
263        crate::podstorage::setup_auth(&mut cmd, &mut fds, &self.sysroot)?;
264
265        let run_root_arg = format!("/proc/self/fd/{}", crate::podstorage::STORAGE_RUN_FD);
266        cmd.args([
267            "--root",
268            crate::podstorage::STORAGE_ALIAS_DIR,
269            "--runroot",
270            &run_root_arg,
271            "pull",
272            image,
273        ]);
274        cmd.stdin(Stdio::null());
275        cmd.take_fds(fds);
276
277        let mut cmd = tokio::process::Command::from(cmd);
278        cmd.run()
279            .await
280            .context("Pulling image via podman subprocess")?;
281        Ok(())
282    }
283}
284
/// Non-docker container image transports known to containers/image.
///
/// The libpod HTTP API only supports `docker:` (registry) transport.
/// Any image reference starting with one of these prefixes (each entry
/// includes the trailing `:`) must be pulled via the `podman pull` CLI
/// instead.
///
/// Besides the common local transports this also lists `ostree:`, `sif:`
/// and `tarball:`, so that such references are not sent to the HTTP API.
const NON_DOCKER_TRANSPORTS: &[&str] = &[
    "containers-storage:",
    "dir:",
    "docker-archive:",
    "docker-daemon:",
    "oci:",
    "oci-archive:",
    "ostree:",
    "sif:",
    "tarball:",
];

/// Returns `true` if `image` uses a non-docker transport prefix.
///
/// Plain image names (e.g. `quay.io/example/foo:latest`) and explicit
/// `docker:` references are handled by the libpod HTTP API.  Everything
/// listed in [`NON_DOCKER_TRANSPORTS`] needs the subprocess fallback.
///
/// The check is a plain, case-sensitive prefix match; the empty string
/// yields `false`.
fn uses_non_docker_transport(image: &str) -> bool {
    NON_DOCKER_TRANSPORTS
        .iter()
        .any(|&prefix| image.starts_with(prefix))
}
309
310/// Read NDJSON lines from `reader` and display pull progress.
311///
312/// Handles two modes:
313/// - **Modern podman** (5.9+, with `pullProgress` support): per-blob
314///   byte-level progress bars via indicatif
315/// - **Older podman** (5.x): shows status messages from the `stream` field
316///   ("Copying blob ...", "Writing manifest ...") as a live spinner
317async fn display_pull_progress(reader: &mut (impl AsyncBufReadExt + Unpin)) -> Result<()> {
318    let mp = MultiProgress::new();
319    let mut blob_bars: HashMap<String, ProgressBar> = HashMap::new();
320    let mut have_pull_progress = false;
321
322    let download_style = ProgressStyle::default_bar()
323        .template(
324            "{prefix:.bold} [{bar:30}] {binary_bytes}/{binary_total_bytes} ({binary_bytes_per_sec})",
325        )
326        .expect("valid template")
327        .progress_chars("=> ");
328
329    let spinner_style = ProgressStyle::default_spinner()
330        .template("{spinner} {msg}")
331        .expect("valid template");
332
333    // A top-level status spinner for stream messages (used on older
334    // podman that doesn't emit pullProgress events).
335    let status_bar = mp.add(ProgressBar::new_spinner());
336    status_bar.set_style(spinner_style.clone());
337    status_bar.enable_steady_tick(std::time::Duration::from_millis(100));
338
339    let mut line = String::new();
340    loop {
341        line.clear();
342        let n = reader
343            .read_line(&mut line)
344            .await
345            .context("Reading NDJSON line")?;
346        if n == 0 {
347            break;
348        }
349        let trimmed = line.trim();
350        if trimmed.is_empty() {
351            continue;
352        }
353        let report: ImagePullReport = serde_json::from_str(trimmed)
354            .with_context(|| format!("Parsing pull report: {trimmed}"))?;
355
356        if let Some(ref err) = report.error {
357            status_bar.finish_and_clear();
358            anyhow::bail!("Pull error from podman: {err}");
359        }
360
361        // Show stream messages ("Copying blob ...", "Writing manifest ...").
362        if let Some(ref stream_msg) = report.stream {
363            let msg = stream_msg.trim();
364            if !msg.is_empty() {
365                status_bar.set_message(msg.to_owned());
366            }
367        }
368
369        // Handle per-blob progress (modern podman with pullProgress=true).
370        if let Some(ref progress) = report.pull_progress {
371            let blob_id = &progress.progress_component_id;
372            if blob_id.is_empty() {
373                continue;
374            }
375
376            // Once we see pullProgress events, hide the status spinner —
377            // the per-blob bars are more informative.
378            if !have_pull_progress {
379                have_pull_progress = true;
380                status_bar.finish_and_clear();
381            }
382
383            let short_id = ostree_ext::oci_spec::image::Digest::try_from(blob_id.as_str())
384                .map(|d| d.digest().to_owned())
385                .unwrap_or_else(|_| blob_id.clone());
386            let display_id: String = short_id.chars().take(12).collect();
387
388            match progress.status.as_deref().unwrap_or("") {
389                "pulling" => {
390                    let bar = blob_bars.entry(blob_id.to_owned()).or_insert_with(|| {
391                        let total = if progress.total > 0 {
392                            progress.total as u64
393                        } else {
394                            0
395                        };
396                        let pb = mp.add(ProgressBar::new(total));
397                        pb.set_style(download_style.clone());
398                        pb.set_prefix(display_id.clone());
399                        pb
400                    });
401                    if progress.total > 0 {
402                        let new_total = progress.total as u64;
403                        if bar.length() != Some(new_total) {
404                            bar.set_length(new_total);
405                        }
406                    }
407                    bar.set_position(progress.current);
408                }
409                "success" => {
410                    let bar = blob_bars.entry(blob_id.to_owned()).or_insert_with(|| {
411                        let pb = mp.add(ProgressBar::new(0));
412                        pb.set_prefix(display_id.clone());
413                        pb
414                    });
415                    bar.set_style(spinner_style.clone());
416                    bar.set_message("done");
417                    bar.finish();
418                }
419                "skipped" => {
420                    let bar = blob_bars.entry(blob_id.to_owned()).or_insert_with(|| {
421                        let pb = mp.add(ProgressBar::new(0));
422                        pb.set_prefix(display_id.clone());
423                        pb
424                    });
425                    bar.set_style(spinner_style.clone());
426                    bar.set_message("Already exists");
427                    bar.finish();
428                }
429                _ => {}
430            }
431        }
432    }
433
434    // Clean up.
435    for bar in blob_bars.values() {
436        if !bar.is_finished() {
437            bar.finish_and_clear();
438        }
439    }
440    if !status_bar.is_finished() {
441        status_bar.finish_and_clear();
442    }
443
444    Ok(())
445}
446
447impl Drop for PodmanClient {
448    fn drop(&mut self) {
449        let _ = self.service_child.start_kill();
450        let _ = std::fs::remove_file(&self.socket_path);
451    }
452}
453
#[cfg(test)]
mod tests {
    use super::*;

    /// Deserialize a single pull report line, panicking on malformed JSON.
    fn parse_report(json: &str) -> ImagePullReport {
        serde_json::from_str(json).unwrap()
    }

    /// A `pullProgress` event carries status, byte counts and the blob id.
    #[test]
    fn test_deserialize_pull_report_progress() {
        let parsed = parse_report(
            r#"{"status":"pulling","pullProgress":{"status":"pulling","current":12345,"total":98765,"progressComponentID":"sha256:abc123"}}"#,
        );
        assert_eq!(parsed.status.as_deref(), Some("pulling"));

        let blob = parsed.pull_progress.unwrap();
        assert_eq!(blob.status.as_deref(), Some("pulling"));
        assert_eq!(blob.current, 12345);
        assert_eq!(blob.total, 98765);
        assert_eq!(blob.progress_component_id, "sha256:abc123");
    }

    /// The final report exposes the pulled image id and image list.
    #[test]
    fn test_deserialize_pull_report_success() {
        let parsed = parse_report(
            r#"{"status":"success","images":["sha256:fullid"],"id":"sha256:fullid"}"#,
        );
        let expected_images = vec!["sha256:fullid".to_owned()];

        assert_eq!(parsed.status.as_deref(), Some("success"));
        assert_eq!(parsed.id.as_deref(), Some("sha256:fullid"));
        assert_eq!(parsed.images.as_deref(), Some(expected_images.as_slice()));
    }

    /// Byte counts missing from a `skipped` event default to zero.
    #[test]
    fn test_deserialize_pull_report_skipped() {
        let parsed = parse_report(
            r#"{"status":"pulling","pullProgress":{"status":"skipped","progressComponentID":"sha256:def456"}}"#,
        );

        let blob = parsed.pull_progress.unwrap();
        assert_eq!(blob.status.as_deref(), Some("skipped"));
        assert_eq!(blob.progress_component_id, "sha256:def456");
        assert_eq!(blob.current, 0);
        assert_eq!(blob.total, 0);
    }

    /// An error-only report populates just the `error` field.
    #[test]
    fn test_deserialize_pull_report_error() {
        let parsed = parse_report(r#"{"error":"something went wrong"}"#);
        assert_eq!(parsed.error.as_deref(), Some("something went wrong"));
    }

    /// Transport-prefixed references are detected; registry ones are not.
    #[test]
    fn test_uses_non_docker_transport() {
        // Non-docker transports should be detected
        let via_subprocess = [
            "oci:/var/tmp/bootc-oci",
            "oci-archive:/tmp/image.tar",
            "dir:/tmp/image-dir",
            "docker-archive:/tmp/image.tar",
            "docker-daemon:localhost/img:latest",
            "containers-storage:localhost/bootc",
        ];
        for image in via_subprocess {
            assert!(uses_non_docker_transport(image));
        }

        // Docker/registry references should NOT be detected
        let via_api = [
            "quay.io/example/foo:latest",
            "docker.io/library/nginx:latest",
            "localhost:5000/myimage:v1",
            "registry.example.com/img",
        ];
        for image in via_api {
            assert!(!uses_non_docker_transport(image));
        }
    }
}