# Source code for validate_actions.pipeline_stages.parser

"""Parser for YAML files, from input file to Python data structure representation."""
import copy
import re
import sys
from abc import abstractmethod
from pathlib import Path
from typing import Any, Dict, Iterator, List, Optional, Tuple

import yaml

from validate_actions.domain_model.primitives import Expression, Pos, String
from validate_actions.globals.problems import Problem, ProblemLevel, Problems
from validate_actions.globals.process_stage import ProcessStage


class YAMLParser(ProcessStage[Path, Dict[String, Any]]):
    """Abstract base class for parsing GitHub Actions workflow YAML files.

    This parser performs token-level parsing to enable precise position
    tracking and auto-fixing of workflow files. The parser maintains exact
    line, column, and character positions for all parsed elements to support:

    - Validation rule error reporting with precise locations
    - Auto-fixing of problems using character-level edits
    - Expression parsing within ${{ }} syntax
    - Structured AST construction for downstream pipeline stages
    """

    @abstractmethod
    def process(self, file: Path) -> Dict[String, Any]:
        """Parse a GitHub Actions workflow YAML file into a structured representation.

        Converts YAML content into a dictionary with String keys that preserve
        position information for validation and auto-fixing. Handles GitHub
        Actions-specific constructs including expressions and complex nested
        structures.

        Args:
            file (Path): Path to the GitHub Actions workflow YAML file to parse.

        Returns:
            Dict[String, Any]: Parsed YAML as dictionary with position-aware
                String keys and values. Returns empty dict if parsing fails or
                file is invalid.
        """
        pass
