hydro_deploy/
terraform.rs

1use std::collections::HashMap;
2use std::io::{BufRead, BufReader};
3#[cfg(unix)]
4use std::os::unix::process::CommandExt;
5use std::process::{Child, ChildStdout, Command};
6use std::sync::{Arc, RwLock};
7
8use anyhow::{Context, Result, bail};
9use async_process::Stdio;
10use serde::{Deserialize, Serialize};
11use tempfile::TempDir;
12
13use super::progress::ProgressTracker;
14
/// The sixteen characters `1`-`9`, `0`, `a`-`f` (lowercase hexadecimal digits, with `0`
/// placed after `9`).
///
/// NOTE(review): presumably the alphabet for generating random identifiers that are valid
/// in Terraform resource names; the use sites are outside this file, so confirm there.
pub static TERRAFORM_ALPHABET: [char; 16] = [
    // decimal digits
    '1', '2', '3', '4', '5', '6', '7', '8', '9', '0',
    // hexadecimal letters
    'a', 'b', 'c', 'd', 'e', 'f',
];
18
/// Keeps track of resources which may need to be cleaned up.
///
/// Every `terraform apply` started through the pool is registered here under a numeric id
/// until it is explicitly removed again, so the pool knows which applies are in flight.
#[derive(Default)]
pub struct TerraformPool {
    /// Id that will be assigned to the next apply; incremented on every `create_apply` call.
    counter: u32,
    /// Applies that have been started and not yet removed with `drop_apply`, keyed by id.
    active_applies: HashMap<u32, Arc<tokio::sync::RwLock<TerraformApply>>>,
}
25
26impl TerraformPool {
27    fn create_apply(
28        &mut self,
29        deployment_folder: TempDir,
30    ) -> Result<(u32, Arc<tokio::sync::RwLock<TerraformApply>>)> {
31        let next_counter = self.counter;
32        self.counter += 1;
33
34        let mut apply_command = Command::new("terraform");
35
36        apply_command
37            .current_dir(deployment_folder.path())
38            .arg("apply")
39            .arg("-auto-approve")
40            .arg("-no-color")
41            .arg("-parallelism=128");
42
43        #[cfg(unix)]
44        {
45            apply_command.process_group(0);
46        }
47
48        let spawned_child = apply_command
49            .stdout(Stdio::piped())
50            .stderr(Stdio::piped())
51            .spawn()
52            .context("Failed to spawn `terraform`. Is it installed?")?;
53
54        let spawned_id = spawned_child.id();
55
56        let deployment = Arc::new(tokio::sync::RwLock::new(TerraformApply {
57            child: Some((spawned_id, Arc::new(RwLock::new(spawned_child)))),
58            deployment_folder: Some(deployment_folder),
59        }));
60
61        self.active_applies.insert(next_counter, deployment.clone());
62
63        Ok((next_counter, deployment))
64    }
65
66    fn drop_apply(&mut self, counter: u32) {
67        self.active_applies.remove(&counter);
68    }
69}
70
71impl Drop for TerraformPool {
72    fn drop(&mut self) {
73        for (_, apply) in self.active_applies.drain() {
74            debug_assert_eq!(Arc::strong_count(&apply), 1);
75        }
76    }
77}
78
/// A set of Terraform declarations that is serialized to JSON and written out as
/// `main.tf.json` when the batch is provisioned.
#[derive(Serialize, Deserialize)]
pub struct TerraformBatch {
    /// Contents of the top-level `terraform` key (the required providers).
    pub terraform: TerraformConfig,
    /// Contents of the top-level `provider` key; omitted from the JSON when empty.
    #[serde(skip_serializing_if = "HashMap::is_empty")]
    pub provider: HashMap<String, serde_json::Value>,
    /// Contents of the top-level `data` key as a two-level map; omitted from the JSON
    /// when empty.
    #[serde(skip_serializing_if = "HashMap::is_empty")]
    pub data: HashMap<String, HashMap<String, serde_json::Value>>,
    /// Contents of the top-level `resource` key as a two-level map. The total number of
    /// inner entries is used as the size of the "apply" progress group.
    pub resource: HashMap<String, HashMap<String, serde_json::Value>>,
    /// Contents of the top-level `output` key, keyed by output name.
    pub output: HashMap<String, TerraformOutput>,
}
89
90impl Default for TerraformBatch {
91    fn default() -> TerraformBatch {
92        TerraformBatch {
93            terraform: TerraformConfig {
94                required_providers: HashMap::new(),
95            },
96            provider: HashMap::new(),
97            data: HashMap::new(),
98            resource: HashMap::new(),
99            output: HashMap::new(),
100        }
101    }
102}
103
104impl TerraformBatch {
105    pub async fn provision(self, pool: &mut TerraformPool) -> Result<TerraformResult> {
106        // Hack to quiet false-positive `clippy::needless_pass_by_ref_mut` on latest nightlies.
107        // TODO(mingwei): Remove this when it is no longer needed (current date 2023-08-30).
108        // https://github.com/rust-lang/rust-clippy/issues/11380
109        let pool = std::convert::identity(pool);
110
111        if self.terraform.required_providers.is_empty()
112            && self.resource.is_empty()
113            && self.data.is_empty()
114            && self.output.is_empty()
115        {
116            return Ok(TerraformResult {
117                outputs: HashMap::new(),
118                deployment_folder: None,
119            });
120        }
121
122        ProgressTracker::with_group("terraform", Some(1), || async {
123            let dothydro_folder = std::env::current_dir().unwrap().join(".hydro");
124            std::fs::create_dir_all(&dothydro_folder).unwrap();
125            let deployment_folder = tempfile::tempdir_in(dothydro_folder).unwrap();
126
127            std::fs::write(
128                deployment_folder.path().join("main.tf.json"),
129                serde_json::to_string(&self).unwrap(),
130            )
131            .unwrap();
132
133            if !Command::new("terraform")
134                .current_dir(deployment_folder.path())
135                .arg("init")
136                .stdout(Stdio::null())
137                .spawn()
138                .context("Failed to spawn `terraform`. Is it installed?")?
139                .wait()
140                .context("Failed to launch terraform init command")?
141                .success()
142            {
143                bail!("Failed to initialize terraform");
144            }
145
146            let (apply_id, apply) = pool.create_apply(deployment_folder)?;
147
148            let output = ProgressTracker::with_group(
149                "apply",
150                Some(self.resource.values().map(|r| r.len()).sum()),
151                || async { apply.write().await.output().await },
152            )
153            .await;
154            pool.drop_apply(apply_id);
155            output
156        })
157        .await
158    }
159}
160
/// State of one `terraform apply` invocation.
struct TerraformApply {
    /// Process id and shared handle of the `terraform apply` child process; reset to
    /// `None` by `output` once the process has been waited on.
    child: Option<(u32, Arc<RwLock<Child>>)>,
    /// Folder the apply runs in; moved into the `TerraformResult` when `output` succeeds,
    /// otherwise destroyed when this value is dropped.
    deployment_folder: Option<TempDir>,
}
165
166async fn display_apply_outputs(stdout: &mut ChildStdout) {
167    let lines = BufReader::new(stdout).lines();
168    let mut waiting_for_result = HashMap::new();
169
170    for line in lines {
171        if let Ok(line) = line {
172            let mut split = line.split(':');
173            if let Some(first) = split.next() {
174                if first.chars().all(|c| c != ' ')
175                    && split.next().is_some()
176                    && split.next().is_none()
177                {
178                    if line.starts_with("Plan:")
179                        || line.starts_with("Outputs:")
180                        || line.contains(": Still creating...")
181                        || line.contains(": Reading...")
182                        || line.contains(": Still reading...")
183                        || line.contains(": Read complete after")
184                    {
185                    } else if line.ends_with(": Creating...") {
186                        let id = line.split(':').next().unwrap().trim().to_string();
187                        let (channel_send, channel_recv) = tokio::sync::oneshot::channel();
188                        waiting_for_result.insert(
189                            id.to_string(),
190                            (
191                                channel_send,
192                                tokio::task::spawn(ProgressTracker::leaf(id, async move {
193                                    // `Err(RecvError)` means send side was dropped due to another error.
194                                    // Ignore here to prevent spurious panic stack traces.
195                                    let _result = channel_recv.await;
196                                })),
197                            ),
198                        );
199                    } else if line.contains(": Creation complete after") {
200                        let id = line.split(':').next().unwrap().trim();
201                        let (sender, to_await) = waiting_for_result.remove(id).unwrap();
202                        let _ = sender.send(());
203                        to_await.await.unwrap();
204                    } else {
205                        panic!("Unexpected from Terraform: {}", line);
206                    }
207                }
208            }
209        } else {
210            break;
211        }
212    }
213}
214
/// Drains the child's standard output and echoes the interesting lines to stderr.
///
/// A line is echoed (prefixed with `[terraform] `) only if it contains exactly one `:`
/// and there is no space before that colon. Reading stops at end of stream or at the
/// first read error.
///
/// # Panics
///
/// Panics if the child's stdout was not piped or has already been taken.
fn filter_terraform_logs(child: &mut Child) {
    let stdout = child.stdout.take().unwrap();

    for line in BufReader::new(stdout).lines().map_while(Result::ok) {
        let mut parts = line.split(':');
        let Some(prefix) = parts.next() else {
            continue;
        };

        let is_resource_line =
            !prefix.contains(' ') && parts.next().is_some() && parts.next().is_none();
        if is_resource_line {
            eprintln!("[terraform] {}", line);
        }
    }
}
233
234impl TerraformApply {
235    async fn output(&mut self) -> Result<TerraformResult> {
236        let (_, child) = self.child.as_ref().unwrap().clone();
237        let mut stdout = child.write().unwrap().stdout.take().unwrap();
238        let stderr = child.write().unwrap().stderr.take().unwrap();
239
240        let status = tokio::task::spawn_blocking(move || {
241            // it is okay for this thread to keep running even if the future is cancelled
242            child.write().unwrap().wait().unwrap()
243        });
244
245        let display_apply = display_apply_outputs(&mut stdout);
246        let stderr_loop = tokio::task::spawn_blocking(move || {
247            let mut lines = BufReader::new(stderr).lines();
248            while let Some(Ok(line)) = lines.next() {
249                ProgressTracker::println(format!("[terraform] {}", line));
250            }
251        });
252
253        let _ = futures::join!(display_apply, stderr_loop);
254
255        let status = status.await;
256
257        self.child = None;
258
259        if !status.unwrap().success() {
260            bail!("Terraform deployment failed, see `[terraform]` logs above.");
261        }
262
263        let mut output_command = Command::new("terraform");
264        output_command
265            .current_dir(self.deployment_folder.as_ref().unwrap().path())
266            .arg("output")
267            .arg("-json");
268
269        #[cfg(unix)]
270        {
271            output_command.process_group(0);
272        }
273
274        let output = output_command
275            .output()
276            .context("Failed to read Terraform outputs")?;
277
278        Ok(TerraformResult {
279            outputs: serde_json::from_slice(&output.stdout).unwrap(),
280            deployment_folder: self.deployment_folder.take(),
281        })
282    }
283}
284
285fn destroy_deployment(deployment_folder: TempDir) {
286    println!(
287        "Destroying terraform deployment at {}",
288        deployment_folder.path().display()
289    );
290
291    let mut destroy_command = Command::new("terraform");
292    destroy_command
293        .current_dir(deployment_folder.path())
294        .arg("destroy")
295        .arg("-auto-approve")
296        .arg("-no-color")
297        .arg("-parallelism=128")
298        .stdout(Stdio::piped());
299
300    #[cfg(unix)]
301    {
302        destroy_command.process_group(0);
303    }
304
305    let mut destroy_child = destroy_command
306        .spawn()
307        .expect("Failed to spawn terraform destroy command");
308
309    filter_terraform_logs(&mut destroy_child);
310
311    if !destroy_child
312        .wait()
313        .expect("Failed to destroy terraform deployment")
314        .success()
315    {
316        // prevent the folder from being deleted
317        let _ = deployment_folder.into_path();
318        eprintln!("WARNING: failed to destroy terraform deployment");
319    }
320}
321
322impl Drop for TerraformApply {
323    fn drop(&mut self) {
324        if let Some((pid, child)) = self.child.take() {
325            #[cfg(unix)]
326            nix::sys::signal::kill(
327                nix::unistd::Pid::from_raw(pid as i32),
328                nix::sys::signal::Signal::SIGINT,
329            )
330            .unwrap();
331            #[cfg(not(unix))]
332            let _ = pid;
333
334            let mut child_write = child.write().unwrap();
335            if child_write.try_wait().unwrap().is_none() {
336                println!("Waiting for Terraform apply to finish...");
337                child_write.wait().unwrap();
338            }
339        }
340
341        if let Some(deployment_folder) = self.deployment_folder.take() {
342            destroy_deployment(deployment_folder);
343        }
344    }
345}
346
/// Value of the `terraform` key in a [`TerraformBatch`].
#[derive(Serialize, Deserialize)]
pub struct TerraformConfig {
    /// Providers the configuration requires, keyed by a provider name.
    pub required_providers: HashMap<String, TerraformProvider>,
}
351
/// One entry of [`TerraformConfig::required_providers`].
#[derive(Serialize, Deserialize)]
pub struct TerraformProvider {
    /// Serialized as the provider's `source` field.
    pub source: String,
    /// Serialized as the provider's `version` field.
    pub version: String,
}
357
/// A single Terraform output.
///
/// Used both for the `output` entries of a [`TerraformBatch`] and for the entries parsed
/// from `terraform output -json` into a [`TerraformResult`].
#[derive(Serialize, Deserialize, Debug)]
pub struct TerraformOutput {
    /// The output's `value` field.
    pub value: String,
}
362
/// Result of provisioning a [`TerraformBatch`].
///
/// Dropping the result destroys the deployment if a deployment folder is present.
#[derive(Debug)]
pub struct TerraformResult {
    /// Outputs parsed from `terraform output -json`, keyed by output name; empty if no
    /// deployment was performed.
    pub outputs: HashMap<String, TerraformOutput>,
    /// `None` if no deployment was performed
    pub deployment_folder: Option<TempDir>,
}
369
370impl Drop for TerraformResult {
371    fn drop(&mut self) {
372        if let Some(deployment_folder) = self.deployment_folder.take() {
373            destroy_deployment(deployment_folder);
374        }
375    }
376}
377
/// A (de)serializable wrapper around a single string `value` field.
///
/// NOTE(review): nothing in the visible part of this file uses this type, and its shape
/// matches [`TerraformOutput`] apart from the private field — confirm whether it is still
/// needed.
#[derive(Serialize, Deserialize)]
pub struct TerraformResultOutput {
    /// The wrapped value; private to this module.
    value: String,
}