1use std::collections::HashMap;
2use std::io::{BufRead, BufReader};
3#[cfg(unix)]
4use std::os::unix::process::CommandExt;
5use std::process::{Child, ChildStdout, Command};
6use std::sync::{Arc, RwLock};
7
8use anyhow::{Context, Result, bail};
9use async_process::Stdio;
10use serde::{Deserialize, Serialize};
11use tempfile::TempDir;
12
13use super::progress::ProgressTracker;
14
/// 16-character alphabet (hex-like digits plus `a`-`f`) of characters safe to
/// embed in terraform resource names. Presumably used by callers to generate
/// unique identifiers — confirm at call sites outside this file.
pub static TERRAFORM_ALPHABET: [char; 16] = [
    '1', '2', '3', '4', '5', '6', '7', '8', '9', '0', 'a', 'b', 'c', 'd', 'e', 'f',
];
18
/// Tracks in-flight `terraform apply` processes so each can be looked up
/// (and dropped) by a pool-local id.
#[derive(Default)]
pub struct TerraformPool {
    // Monotonically increasing id source for `create_apply`.
    counter: u32,
    // Applies currently running, keyed by the id handed out at creation time.
    active_applies: HashMap<u32, Arc<tokio::sync::RwLock<TerraformApply>>>,
}
25
26impl TerraformPool {
27 fn create_apply(
28 &mut self,
29 deployment_folder: TempDir,
30 ) -> Result<(u32, Arc<tokio::sync::RwLock<TerraformApply>>)> {
31 let next_counter = self.counter;
32 self.counter += 1;
33
34 let mut apply_command = Command::new("terraform");
35
36 apply_command
37 .current_dir(deployment_folder.path())
38 .arg("apply")
39 .arg("-auto-approve")
40 .arg("-no-color")
41 .arg("-parallelism=128");
42
43 #[cfg(unix)]
44 {
45 apply_command.process_group(0);
46 }
47
48 let spawned_child = apply_command
49 .stdout(Stdio::piped())
50 .stderr(Stdio::piped())
51 .spawn()
52 .context("Failed to spawn `terraform`. Is it installed?")?;
53
54 let spawned_id = spawned_child.id();
55
56 let deployment = Arc::new(tokio::sync::RwLock::new(TerraformApply {
57 child: Some((spawned_id, Arc::new(RwLock::new(spawned_child)))),
58 deployment_folder: Some(deployment_folder),
59 }));
60
61 self.active_applies.insert(next_counter, deployment.clone());
62
63 Ok((next_counter, deployment))
64 }
65
66 fn drop_apply(&mut self, counter: u32) {
67 self.active_applies.remove(&counter);
68 }
69}
70
71impl Drop for TerraformPool {
72 fn drop(&mut self) {
73 for (_, apply) in self.active_applies.drain() {
74 debug_assert_eq!(Arc::strong_count(&apply), 1);
75 }
76 }
77}
78
/// In-memory representation of a terraform configuration, mirroring the
/// top-level keys of a `main.tf.json` file (serialized with serde).
#[derive(Serialize, Deserialize)]
pub struct TerraformBatch {
    pub terraform: TerraformConfig,
    /// Provider configurations; omitted from the JSON when empty.
    #[serde(skip_serializing_if = "HashMap::is_empty")]
    pub provider: HashMap<String, serde_json::Value>,
    /// Data sources, keyed by type then name; omitted from the JSON when empty.
    #[serde(skip_serializing_if = "HashMap::is_empty")]
    pub data: HashMap<String, HashMap<String, serde_json::Value>>,
    /// Resources, keyed by type then name.
    pub resource: HashMap<String, HashMap<String, serde_json::Value>>,
    /// Output values to read back after apply.
    pub output: HashMap<String, TerraformOutput>,
}
89
90impl Default for TerraformBatch {
91 fn default() -> TerraformBatch {
92 TerraformBatch {
93 terraform: TerraformConfig {
94 required_providers: HashMap::new(),
95 },
96 provider: HashMap::new(),
97 data: HashMap::new(),
98 resource: HashMap::new(),
99 output: HashMap::new(),
100 }
101 }
102}
103
impl TerraformBatch {
    /// Serializes this batch to `main.tf.json` in a fresh temp directory under
    /// `./.hydro`, runs `terraform init` and `terraform apply` there, and
    /// returns the deployment's parsed outputs.
    ///
    /// Short-circuits with an empty `TerraformResult` (and never invokes
    /// terraform) when the batch declares nothing to provision.
    ///
    /// # Errors
    /// Fails when terraform cannot be spawned, `init` fails, or the apply
    /// itself fails.
    pub async fn provision(self, pool: &mut TerraformPool) -> Result<TerraformResult> {
        // NOTE(review): identity() rebinds the `&mut` so the async closure
        // below captures `pool` by move instead of re-borrowing through the
        // parameter — confirm before simplifying this away.
        let pool = std::convert::identity(pool);

        if self.terraform.required_providers.is_empty()
            && self.resource.is_empty()
            && self.data.is_empty()
            && self.output.is_empty()
        {
            // Nothing to do: empty outputs and no folder to destroy later.
            return Ok(TerraformResult {
                outputs: HashMap::new(),
                deployment_folder: None,
            });
        }

        ProgressTracker::with_group("terraform", Some(1), || async {
            // All deployments live under `./.hydro` so their state is easy to
            // locate; each gets its own temp subdirectory.
            let dothydro_folder = std::env::current_dir().unwrap().join(".hydro");
            std::fs::create_dir_all(&dothydro_folder).unwrap();
            let deployment_folder = tempfile::tempdir_in(dothydro_folder).unwrap();

            std::fs::write(
                deployment_folder.path().join("main.tf.json"),
                serde_json::to_string(&self).unwrap(),
            )
            .unwrap();

            if !Command::new("terraform")
                .current_dir(deployment_folder.path())
                .arg("init")
                .stdout(Stdio::null())
                .spawn()
                .context("Failed to spawn `terraform`. Is it installed?")?
                .wait()
                .context("Failed to launch terraform init command")?
                .success()
            {
                bail!("Failed to initialize terraform");
            }

            let (apply_id, apply) = pool.create_apply(deployment_folder)?;

            // The progress group is sized by the total resource count across
            // all resource types, one leaf per created resource.
            let output = ProgressTracker::with_group(
                "apply",
                Some(self.resource.values().map(|r| r.len()).sum()),
                || async { apply.write().await.output().await },
            )
            .await;
            pool.drop_apply(apply_id);
            output
        })
        .await
    }
}
160
/// A running (or finished) `terraform apply` process plus the folder that
/// holds its configuration and state.
struct TerraformApply {
    // (OS pid recorded at spawn, shared handle to the child). `None` once the
    // process has been reaped by `output()` or killed in `Drop`.
    child: Option<(u32, Arc<RwLock<Child>>)>,
    // Taken by `output()` on success (ownership moves into TerraformResult);
    // if still present when dropped, the deployment gets destroyed.
    deployment_folder: Option<TempDir>,
}
165
/// Consumes `terraform apply` stdout line by line, opening one progress leaf
/// per resource when terraform reports "Creating..." and resolving it when the
/// matching "Creation complete" line arrives.
///
/// NOTE(review): this does blocking pipe reads inside an async fn, so it will
/// occupy the executor thread while terraform runs — confirm the caller
/// isolates it appropriately.
async fn display_apply_outputs(stdout: &mut ChildStdout) {
    let lines = BufReader::new(stdout).lines();
    // resource id -> (completion sender, progress-leaf task handle)
    let mut waiting_for_result = HashMap::new();

    for line in lines {
        if let Ok(line) = line {
            // Only lines shaped like "resource.addr: message" (exactly one
            // colon, no spaces before it) are treated as status updates.
            let mut split = line.split(':');
            if let Some(first) = split.next() {
                if first.chars().all(|c| c != ' ')
                    && split.next().is_some()
                    && split.next().is_none()
                {
                    if line.starts_with("Plan:")
                        || line.starts_with("Outputs:")
                        || line.contains(": Still creating...")
                        || line.contains(": Reading...")
                        || line.contains(": Still reading...")
                        || line.contains(": Read complete after")
                    {
                        // Known chatter; intentionally ignored.
                    } else if line.ends_with(": Creating...") {
                        // Open a progress leaf that lives until the oneshot
                        // below is fired by the completion branch.
                        let id = line.split(':').next().unwrap().trim().to_string();
                        let (channel_send, channel_recv) = tokio::sync::oneshot::channel();
                        waiting_for_result.insert(
                            id.to_string(),
                            (
                                channel_send,
                                tokio::task::spawn(ProgressTracker::leaf(id, async move {
                                    let _result = channel_recv.await;
                                })),
                            ),
                        );
                    } else if line.contains(": Creation complete after") {
                        // Resolve the leaf; unwrap panics if terraform reports
                        // completion for a resource we never saw start.
                        let id = line.split(':').next().unwrap().trim();
                        let (sender, to_await) = waiting_for_result.remove(id).unwrap();
                        let _ = sender.send(());
                        to_await.await.unwrap();
                    } else {
                        // Any other "x: y" line is unexpected; fail loudly so
                        // changes in terraform's output format are noticed.
                        panic!("Unexpected from Terraform: {}", line);
                    }
                }
            }
        } else {
            // Stop at the first read error (e.g. the pipe closed).
            break;
        }
    }
}
214
/// Forwards the child's stdout to stderr, keeping only lines shaped like
/// "resource.addr: message" (exactly one `:` with no spaces before it) and
/// dropping everything else. Stops at the first read error.
fn filter_terraform_logs(child: &mut Child) {
    let reader = BufReader::new(child.stdout.take().unwrap());
    for maybe_line in reader.lines() {
        let Ok(line) = maybe_line else {
            break;
        };
        let mut pieces = line.split(':');
        let is_progress_line = matches!(pieces.next(), Some(first) if !first.contains(' '))
            && pieces.next().is_some()
            && pieces.next().is_none();
        if is_progress_line {
            eprintln!("[terraform] {}", line);
        }
    }
}
233
impl TerraformApply {
    /// Waits for the running `terraform apply` to finish, streaming progress
    /// from its stdout and raw logs from its stderr, then reads the
    /// deployment's outputs via `terraform output -json`.
    ///
    /// # Errors
    /// Fails when the apply exits unsuccessfully or `terraform output` cannot
    /// be invoked.
    async fn output(&mut self) -> Result<TerraformResult> {
        let (_, child) = self.child.as_ref().unwrap().clone();
        // Both pipes were configured in `create_apply`, so the takes succeed.
        let mut stdout = child.write().unwrap().stdout.take().unwrap();
        let stderr = child.write().unwrap().stderr.take().unwrap();

        // Reap the process on a blocking thread so `Child::wait` does not
        // stall the async executor.
        let status = tokio::task::spawn_blocking(move || {
            child.write().unwrap().wait().unwrap()
        });

        // NOTE(review): `display_apply_outputs` performs blocking reads on
        // this task while stderr is drained on a blocking thread — confirm
        // this combination cannot starve the executor.
        let display_apply = display_apply_outputs(&mut stdout);
        let stderr_loop = tokio::task::spawn_blocking(move || {
            let mut lines = BufReader::new(stderr).lines();
            while let Some(Ok(line)) = lines.next() {
                ProgressTracker::println(format!("[terraform] {}", line));
            }
        });

        // Drain both pipes completely before inspecting the exit status.
        let _ = futures::join!(display_apply, stderr_loop);

        let status = status.await;

        // The process has been reaped; clear the handle so `Drop` won't try
        // to signal a dead pid.
        self.child = None;

        if !status.unwrap().success() {
            bail!("Terraform deployment failed, see `[terraform]` logs above.");
        }

        let mut output_command = Command::new("terraform");
        output_command
            .current_dir(self.deployment_folder.as_ref().unwrap().path())
            .arg("output")
            .arg("-json");

        #[cfg(unix)]
        {
            output_command.process_group(0);
        }

        let output = output_command
            .output()
            .context("Failed to read Terraform outputs")?;

        // Ownership of the folder moves to the result, which destroys the
        // deployment when it is dropped.
        Ok(TerraformResult {
            outputs: serde_json::from_slice(&output.stdout).unwrap(),
            deployment_folder: self.deployment_folder.take(),
        })
    }
}
284
285fn destroy_deployment(deployment_folder: TempDir) {
286 println!(
287 "Destroying terraform deployment at {}",
288 deployment_folder.path().display()
289 );
290
291 let mut destroy_command = Command::new("terraform");
292 destroy_command
293 .current_dir(deployment_folder.path())
294 .arg("destroy")
295 .arg("-auto-approve")
296 .arg("-no-color")
297 .arg("-parallelism=128")
298 .stdout(Stdio::piped());
299
300 #[cfg(unix)]
301 {
302 destroy_command.process_group(0);
303 }
304
305 let mut destroy_child = destroy_command
306 .spawn()
307 .expect("Failed to spawn terraform destroy command");
308
309 filter_terraform_logs(&mut destroy_child);
310
311 if !destroy_child
312 .wait()
313 .expect("Failed to destroy terraform deployment")
314 .success()
315 {
316 let _ = deployment_folder.into_path();
318 eprintln!("WARNING: failed to destroy terraform deployment");
319 }
320}
321
impl Drop for TerraformApply {
    /// Interrupts a still-running apply (SIGINT on unix, the same signal as
    /// Ctrl+C, so terraform can checkpoint its state), waits for the process
    /// to exit, then destroys whatever was already deployed.
    fn drop(&mut self) {
        if let Some((pid, child)) = self.child.take() {
            // Signal the exact pid recorded at spawn time; the child is in
            // its own process group, so group-wide signals would miss it.
            #[cfg(unix)]
            nix::sys::signal::kill(
                nix::unistd::Pid::from_raw(pid as i32),
                nix::sys::signal::Signal::SIGINT,
            )
            .unwrap();
            // On non-unix targets there is no signal to send; silence the
            // unused-variable warning.
            #[cfg(not(unix))]
            let _ = pid;

            // Let terraform wind down gracefully before we tear things down.
            let mut child_write = child.write().unwrap();
            if child_write.try_wait().unwrap().is_none() {
                println!("Waiting for Terraform apply to finish...");
                child_write.wait().unwrap();
            }
        }

        // A folder still present here means `output()` never handed it off to
        // a TerraformResult; destroy any partially-created resources.
        if let Some(deployment_folder) = self.deployment_folder.take() {
            destroy_deployment(deployment_folder);
        }
    }
}
346
/// The `terraform` block of a configuration: required provider declarations.
#[derive(Serialize, Deserialize)]
pub struct TerraformConfig {
    pub required_providers: HashMap<String, TerraformProvider>,
}
351
/// A single provider requirement: registry source and version constraint.
#[derive(Serialize, Deserialize)]
pub struct TerraformProvider {
    pub source: String,
    pub version: String,
}
357
/// A terraform output declaration/value. Only string-valued outputs are
/// representable here.
#[derive(Serialize, Deserialize, Debug)]
pub struct TerraformOutput {
    pub value: String,
}
362
/// Outcome of a successful provision: the output values plus, when terraform
/// actually ran, the deployment folder (the deployment is destroyed when this
/// result is dropped — see the `Drop` impl below in the file).
#[derive(Debug)]
pub struct TerraformResult {
    pub outputs: HashMap<String, TerraformOutput>,
    // `None` when provisioning was a no-op and nothing needs destroying.
    pub deployment_folder: Option<TempDir>,
}
369
370impl Drop for TerraformResult {
371 fn drop(&mut self) {
372 if let Some(deployment_folder) = self.deployment_folder.take() {
373 destroy_deployment(deployment_folder);
374 }
375 }
376}
377
/// Serde shape of one entry in `terraform output -json`.
/// NOTE(review): not referenced anywhere in this file — confirm it is used by
/// callers elsewhere before removing.
#[derive(Serialize, Deserialize)]
pub struct TerraformResultOutput {
    value: String,
}