1use std::collections::HashMap;
9use std::process::{Command, Stdio};
10
11use anyhow::{Context, Result};
12use bootc_utils::AsyncCommandRunExt;
13use cap_std_ext::cap_std::fs::Dir;
14use cap_std_ext::cmdext::{CapStdExtCommandExt, CmdFds};
15use fn_error_context::context;
16use futures_util::StreamExt;
17use http_body_util::BodyExt;
18use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
19use tokio::io::AsyncBufReadExt;
20
/// Libpod API version segment; it is the first path component of the pull
/// request URI sent to the podman service.
const LIBPOD_API_VERSION: &str = "v5.0.0";
23
/// One line of the NDJSON pull stream, as parsed by `display_pull_progress`.
///
/// Every field is optional; keys missing from a line deserialize as `None`.
#[derive(Debug, serde::Deserialize)]
// In this file `status`, `images` and `id` are only read by the unit tests.
#[allow(dead_code)]
struct ImagePullReport {
    /// Top-level status string of the report (e.g. "pulling", "success" in the tests).
    #[serde(default)]
    status: Option<String>,
    /// Free-form message; shown as the status spinner text while pulling.
    #[serde(default)]
    stream: Option<String>,
    /// Error message; when present the progress display aborts with it.
    #[serde(default)]
    error: Option<String>,
    /// Image IDs carried by the report.
    #[serde(default)]
    images: Option<Vec<String>>,
    /// Image ID carried by the report.
    #[serde(default)]
    id: Option<String>,
    /// Per-blob progress, sent under the JSON key `pullProgress`.
    #[serde(rename = "pullProgress", default)]
    pull_progress: Option<ArtifactPullProgress>,
}
41
/// Progress of a single component (blob) within an [`ImagePullReport`].
///
/// Missing keys fall back to the field type's default (`None`, `0`, or an
/// empty string).
#[derive(Debug, serde::Deserialize)]
struct ArtifactPullProgress {
    /// Component state; the progress display handles "pulling", "success"
    /// and "skipped" and ignores anything else.
    #[serde(default)]
    status: Option<String>,
    /// Current position, used as the progress bar position.
    #[serde(default)]
    current: u64,
    /// Total size; the progress display only uses it as the bar length when
    /// it is greater than zero.
    #[serde(default)]
    total: i64,
    /// Identifier of the component (JSON key `progressComponentID`); entries
    /// with an empty ID are skipped by the progress display.
    #[serde(rename = "progressComponentID", default)]
    progress_component_id: String,
}
54
/// Handle to a `podman system service` process spawned by [`PodmanClient::connect`],
/// together with the directory handles needed to run further podman commands
/// against the same storage.
pub(crate) struct PodmanClient {
    /// The running `podman system service` child; killed when the client is dropped.
    service_child: tokio::process::Child,
    /// Filesystem path of the unix socket the service listens on.
    socket_path: String,
    /// Clone of the sysroot directory handle, passed to `setup_auth` for subprocess pulls.
    sysroot: Dir,
    /// Clone of the storage root handle, passed to `bind_storage_roots` for subprocess pulls.
    storage_root: Dir,
    /// Clone of the run root handle, passed to `bind_storage_roots` for subprocess pulls.
    run_root: Dir,
}
67
68impl PodmanClient {
69 #[context("Connecting to podman API")]
96 pub(crate) async fn connect(sysroot: &Dir, storage_root: &Dir, run_root: &Dir) -> Result<Self> {
97 use crate::podstorage::STORAGE_ALIAS_DIR;
98
99 let socket_path = "/run/bootc/podman-api.sock".to_owned();
100 std::fs::create_dir_all("/run/bootc/").ok();
101 let _ = std::fs::remove_file(&socket_path);
102
103 let mut cmd = std::process::Command::new(bootc_utils::podman_bin());
104 let mut fds = CmdFds::new();
105 crate::podstorage::bind_storage_roots(&mut cmd, &mut fds, storage_root, run_root)?;
106 crate::podstorage::setup_auth(&mut cmd, &mut fds, sysroot)?;
107
108 let run_root_arg = format!("/proc/self/fd/{}", crate::podstorage::STORAGE_RUN_FD);
109 let socket_uri = format!("unix://{socket_path}");
110 cmd.args([
111 "--root",
112 STORAGE_ALIAS_DIR,
113 "--runroot",
114 &run_root_arg,
115 "system",
116 "service",
117 "--time=0",
118 &socket_uri,
119 ]);
120 cmd.stdin(Stdio::null());
121 cmd.stdout(Stdio::null());
122 cmd.stderr(Stdio::piped());
123 cmd.take_fds(fds);
124
125 tracing::debug!("Starting podman API service at {socket_path}");
126 let mut child = tokio::process::Command::from(cmd)
127 .spawn()
128 .context("Spawning podman system service")?;
129
130 for _ in 0..900 {
133 if let Some(status) = child.try_wait()? {
134 let mut stderr_msg = String::new();
135 if let Some(mut stderr) = child.stderr.take() {
136 use tokio::io::AsyncReadExt;
137 stderr.read_to_string(&mut stderr_msg).await.ok();
138 }
139 anyhow::bail!("Podman API service exited with {status}: {stderr_msg}");
140 }
141 if std::path::Path::new(&socket_path).exists() {
142 break;
143 }
144 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
145 }
146 if !std::path::Path::new(&socket_path).exists() {
147 anyhow::bail!("Podman API socket did not appear at {socket_path}");
148 }
149
150 Ok(Self {
151 service_child: child,
152 socket_path,
153 sysroot: sysroot.try_clone().context("Cloning sysroot")?,
154 storage_root: storage_root.try_clone().context("Cloning storage root")?,
155 run_root: run_root.try_clone().context("Cloning run root")?,
156 })
157 }
158
159 #[context("Pulling image via podman API: {image}")]
175 pub(crate) async fn pull_with_progress(&self, image: &str) -> Result<()> {
176 if uses_non_docker_transport(image) {
177 return self.pull_via_subprocess(image).await;
178 }
179 self.pull_via_api(image).await
180 }
181
182 async fn pull_via_api(&self, image: &str) -> Result<()> {
184 let stream = tokio::net::UnixStream::connect(&self.socket_path)
185 .await
186 .context("Connecting to podman API socket")?;
187 let io = hyper_util::rt::TokioIo::new(stream);
188
189 let (mut sender, conn) = hyper::client::conn::http1::handshake(io)
190 .await
191 .context("HTTP/1.1 handshake with podman")?;
192
193 tokio::spawn(async move {
194 if let Err(e) = conn.await {
195 tracing::warn!("Podman HTTP connection error: {e}");
196 }
197 });
198
199 let encoded_ref =
200 percent_encoding::utf8_percent_encode(image, percent_encoding::NON_ALPHANUMERIC);
201 let uri = format!(
202 "/{LIBPOD_API_VERSION}/libpod/images/pull?reference={encoded_ref}&pullProgress=true&policy=always"
203 );
204
205 tracing::debug!("POST {uri}");
206 let response = sender
207 .send_request(
208 hyper::Request::builder()
209 .method(hyper::Method::POST)
210 .uri(&uri)
211 .header(hyper::header::HOST, "d")
212 .body(http_body_util::Empty::<hyper::body::Bytes>::new())
213 .context("Building pull request")?,
214 )
215 .await
216 .context("Sending pull request to podman")?;
217
218 let status = response.status();
219 if !status.is_success() {
220 let body = response
221 .into_body()
222 .collect()
223 .await
224 .context("Reading error response body")?
225 .to_bytes();
226 anyhow::bail!(
227 "Podman libpod pull failed with HTTP {status}: {}",
228 String::from_utf8_lossy(&body)
229 );
230 }
231
232 let body_stream =
234 http_body_util::BodyStream::new(response.into_body()).filter_map(|r| async {
235 match r {
236 Ok(frame) => frame.into_data().ok().map(|b| Ok::<_, std::io::Error>(b)),
237 Err(e) => Some(Err(std::io::Error::other(e))),
238 }
239 });
240 let reader = tokio_util::io::StreamReader::new(body_stream);
241 let mut reader = Box::pin(tokio::io::BufReader::new(reader));
242 display_pull_progress(&mut reader).await
243 }
244
245 async fn pull_via_subprocess(&self, image: &str) -> Result<()> {
252 tracing::debug!(
253 "Image uses non-docker transport, falling back to podman pull subprocess: {image}"
254 );
255 let mut cmd = Command::new(bootc_utils::podman_bin());
256 let mut fds = CmdFds::new();
257 crate::podstorage::bind_storage_roots(
258 &mut cmd,
259 &mut fds,
260 &self.storage_root,
261 &self.run_root,
262 )?;
263 crate::podstorage::setup_auth(&mut cmd, &mut fds, &self.sysroot)?;
264
265 let run_root_arg = format!("/proc/self/fd/{}", crate::podstorage::STORAGE_RUN_FD);
266 cmd.args([
267 "--root",
268 crate::podstorage::STORAGE_ALIAS_DIR,
269 "--runroot",
270 &run_root_arg,
271 "pull",
272 image,
273 ]);
274 cmd.stdin(Stdio::null());
275 cmd.take_fds(fds);
276
277 let mut cmd = tokio::process::Command::from(cmd);
278 cmd.run()
279 .await
280 .context("Pulling image via podman subprocess")?;
281 Ok(())
282 }
283}
284
/// Image reference prefixes for which the pull is done with a `podman pull`
/// subprocess instead of the podman API.
const NON_DOCKER_TRANSPORTS: &[&str] = &[
    "oci:",
    "oci-archive:",
    "dir:",
    "docker-archive:",
    "docker-daemon:",
    "containers-storage:",
];

