# Files: SnarfCode/src/iac_reverse/validator/validator.py
# 2026-05-22 00:19:30 -04:00
# 654 lines, 21 KiB, Python

"""Terraform validation runner.
Runs terraform init, validate, and plan against generated output
to verify syntactic correctness and detect infrastructure drift.
Includes auto-correction logic that attempts to fix common validation
errors heuristically.
"""
import json
import re
import shutil
import subprocess
from pathlib import Path
from iac_reverse.models import PlannedChange, ValidationError, ValidationResult
class Validator:
"""Runs Terraform commands to validate generated IaC output.
Validates generated .tf and .tfstate files by running terraform init,
terraform validate, and terraform plan. Reports validation errors and
planned changes (drift) back to the caller.
When validation fails, attempts heuristic-based auto-corrections up to
max_correction_attempts times before reporting failure.
"""
def validate(
    self, output_dir: str, max_correction_attempts: int = 3
) -> ValidationResult:
    """Validate generated Terraform output with init, validate and plan.

    The stages run in order and the method returns as soon as one of
    them fails. A failing ``terraform validate`` triggers the
    auto-correction loop: corrections are applied and validation is
    re-run until it passes, no correction could be applied, or
    ``max_correction_attempts`` rounds have been used.

    Args:
        output_dir: Directory holding the generated .tf and .tfstate files.
        max_correction_attempts: Upper bound on correction rounds.
            Defaults to 3.

    Returns:
        ValidationResult carrying the per-stage success flags, the
        collected errors, the number of correction rounds performed and,
        when the plan stage was reached, the planned changes (drift).
    """
    # Without the binary nothing below can run; report that and stop.
    if shutil.which("terraform") is None:
        missing_binary = ValidationError(
            file="",
            message=(
                "Terraform binary not found. "
                "Terraform is required for validation. "
                "Please install Terraform and ensure it is on your PATH."
            ),
        )
        return ValidationResult(
            init_success=False,
            validate_success=False,
            plan_success=False,
            errors=[missing_binary],
            correction_attempts=0,
        )

    workdir = Path(output_dir)
    problems: list[ValidationError] = []

    # Stage 1: terraform init
    if not self._run_init(workdir, problems):
        return ValidationResult(
            init_success=False,
            validate_success=False,
            plan_success=False,
            errors=problems,
            correction_attempts=0,
        )

    # Stage 2: terraform validate, retried after each round of corrections
    rounds = 0
    is_valid = self._run_validate(workdir, problems)
    while not is_valid and rounds < max_correction_attempts:
        if not self._attempt_correction(workdir, problems):
            # Nothing could be fixed, so another round would change nothing.
            break
        rounds += 1
        # Start from a fresh list so only the latest run's errors are reported.
        problems = []
        is_valid = self._run_validate(workdir, problems)

    if not is_valid:
        return ValidationResult(
            init_success=True,
            validate_success=False,
            plan_success=False,
            errors=problems,
            correction_attempts=rounds,
        )

    # Stage 3: terraform plan
    drift: list[PlannedChange] = []
    plan_ok = self._run_plan(workdir, problems, drift)
    return ValidationResult(
        init_success=True,
        validate_success=True,
        plan_success=plan_ok,
        planned_changes=drift,
        errors=problems,
        correction_attempts=rounds,
    )
def _run_init(
    self, output_path: Path, errors: list[ValidationError]
) -> bool:
    """Run ``terraform init`` in the output directory.

    Args:
        output_path: Directory used as the working directory of the command.
        errors: List that receives a ValidationError when init fails,
            times out, or cannot be started.

    Returns:
        True if the command exits with status 0, False otherwise.
    """
    try:
        result = subprocess.run(
            # -input=false: output is captured, so an interactive prompt
            # would be invisible and would block until the timeout. Make
            # Terraform fail immediately instead of asking.
            ["terraform", "init", "-input=false", "-no-color"],
            cwd=str(output_path),
            capture_output=True,
            text=True,
            timeout=120,
        )
        if result.returncode != 0:
            errors.append(
                ValidationError(
                    file="",
                    message=f"terraform init failed: {result.stderr.strip()}",
                )
            )
            return False
        return True
    except subprocess.TimeoutExpired:
        errors.append(
            ValidationError(
                file="",
                message="terraform init timed out after 120 seconds",
            )
        )
        return False
    except OSError as e:
        # Raised when the process cannot be started at all.
        errors.append(
            ValidationError(
                file="",
                message=f"Failed to execute terraform init: {e}",
            )
        )
        return False