class PyYAMLParser(YAMLParser):
    """YAML parser implementation using PyYAML.

    Scans the input with ``yaml.scan`` into a flat token stream and rebuilds a
    nested Python structure by hand, so that every key and string value carries
    precise position information (line, column, character index) for
    validation reporting and auto-fixing.
    """

    def __init__(self, problems: Problems) -> None:
        """Initialize the PyYAMLParser with a shared problem collector."""
        super().__init__(problems)
        # Rule name attached to every Problem this stage reports.
        self.RULE = "yaml-syntax"

    def process(self, file: Path) -> Dict[String, Any]:
        """Parse a YAML file into a structured representation using PyYAML.

        Args:
            file (Path): Path to the YAML file to parse.

        Returns:
            Dict[String, Any]: The parsed YAML content as a dictionary.
                Empty dict if reading, scanning, or structural parsing fails.
        """
        # Read file from I/O
        try:
            # Explicit encoding: workflow files are UTF-8; without it the
            # platform default could mis-decode on e.g. Windows.
            with open(file, "r", encoding="utf-8") as f:
                buffer = f.read()
        except OSError as e:
            print(e, file=sys.stderr)
            self.problems.append(
                Problem(
                    pos=Pos(0, 0),
                    desc=f"Error reading from file system for {file}",
                    level=ProblemLevel.ERR,
                    rule=self.RULE,
                )
            )
            return {}

        # Use PyYAML to parse the file as a flat list of tokens
        try:
            tokens = list(yaml.scan(buffer, Loader=yaml.SafeLoader))
        except yaml.error.MarkedYAMLError as e:
            self.problems.append(
                Problem(
                    pos=Pos(0, 0),
                    desc=f"Error parsing YAML file: {e}",
                    level=ProblemLevel.ERR,
                    rule=self.RULE,
                )
            )
            return {}

        # Basic structure validation
        if not self._validate_basic_yaml_structure(tokens):
            self.problems.append(
                Problem(
                    pos=Pos(0, 0),
                    desc="File does not appear to be a valid GitHub Actions workflow YAML",
                    level=ProblemLevel.ERR,
                    rule=self.RULE,
                )
            )
            return {}

        # Process the tokens to build a structured representation
        content: Dict[String, Any] = {}
        error_desc = "Error parsing top-level workflow structure"
        i = 0
        try:
            while i < len(tokens):
                token = tokens[i]
                if isinstance(token, yaml.StreamStartToken):
                    pass
                elif isinstance(token, yaml.StreamEndToken):
                    # Normal exit: the whole stream was consumed.
                    return content
                elif isinstance(token, yaml.BlockMappingStartToken):
                    content, i = self.__parse_block_mapping(tokens, i)
                elif isinstance(token, yaml.BlockEntryToken):
                    pass
                else:
                    self.problems.append(
                        Problem(
                            pos=Pos(0, 0), desc=error_desc, level=ProblemLevel.ERR, rule=self.RULE
                        )
                    )
                i += 1
        except Exception as e:
            # Any parsing crash is downgraded to a reported problem below.
            error_desc = f"Error parsing workflow structure: {str(e)}"

        # If we reach here, it means there's an unexpected error in the
        # workflow structure
        self.problems.append(
            Problem(pos=Pos(0, 0), desc=error_desc, level=ProblemLevel.ERR, rule=self.RULE)
        )
        return {}

    def __parse_block_mapping(
        self, tokens: List[yaml.Token], index: int = 0
    ) -> Tuple[Dict[String, Any], int]:
        """Parse a YAML block mapping into a dictionary.

        Args:
            tokens (List[yaml.Token]): The list of YAML tokens.
            index (int, optional): The current index in the token list. Defaults to 0.

        Returns:
            Tuple[Dict[String, Any], int]: The parsed dictionary and the new index position.
        """
        mapping: Dict[String, Any] = {}
        error_desc = "Error parsing block mapping"
        # Fix: track the current key explicitly so a ValueToken arriving before
        # any valid key cannot raise an UnboundLocalError.
        key: Optional[String] = None
        while index < len(tokens):
            token = tokens[index]
            # Start of the block mapping
            if isinstance(token, yaml.BlockMappingStartToken):
                pass
            # When we hit the end of a block, return mapping and next index
            elif isinstance(token, yaml.BlockEndToken):
                return mapping, index
            # Process a key.
            elif isinstance(token, yaml.KeyToken):
                # The token after KeyToken is the actual key
                index += 1
                next_token = self.__safe_token_access(tokens, index)
                if next_token is None:
                    self.problems.append(
                        Problem(
                            pos=self.__parse_pos(token),
                            desc="Unexpected end of tokens while parsing key",
                            level=ProblemLevel.ERR,
                            rule=self.RULE,
                        )
                    )
                    return {}, index
                if isinstance(next_token, yaml.ScalarToken):
                    key = self.__parse_str(next_token)
                else:
                    self.problems.append(
                        Problem(
                            pos=self.__parse_pos(next_token),
                            desc=error_desc,
                            level=ProblemLevel.ERR,
                            rule=self.RULE,
                        )
                    )
            # Process a value.
            elif isinstance(token, yaml.ValueToken):
                # The token after ValueToken is the actual value
                index += 1
                if index >= len(tokens):
                    self.problems.append(
                        Problem(
                            pos=self.__parse_pos(token),
                            desc="Unexpected end of tokens while parsing value",
                            level=ProblemLevel.ERR,
                            rule=self.RULE,
                        )
                    )
                    return {}, index
                value, index = self.__parse_block_value(tokens, index)
                # Only store the value if we have a usable key; the bad key
                # was already reported above.
                if key is not None:
                    mapping[key] = value
            else:
                self.problems.append(
                    Problem(
                        pos=self.__parse_pos(token),
                        desc=error_desc,
                        level=ProblemLevel.ERR,
                        rule=self.RULE,
                    )
                )
            index += 1

        # If we reach here, it means there's an unexpected error in the
        # block mapping
        error_token = self.__safe_token_access(tokens, index)
        error_pos = self.__parse_pos(error_token) if error_token else Pos(0, 0, 0)
        self.problems.append(
            Problem(
                pos=error_pos,
                desc=error_desc,
                level=ProblemLevel.ERR,
                rule=self.RULE,
            )
        )
        return {}, index

    def __parse_block_value(self, tokens: List[yaml.Token], index: int = 0) -> Tuple[Any, int]:
        """Parse a YAML block value into the appropriate Python type.

        Args:
            tokens (List[yaml.Token]): The list of YAML tokens.
            index (int, optional): The current index in the token list. Defaults to 0.

        Returns:
            Tuple[Any, int]: The parsed value and the new index position.
        """
        token = tokens[index]
        value: Any
        # value is a scalar
        if isinstance(token, yaml.ScalarToken):
            value = self.__parse_scalar_value(token)
        # value is a nested block mapping
        elif isinstance(token, yaml.BlockMappingStartToken):
            value, index = self.__parse_block_mapping(tokens, index)
        # value is a block sequence
        # - x
        # - y
        elif isinstance(token, yaml.BlockSequenceStartToken):
            value, index = self.__parse_block_sequence(tokens, index)
        # also block sequence but with a non-critical missing indent before
        # the -
        elif isinstance(token, yaml.BlockEntryToken):
            value, index = self.__parse_block_sequence_unindented(tokens, index)
        # value is a inline flow sequence [ x, y, z ]
        elif isinstance(token, yaml.FlowSequenceStartToken):
            value, index = self.__parse_flow_sequence(tokens, index)
        # value is a inline flow mapping { x: y, z: w }
        elif isinstance(token, yaml.FlowMappingStartToken):
            value, index = self.__parse_flow_mapping(tokens, index)
        # else assume empty block mapping
        else:
            value = {}
            index -= 1  # Decrement index so the caller reprocesses this token
        return value, index

    def __parse_block_sequence(
        self, tokens: List[yaml.Token], index: int = 0
    ) -> Tuple[List[Any], int]:
        """Parse a YAML block sequence into a list.

        Args:
            tokens (List[yaml.Token]): The list of YAML tokens.
            index (int, optional): The current index in the token list. Defaults to 0.

        Returns:
            Tuple[List[Any], int]: The parsed list and the new index position.
        """
        lst: List[Any] = []
        while index < len(tokens):
            token = tokens[index]
            if isinstance(token, yaml.BlockSequenceStartToken):
                pass
            elif isinstance(token, yaml.BlockEntryToken):
                pass
            elif isinstance(token, yaml.BlockEndToken):
                return lst, index
            else:
                # Process a value.
                value, index = self.__parse_block_value(tokens, index)
                lst.append(value)
            index += 1

        # If we reach here, it means there's an unexpected error in the
        # block sequence
        error_token = self.__safe_token_access(tokens, index)
        error_pos = self.__parse_pos(error_token) if error_token else Pos(0, 0, 0)
        self.problems.append(
            Problem(
                pos=error_pos,
                desc="Error parsing block sequence",
                level=ProblemLevel.ERR,
                rule=self.RULE,
            )
        )
        return [], index

    def __parse_block_sequence_unindented(
        self, tokens: List[yaml.Token], index: int = 0
    ) -> Tuple[List[Any], int]:
        """Parse an unindented YAML block sequence into a list.

        Unindented sequences produce no BlockEnd token, so the sequence ends
        when the token after a value is not another BlockEntryToken.

        Args:
            tokens (List[yaml.Token]): The list of YAML tokens.
            index (int, optional): The current index in the token list. Defaults to 0.

        Returns:
            Tuple[List[Any], int]: The parsed list and the new index position.
        """
        lst: List[Any] = []
        while index < len(tokens):
            token = tokens[index]
            if isinstance(token, yaml.BlockEntryToken):
                pass
            else:
                # Process a value.
                value, index = self.__parse_block_value(tokens, index)
                lst.append(value)
                # Fix: bounds-checked lookahead (tokens[index + 1] could raise
                # IndexError at end of stream); also avoid shadowing builtin
                # `next`. A missing lookahead token ends the sequence.
                next_token = self.__safe_token_access(tokens, index + 1)
                if not isinstance(next_token, yaml.BlockEntryToken):
                    return lst, index
            index += 1

        # If we reach here, it means there's an unexpected error in the
        # block sequence
        error_token = self.__safe_token_access(tokens, index)
        error_pos = self.__parse_pos(error_token) if error_token else Pos(0, 0, 0)
        self.problems.append(
            Problem(
                pos=error_pos,
                desc="Error parsing block sequence",
                level=ProblemLevel.ERR,
                rule=self.RULE,
            )
        )
        return [], index

    def __parse_flow_mapping(
        self, tokens: List[yaml.Token], index: int = 0
    ) -> Tuple[Dict[String, Any], int]:
        """Parse a YAML flow mapping ``{ x: y }`` into a dictionary.

        Args:
            tokens (List[yaml.Token]): The list of YAML tokens.
            index (int, optional): The current index in the token list. Defaults to 0.

        Returns:
            Tuple[Dict[String, Any], int]: The parsed dictionary and the new index position.
        """
        mapping: Dict[String, Any] = {}
        error_desc = "Error parsing flow mapping"
        # Fix: same UnboundLocalError guard as in __parse_block_mapping.
        key: Optional[String] = None
        while index < len(tokens):
            token = tokens[index]
            if isinstance(token, yaml.FlowMappingStartToken):
                pass
            elif isinstance(token, yaml.FlowMappingEndToken):
                return mapping, index
            elif isinstance(token, yaml.KeyToken):
                index += 1
                # Fix: bounds-checked access, consistent with block mapping.
                next_token = self.__safe_token_access(tokens, index)
                if next_token is None:
                    self.problems.append(
                        Problem(
                            pos=self.__parse_pos(token),
                            desc="Unexpected end of tokens while parsing key",
                            level=ProblemLevel.ERR,
                            rule=self.RULE,
                        )
                    )
                    return {}, index
                if isinstance(next_token, yaml.ScalarToken):
                    key = self.__parse_str(next_token)
                else:
                    self.problems.append(
                        Problem(
                            pos=self.__parse_pos(next_token),
                            desc=error_desc,
                            level=ProblemLevel.ERR,
                            rule=self.RULE,
                        )
                    )
            elif isinstance(token, yaml.ValueToken):
                index += 1
                next_token = self.__safe_token_access(tokens, index)
                if next_token is None:
                    self.problems.append(
                        Problem(
                            pos=self.__parse_pos(token),
                            desc="Unexpected end of tokens while parsing value",
                            level=ProblemLevel.ERR,
                            rule=self.RULE,
                        )
                    )
                    return {}, index
                if isinstance(next_token, yaml.ScalarToken):
                    value = self.__parse_scalar_value(next_token)
                    if key is not None:
                        mapping[key] = value
                elif isinstance(next_token, yaml.FlowMappingStartToken):
                    value, index = self.__parse_flow_mapping(tokens, index)
                    if key is not None:
                        mapping[key] = value
                elif isinstance(next_token, yaml.FlowSequenceStartToken):
                    value, index = self.__parse_flow_sequence(tokens, index)
                    if key is not None:
                        mapping[key] = value
                else:
                    self.problems.append(
                        Problem(
                            pos=self.__parse_pos(next_token),
                            desc=error_desc,
                            level=ProblemLevel.ERR,
                            rule=self.RULE,
                        )
                    )
            else:
                self.problems.append(
                    Problem(
                        pos=self.__parse_pos(token),
                        desc=error_desc,
                        level=ProblemLevel.ERR,
                        rule=self.RULE,
                    )
                )
            index += 1

        # If we reach here, it means there's an unexpected error in the
        # flow mapping
        error_token = self.__safe_token_access(tokens, index)
        error_pos = self.__parse_pos(error_token) if error_token else Pos(0, 0, 0)
        self.problems.append(
            Problem(
                pos=error_pos,
                desc=error_desc,
                level=ProblemLevel.ERR,
                rule=self.RULE,
            )
        )
        return {}, index

    def __parse_flow_sequence(
        self, tokens: List[yaml.Token], index: int = 0
    ) -> Tuple[List[Any], int]:
        """Parse a YAML flow sequence ``[ x, y, z ]`` into a list.

        Args:
            tokens (List[yaml.Token]): The list of YAML tokens.
            index (int, optional): The current index in the token list. Defaults to 0.

        Returns:
            Tuple[List[Any], int]: The parsed list and the new index position.
        """
        lst: List[Any] = []
        while index < len(tokens):
            token = tokens[index]
            if isinstance(token, yaml.FlowSequenceStartToken):
                pass
            elif isinstance(token, yaml.FlowEntryToken):
                pass
            elif isinstance(token, yaml.FlowSequenceEndToken):
                return lst, index
            else:
                # Process a value.
                value, index = self.__parse_flow_value(tokens, index)
                lst.append(value)
            index += 1

        error_token = self.__safe_token_access(tokens, index)
        error_pos = self.__parse_pos(error_token) if error_token else Pos(0, 0, 0)
        self.problems.append(
            Problem(
                pos=error_pos,
                desc="Error parsing flow sequence",
                level=ProblemLevel.ERR,
                rule=self.RULE,
            )
        )
        return [], index

    def __parse_flow_value(self, tokens: List[yaml.Token], index: int = 0) -> Tuple[Any, int]:
        """Parse a YAML flow value into the appropriate Python type.

        Args:
            tokens (List[yaml.Token]): The list of YAML tokens.
            index (int, optional): The current index in the token list. Defaults to 0.

        Returns:
            Tuple[Any, int]: The parsed value (None on error) and the new index position.
        """
        token = tokens[index]
        # Fix: initialize so the error branch cannot raise UnboundLocalError
        # on return after the problem has been reported.
        value: Any = None
        if isinstance(token, yaml.ScalarToken):
            value = self.__parse_scalar_value(token)
        elif isinstance(token, yaml.FlowMappingStartToken):
            value, index = self.__parse_flow_mapping(tokens, index)
        elif isinstance(token, yaml.FlowSequenceStartToken):
            value, index = self.__parse_flow_sequence(tokens, index)
        else:
            self.problems.append(
                Problem(
                    pos=self.__parse_pos(token),
                    desc="Error parsing flow value",
                    level=ProblemLevel.ERR,
                    rule=self.RULE,
                )
            )
        return value, index

    def __parse_scalar_value(self, token: yaml.ScalarToken) -> Any:
        """Parse a scalar token into the appropriate Python type
        (bool, int, float, or String).

        Args:
            token (yaml.ScalarToken): The scalar token to parse.

        Returns:
            Any: The parsed value as the appropriate Python type.
        """
        val = token.value
        # Boolean handling
        if isinstance(val, bool):
            return val
        elif val == "true":
            return True
        elif val == "false":
            return False
        # Number handling
        try:
            # First try to parse as int if possible
            if str(int(float(val))) == val:
                return int(val)
            # Otherwise parse as float
            return float(val)
        except (ValueError, OverflowError):
            # Fix: int(float("inf")) raises OverflowError, which previously
            # escaped this handler and crashed the parse.
            # If not a boolean or number, return as String
            return self.__parse_str(token)

    def __parse_str(self, token: yaml.ScalarToken) -> String:
        """
        Reads a scalar token and returns a position-aware String object,
        including any ${{ ... }} expressions found in the raw token text.
        """
        token_string: str = token.value
        token_pos = self.__parse_pos(token)
        # parse expressions in the form of ${{ ... }}
        # we need the full string to calc indices for expression fixing
        pattern = r"\${{\s*(.*?)\s*}}"
        full_str: str = token.start_mark.buffer
        token_full_str = full_str[token.start_mark.index : token.end_mark.index]
        # finds expressions in token string
        matches = re.finditer(pattern, token_full_str)
        expressions = self._parse_expressions(matches, token_pos, token)
        return String(token_string, token_pos, expressions)

    def __parse_pos(self, token: yaml.Token) -> Pos:
        """
        Reads a token and returns a Pos object from its start mark.
        """
        return Pos(token.start_mark.line, token.start_mark.column, token.start_mark.index)

    def __safe_token_access(self, tokens: List[yaml.Token], index: int) -> Optional[yaml.Token]:
        """
        Safely access a token at the given index, returning None if out of bounds.
        """
        if 0 <= index < len(tokens):
            return tokens[index]
        return None

    def _parse_expressions(
        self, matches: Iterator[re.Match[str]], token_pos: Pos, token: yaml.ScalarToken
    ) -> List[Expression]:
        """
        Parses expressions from the matches and builds an expression list.
        """
        expressions: List[Expression] = []
        # for each expression in the list of matches (expressions)
        for match_obj in matches:
            # extract the expression string
            expr_str = match_obj.group(1)
            # Split expression into parts on dots
            raw_parts_list = expr_str.split(".")
            parts_ast_nodes = []
            # determine the character index of the part
            # first part begins at the start of the expression
            part_pos = copy.copy(token_pos)
            part_start_char_idx = match_obj.start(1)
            part_pos.idx = token.start_mark.index + part_start_char_idx
            # for each part in the expression
            for i, part_segment_str in enumerate(raw_parts_list):
                # check for bracket access like object['property'] in the part
                bracket_match_obj = re.match(r"(\w+)\[['\"](.+)['\"]\]", part_segment_str)
                if bracket_match_obj:
                    main_name_str = bracket_match_obj.group(1)  # e.g., 'ports'
                    parts_ast_nodes.append(String(main_name_str, part_pos))
                    content_in_brackets_str = bracket_match_obj.group(2)  # e.g., '6379'
                    # Fix: give the bracketed content its own Pos copy.
                    # Mutating part_pos in place also shifted the Pos already
                    # stored on the String appended just above (both shared one
                    # object), corrupting its index and over-advancing the
                    # offset computed for the following part.
                    bracket_pos = copy.copy(part_pos)
                    # the start of group(2) is relative to the start of
                    # part_segment_str
                    bracket_pos.idx += bracket_match_obj.start(2)
                    parts_ast_nodes.append(String(content_in_brackets_str, bracket_pos))
                else:
                    # Simple part (no brackets)
                    parts_ast_nodes.append(String(part_segment_str, part_pos))
                part_pos = copy.copy(part_pos)
                # Advance the offset within expr_str for the next part
                part_pos.idx += len(part_segment_str)
                if i < len(raw_parts_list) - 1:
                    # If not the last part, account for the dot
                    part_pos.idx += 1
            expressions.append(
                Expression(
                    pos=token_pos,  # Pos of the enclosing scalar token
                    string=expr_str,  # The full expression string
                    parts=parts_ast_nodes,  # List of String objects per part
                )
            )
        return expressions

    def _validate_basic_yaml_structure(self, tokens: List[yaml.Token]) -> bool:
        """Basic validation that this looks like a GitHub Actions workflow.

        Checks for minimal expected structure:
        - Contains at least a mapping structure
        - Not just empty or whitespace
        - Has reasonable token count
        """
        if not tokens:
            return False
        # Must have at least stream start/end and some content
        if len(tokens) < 3:
            return False
        # Should start with StreamStart and contain at least one mapping
        has_stream_start = any(isinstance(token, yaml.StreamStartToken) for token in tokens)
        has_mapping = any(isinstance(token, yaml.BlockMappingStartToken) for token in tokens)
        has_stream_end = any(isinstance(token, yaml.StreamEndToken) for token in tokens)
        return has_stream_start and has_mapping and has_stream_end