/// Returns `true` when `image` starts with one of the prefixes listed in
/// [`NON_DOCKER_TRANSPORTS`].
fn uses_non_docker_transport(image: &str) -> bool {
    for transport in NON_DOCKER_TRANSPORTS {
        if image.starts_with(*transport) {
            return true;
        }
    }
    false
}
309
310async fn display_pull_progress(reader: &mut (impl AsyncBufReadExt + Unpin)) -> Result<()> {
318 let mp = MultiProgress::new();
319 let mut blob_bars: HashMap<String, ProgressBar> = HashMap::new();
320 let mut have_pull_progress = false;
321
322 let download_style = ProgressStyle::default_bar()
323 .template(
324 "{prefix:.bold} [{bar:30}] {binary_bytes}/{binary_total_bytes} ({binary_bytes_per_sec})",
325 )
326 .expect("valid template")
327 .progress_chars("=> ");
328
329 let spinner_style = ProgressStyle::default_spinner()
330 .template("{spinner} {msg}")
331 .expect("valid template");
332
333 let status_bar = mp.add(ProgressBar::new_spinner());
336 status_bar.set_style(spinner_style.clone());
337 status_bar.enable_steady_tick(std::time::Duration::from_millis(100));
338
339 let mut line = String::new();
340 loop {
341 line.clear();
342 let n = reader
343 .read_line(&mut line)
344 .await
345 .context("Reading NDJSON line")?;
346 if n == 0 {
347 break;
348 }
349 let trimmed = line.trim();
350 if trimmed.is_empty() {
351 continue;
352 }
353 let report: ImagePullReport = serde_json::from_str(trimmed)
354 .with_context(|| format!("Parsing pull report: {trimmed}"))?;
355
356 if let Some(ref err) = report.error {
357 status_bar.finish_and_clear();
358 anyhow::bail!("Pull error from podman: {err}");
359 }
360
361 if let Some(ref stream_msg) = report.stream {
363 let msg = stream_msg.trim();
364 if !msg.is_empty() {
365 status_bar.set_message(msg.to_owned());
366 }
367 }
368
369 if let Some(ref progress) = report.pull_progress {
371 let blob_id = &progress.progress_component_id;
372 if blob_id.is_empty() {
373 continue;
374 }
375
376 if !have_pull_progress {
379 have_pull_progress = true;
380 status_bar.finish_and_clear();
381 }
382
383 let short_id = ostree_ext::oci_spec::image::Digest::try_from(blob_id.as_str())
384 .map(|d| d.digest().to_owned())
385 .unwrap_or_else(|_| blob_id.clone());
386 let display_id: String = short_id.chars().take(12).collect();
387
388 match progress.status.as_deref().unwrap_or("") {
389 "pulling" => {
390 let bar = blob_bars.entry(blob_id.to_owned()).or_insert_with(|| {
391 let total = if progress.total > 0 {
392 progress.total as u64
393 } else {
394 0
395 };
396 let pb = mp.add(ProgressBar::new(total));
397 pb.set_style(download_style.clone());
398 pb.set_prefix(display_id.clone());
399 pb
400 });
401 if progress.total > 0 {
402 let new_total = progress.total as u64;
403 if bar.length() != Some(new_total) {
404 bar.set_length(new_total);
405 }
406 }
407 bar.set_position(progress.current);
408 }
409 "success" => {
410 let bar = blob_bars.entry(blob_id.to_owned()).or_insert_with(|| {
411 let pb = mp.add(ProgressBar::new(0));
412 pb.set_prefix(display_id.clone());
413 pb
414 });
415 bar.set_style(spinner_style.clone());
416 bar.set_message("done");
417 bar.finish();
418 }
419 "skipped" => {
420 let bar = blob_bars.entry(blob_id.to_owned()).or_insert_with(|| {
421 let pb = mp.add(ProgressBar::new(0));
422 pb.set_prefix(display_id.clone());
423 pb
424 });
425 bar.set_style(spinner_style.clone());
426 bar.set_message("Already exists");
427 bar.finish();
428 }
429 _ => {}
430 }
431 }
432 }
433
434 for bar in blob_bars.values() {
436 if !bar.is_finished() {
437 bar.finish_and_clear();
438 }
439 }
440 if !status_bar.is_finished() {
441 status_bar.finish_and_clear();
442 }
443
444 Ok(())
445}
446
447impl Drop for PodmanClient {
448 fn drop(&mut self) {
449 let _ = self.service_child.start_kill();
450 let _ = std::fs::remove_file(&self.socket_path);
451 }
452}
453
#[cfg(test)]
mod tests {
    use super::*;