def _run_validate(
    self, output_path: Path, errors: list[ValidationError]
) -> bool:
    """Run ``terraform validate -json`` and hand its stdout to the parser.

    Args:
        output_path: Directory used as the working directory of the command.
        errors: List that receives the parsed validation errors, or a
            single error when the command times out or cannot be started.

    Returns:
        The result of ``_parse_validate_output`` when the command ran,
        False when it timed out or could not be executed.
    """
    command = ["terraform", "validate", "-json"]
    try:
        completed = subprocess.run(
            command,
            cwd=str(output_path),
            capture_output=True,
            text=True,
            timeout=60,
        )
        return self._parse_validate_output(completed.stdout, errors)
    except subprocess.TimeoutExpired:
        failure = "terraform validate timed out after 60 seconds"
    except OSError as exc:
        failure = f"Failed to execute terraform validate: {exc}"

    # Only reached when one of the handlers above ran.
    errors.append(ValidationError(file="", message=failure))
    return False
def _parse_validate_output(
self, stdout: str, errors: list[ValidationError]
) -> bool:
"""Parse terraform validate JSON output.
Expected format:
{
"valid": true/false,
"error_count": N,
"diagnostics": [
{
"severity": "error",
"summary": "...",
"detail": "...",
"range": {
"filename": "main.tf",
"start": {"line": 1, "column": 1},
...
}
}
]
}
"""
try:
data = json.loads(stdout)
except (json.JSONDecodeError, TypeError):
errors.append(
ValidationError(
file="",
message="Failed to parse terraform validate output as JSON",
)
)
return False
if data.get("valid", False):
return True
diagnostics = data.get("diagnostics", [])
for diag in diagnostics:
if diag.get("severity") != "error":
continue
filename = ""
line = None
range_info = diag.get("range")
if range_info:
filename = range_info.get("filename", "")
start = range_info.get("start")
if start:
line = start.get("line")
summary = diag.get("summary", "")
detail = diag.get("detail", "")
message = summary
if detail:
message = f"{summary}: {detail}"
errors.append(
ValidationError(file=filename, message=message, line=line)
)
return False
def _run_plan(
    self,
    output_path: Path,
    errors: list[ValidationError],
    planned_changes: list[PlannedChange],
) -> bool:
    """Run ``terraform plan -json`` and parse the planned changes.

    Args:
        output_path: Directory used as the working directory of the command.
        errors: List that receives a ValidationError when the command
            fails, times out, or cannot be started.
        planned_changes: List that receives the parsed planned changes.

    Returns:
        True if the plan ran and reports zero changes, False otherwise.
    """
    try:
        result = subprocess.run(
            # -input=false: output is captured, so a prompt (for example
            # for an unset variable) would be invisible and would block
            # until the timeout. Make Terraform fail immediately instead.
            ["terraform", "plan", "-input=false", "-json", "-no-color"],
            cwd=str(output_path),
            capture_output=True,
            text=True,
            timeout=300,
        )
        if result.returncode not in (0, 2):
            # Exit status 2 is Terraform's "succeeded with changes" code
            # (used with -detailed-exitcode); it is accepted as valid output.
            # In -json mode diagnostics are written to stdout, so fall
            # back to it when stderr carries nothing.
            detail = result.stderr.strip() or result.stdout.strip()
            errors.append(
                ValidationError(
                    file="",
                    message=f"terraform plan failed: {detail}",
                )
            )
            return False
        return self._parse_plan_output(
            result.stdout, errors, planned_changes
        )
    except subprocess.TimeoutExpired:
        errors.append(
            ValidationError(
                file="",
                message="terraform plan timed out after 300 seconds",
            )
        )
        return False
    except OSError as e:
        # Raised when the process cannot be started at all.
        errors.append(
            ValidationError(
                file="",
                message=f"Failed to execute terraform plan: {e}",
            )
        )
        return False