    /// Deserialize one report line, panicking on malformed input.
    fn parse_report(json: &str) -> ImagePullReport {
        serde_json::from_str(json).unwrap()
    }

    /// A "pulling" report carries per-blob progress with position and total.
    #[test]
    fn test_deserialize_pull_report_progress() {
        let report = parse_report(
            r#"{"status":"pulling","pullProgress":{"status":"pulling","current":12345,"total":98765,"progressComponentID":"sha256:abc123"}}"#,
        );
        assert_eq!(report.status.as_deref(), Some("pulling"));

        let progress = report.pull_progress.unwrap();
        assert_eq!(progress.status.as_deref(), Some("pulling"));
        assert_eq!(progress.current, 12345);
        assert_eq!(progress.total, 98765);
        assert_eq!(progress.progress_component_id, "sha256:abc123");
    }

    /// A final "success" report carries the image ID and the image list.
    #[test]
    fn test_deserialize_pull_report_success() {
        let report =
            parse_report(r#"{"status":"success","images":["sha256:fullid"],"id":"sha256:fullid"}"#);
        assert_eq!(report.status.as_deref(), Some("success"));
        assert_eq!(report.id.as_deref(), Some("sha256:fullid"));

        let expected_images = ["sha256:fullid".to_owned()];
        assert_eq!(report.images.as_deref(), Some(&expected_images[..]));
    }

    /// A "skipped" blob omits the counters, which then default to zero.
    #[test]
    fn test_deserialize_pull_report_skipped() {
        let report = parse_report(
            r#"{"status":"pulling","pullProgress":{"status":"skipped","progressComponentID":"sha256:def456"}}"#,
        );
        let progress = report.pull_progress.unwrap();
        assert_eq!(progress.status.as_deref(), Some("skipped"));
        assert_eq!(progress.progress_component_id, "sha256:def456");
        assert_eq!(progress.current, 0);
        assert_eq!(progress.total, 0);
    }

    /// A report consisting only of an error message still parses.
    #[test]
    fn test_deserialize_pull_report_error() {
        let report = parse_report(r#"{"error":"something went wrong"}"#);
        assert_eq!(report.error.as_deref(), Some("something went wrong"));
    }

    /// Transport-prefixed references are detected; plain registry references are not.
    #[test]
    fn test_uses_non_docker_transport() {
        let with_transport = [
            "oci:/var/tmp/bootc-oci",
            "oci-archive:/tmp/image.tar",
            "dir:/tmp/image-dir",
            "docker-archive:/tmp/image.tar",
            "docker-daemon:localhost/img:latest",
            "containers-storage:localhost/bootc",
        ];
        for image in with_transport {
            assert!(uses_non_docker_transport(image), "{image}");
        }

        let registry_refs = [
            "quay.io/example/foo:latest",
            "docker.io/library/nginx:latest",
            "localhost:5000/myimage:v1",
            "registry.example.com/img",
        ];
        for image in registry_refs {
            assert!(!uses_non_docker_transport(image), "{image}");
        }
    }
}