def _parse_plan_output(
self,
stdout: str,
errors: list[ValidationError],
planned_changes: list[PlannedChange],
) -> bool:
"""Parse terraform plan JSON output (streaming JSON lines format).
Terraform plan -json outputs one JSON object per line. We look for
lines with type "resource_drift" or "planned_change" to identify
changes, and "change_summary" for the overall result.
Each resource change line looks like:
{
"type": "planned_change",
"change": {
"resource": {
"addr": "aws_instance.example"
},
"action": "create" | "update" | "delete"
}
}
"""
has_changes = False
for line in stdout.strip().splitlines():
line = line.strip()
if not line:
continue
try:
entry = json.loads(line)
except json.JSONDecodeError:
continue
entry_type = entry.get("type", "")
if entry_type in ("planned_change", "resource_drift"):
change = entry.get("change", {})
resource = change.get("resource", {})
resource_addr = resource.get("addr", "unknown")
action = change.get("action", "unknown")
# Map terraform action names to our change types
change_type = self._map_action_to_change_type(action)
# Build details from before/after if available
details = f"Action: {action}"
planned_changes.append(
PlannedChange(
resource_address=resource_addr,
change_type=change_type,
details=details,
)
)
has_changes = True
elif entry_type == "change_summary":
changes_info = entry.get("changes", {})
add = changes_info.get("add", 0)
change = changes_info.get("change", 0)
remove = changes_info.get("remove", 0)
if add + change + remove > 0:
has_changes = True
# plan_success is True only when there are zero planned changes
return not has_changes
@staticmethod
def _map_action_to_change_type(action: str) -> str:
"""Map terraform plan action to our change type vocabulary."""
action_map = {
"create": "add",
"update": "modify",
"delete": "destroy",
"replace": "modify",
"read": "add",
}
return action_map.get(action, action)
# ------------------------------------------------------------------
# Auto-correction logic
# ------------------------------------------------------------------
def _attempt_correction(
self, output_path: Path, errors: list[ValidationError]
) -> bool:
"""Attempt to auto-correct validation errors using heuristics.
Applies corrections for:
- Unknown/unsupported attributes (removes the offending line)
- Missing required provider blocks (adds empty provider block)
- Common syntax issues (unclosed braces, trailing commas)
Args:
output_path: Path to the directory containing .tf files.
errors: List of validation errors to attempt to correct.
Returns:
True if at least one correction was applied, False otherwise.
"""
any_corrected = False
for error in errors:
corrected = self._correct_single_error(output_path, error)
if corrected:
any_corrected = True
return any_corrected
def _correct_single_error(
self, output_path: Path, error: ValidationError
) -> bool:
"""Attempt to correct a single validation error.
Returns True if a correction was applied.
"""
message = error.message.lower()
# Handle unknown/unsupported attribute errors
if self._is_unknown_attribute_error(message):
return self._remove_attribute_line(output_path, error)
# Handle missing required provider block
if self._is_missing_provider_error(message):
return self._add_missing_provider_block(output_path, error)
# Handle syntax errors (unclosed braces, trailing commas)
if self._is_syntax_error(message):
return self._fix_syntax_error(output_path, error)
return False
@staticmethod
def _is_unknown_attribute_error(message: str) -> bool:
"""Check if the error is about an unknown or unsupported attribute."""
patterns = [
"unsupported argument",
"unsupported attribute",
"unknown attribute",
"an argument named",
"is not expected here",
"no such attribute",
]
return any(p in message for p in patterns)
@staticmethod
def _is_missing_provider_error(message: str) -> bool:
"""Check if the error is about a missing required provider."""
patterns = [
"missing required provider",
"provider configuration not present",
"no provider",
"required provider",
]
return any(p in message for p in patterns)
@staticmethod
def _is_syntax_error(message: str) -> bool:
"""Check if the error is a syntax error that might be fixable."""
patterns = [
"unexpected closing brace",
"unclosed configuration block",
"expected closing brace",
"invalid character",
"trailing comma",
"argument or block definition required",
]
return any(p in message for p in patterns)
def _remove_attribute_line(
self, output_path: Path, error: ValidationError
) -> bool:
"""Remove the line containing an unknown/unsupported attribute.
If the error has file and line info, removes that specific line.
Otherwise, attempts to find and remove the attribute by name from
the error message.
"""
if not error.file:
return False
file_path = output_path / error.file
if not file_path.exists():
return False
try:
lines = file_path.read_text(encoding="utf-8").splitlines()
except OSError:
return False
if error.line is not None and 1 <= error.line <= len(lines):
# Remove the specific line
line_idx = error.line - 1
removed_line = lines[line_idx].strip()
# Only remove if it looks like an attribute assignment
if "=" in removed_line or removed_line.endswith("{"):
lines.pop(line_idx)
try:
file_path.write_text(
"\n".join(lines) + "\n", encoding="utf-8"
)
return True
except OSError:
return False
# Try to find the attribute name from the error message
attr_name = self._extract_attribute_name(error.message)
if attr_name:
return self._remove_attribute_by_name(file_path, attr_name, lines)
return False
@staticmethod
def _extract_attribute_name(message: str) -> str:
"""Extract the attribute name from an error message.
Looks for patterns like:
- "An argument named 'foo' is not expected here"
- "Unsupported argument: foo"
"""
# Pattern: quoted attribute name
match = re.search(r"['\"](\w+)['\"]", message)
if match:
return match.group(1)
# Pattern: "named X is not"
match = re.search(r"named\s+(\w+)\s+is", message)
if match:
return match.group(1)
return ""
@staticmethod
def _remove_attribute_by_name(
file_path: Path, attr_name: str, lines: list[str]
) -> bool:
"""Remove lines containing the given attribute assignment."""
pattern = re.compile(rf"^\s*{re.escape(attr_name)}\s*=")
new_lines = [line for line in lines if not pattern.match(line)]
if len(new_lines) == len(lines):
return False # Nothing was removed
try:
file_path.write_text("\n".join(new_lines) + "\n", encoding="utf-8")
return True
except OSError:
return False
def _add_missing_provider_block(
self, output_path: Path, error: ValidationError
) -> bool:
"""Add a missing provider block to the configuration.
Extracts the provider name from the error message and creates
an empty provider block in a providers.tf file.
"""
provider_name = self._extract_provider_name(error.message)
if not provider_name:
return False
providers_file = output_path / "providers.tf"
provider_block = f'\nprovider "{provider_name}" {{}}\n'
try:
if providers_file.exists():
existing = providers_file.read_text(encoding="utf-8")
# Don't add if already present
if f'provider "{provider_name}"' in existing:
return False
providers_file.write_text(
existing + provider_block, encoding="utf-8"
)
else:
providers_file.write_text(provider_block, encoding="utf-8")
return True
except OSError:
return False
@staticmethod
def _extract_provider_name(message: str) -> str:
"""Extract provider name from a missing provider error message.
Looks for patterns like:
- "Missing required provider 'aws'"
- 'provider "kubernetes" configuration not present'
"""
match = re.search(r"provider\s+['\"](\w+)['\"]", message)
if match:
return match.group(1)
match = re.search(r"['\"](\w+)['\"]", message)
if match:
return match.group(1)
return ""
def _fix_syntax_error(
self, output_path: Path, error: ValidationError
) -> bool:
"""Attempt to fix common syntax errors.
Handles:
- Trailing commas before closing braces
- Missing closing braces
- Lines with 'argument or block definition required' (remove empty/bad lines)
"""
if not error.file:
return False
file_path = output_path / error.file
if not file_path.exists():
return False
try:
content = file_path.read_text(encoding="utf-8")
except OSError:
return False
original_content = content
# Fix trailing commas before closing braces/brackets
content = re.sub(r",(\s*[}\]])", r"\1", content)
# Fix 'argument or block definition required' - remove empty lines
# at the error location
if error.line is not None and "argument or block definition required" in error.message.lower():
lines = content.splitlines()
if 1 <= error.line <= len(lines):
line_idx = error.line - 1
line = lines[line_idx].strip()
# Remove the problematic line if it's empty or just whitespace/punctuation
if not line or line in (",", ";"):
lines.pop(line_idx)
content = "\n".join(lines) + "\n"
if content != original_content:
try:
file_path.write_text(content, encoding="utf-8")
return True
except OSError:
return False
return False