"""Analyze one or more JSON files (or directories of them) and print a summary of their merged structure."""

import argparse
import json
import os
from collections import defaultdict
from typing import Dict, Any, List


def find_json_files(path: str) -> List[str]:
    """Find all JSON files in the given path."""
    json_files = []
    if os.path.isfile(path):
        if path.lower().endswith('.json'):
            json_files.append(path)
    else:
        for root, _, files in os.walk(path):
            for file in files:
                if file.lower().endswith('.json'):
                    json_files.append(os.path.join(root, file))
    return json_files


def read_json_file(file_path: str) -> List[Any]:
    """Read and parse a JSON file, returning a list of objects."""
    try:
        with open(file_path, 'r') as f:
            content = json.load(f)
        # If root is an array, return its elements; otherwise, return content as a single-element list
        return content if isinstance(content, list) else [content]
    except json.JSONDecodeError as e:
        print(f"Error parsing {file_path}: {e}")
        return []
    except Exception as e:
        print(f"Error reading {file_path}: {e}")
        return []


def init_stats_dict() -> Dict:
    """Initialize a statistics dictionary with default values."""
    return {
        'types': set(),
        'fields': defaultdict(lambda: {
            'count': 0,
            'types': set(),
            'nested_fields': defaultdict(init_stats_dict),
            'examples': set()
        })
    }


def analyze_value(value: Any, stats: Dict, depth: int = 0, max_depth: int = 5) -> None:
    """Analyze a value and update statistics."""
    if depth >= max_depth:
        return

    # Ensure stats has the basic structure
    if 'types' not in stats:
        stats.update(init_stats_dict())

    value_type = type(value).__name__
    stats['types'].add(value_type)

    if isinstance(value, (int, float)):
        if 'min_value' not in stats:
            stats['min_value'] = value
            stats['max_value'] = value
        else:
            stats['min_value'] = min(stats['min_value'], value)
            stats['max_value'] = max(stats['max_value'], value)

    if isinstance(value, dict):
        for k, v in value.items():
            if k not in stats['fields']:
                stats['fields'][k] = init_stats_dict()
                stats['fields'][k]['count'] = 0
                stats['fields'][k]['examples'] = set()
            field_stats = stats['fields'][k]
            field_stats['count'] += 1
            field_stats['types'].add(type(v).__name__)
            # Store example values (limit to 3)
            if not isinstance(v, (dict, list)) and len(field_stats['examples']) < 3:
                field_stats['examples'].add(str(v))
            # Recurse into nested containers
            if isinstance(v, (dict, list)):
                analyze_value(v, field_stats, depth + 1, max_depth)
    elif isinstance(value, list):
        if 'list_item_types' not in stats:
            stats['list_item_types'] = set()
            stats['list_item_stats'] = init_stats_dict()
            stats['min_length'] = len(value)
            stats['max_length'] = len(value)
        else:
            stats['min_length'] = min(stats['min_length'], len(value))
            stats['max_length'] = max(stats['max_length'], len(value))

        for item in value:
            item_type = type(item).__name__
            stats['list_item_types'].add(item_type)
            # Analyze list items in detail
            if isinstance(item, (dict, list)):
                analyze_value(item, stats['list_item_stats'], depth + 1, max_depth)
            elif isinstance(item, (int, float)):
                if 'min_value' not in stats['list_item_stats']:
                    stats['list_item_stats']['min_value'] = item
                    stats['list_item_stats']['max_value'] = item
                else:
                    stats['list_item_stats']['min_value'] = min(stats['list_item_stats']['min_value'], item)
                    stats['list_item_stats']['max_value'] = max(stats['list_item_stats']['max_value'], item)
            elif isinstance(item, str) and len(stats.get('examples', set())) < 3:
                if 'examples' not in stats:
                    stats['examples'] = set()
                stats['examples'].add(str(item))


def merge_objects(objects: List[Any], files: List[str]) -> Dict:
    """Merge multiple JSON objects and analyze their structure."""
    stats = init_stats_dict()
    stats['total_objects'] = len(objects)
    stats['total_files'] = len(files)
    for obj in objects:
        if obj is not None:
            analyze_value(obj, stats)
    return stats


def format_value(value: Any) -> str:
    """Format a value for display."""
    if isinstance(value, float):
        return f"{value:.2f}"
    return str(value)


def print_field_stats(stats: Dict, prefix: str = "") -> None:
    """Helper function to print field statistics recursively."""
    # Print examples for non-container types
    if 'examples' in stats and stats['examples']:
        print(f"{prefix}Examples: {', '.join(sorted(stats['examples']))}")

    # Print numeric value ranges
    if 'min_value' in stats:
        min_val = format_value(stats['min_value'])
        max_val = format_value(stats['max_value'])
        if min_val != max_val:
            print(f"{prefix}Value range: {min_val} to {max_val}")
        else:
            print(f"{prefix}Value: {min_val}")

    # Print list properties
    if 'list_item_types' in stats:
        print(f"{prefix}List properties:")
        print(f"{prefix}  Length range: {stats['min_length']} to {stats['max_length']}")
        item_types = sorted(stats['list_item_types'])
        print(f"{prefix}  Item types: {', '.join(item_types)}")

        # Print list item statistics
        if 'list_item_stats' in stats:
            item_stats = stats['list_item_stats']
            if 'min_value' in item_stats:
                min_val = format_value(item_stats['min_value'])
                max_val = format_value(item_stats['max_value'])
                if min_val != max_val:
                    print(f"{prefix}  Item value range: {min_val} to {max_val}")
                else:
                    print(f"{prefix}  Item value: {min_val}")
            if 'examples' in item_stats and item_stats['examples']:
                print(f"{prefix}  Item examples: {', '.join(sorted(item_stats['examples']))}")
            if 'fields' in item_stats and item_stats['fields']:
                print(f"{prefix}  Item structure:")
                for field_name, field_stats in sorted(item_stats['fields'].items()):
                    print(f"{prefix}    {field_name}:")
                    print(f"{prefix}      Occurrences: {field_stats['count']}")
                    print(f"{prefix}      Types: {', '.join(sorted(field_stats['types']))}")
                    print_field_stats(field_stats, prefix + "      ")

    # Print nested fields from the fields dictionary
    if 'fields' in stats and stats['fields']:
        print(f"{prefix}Nested structure:")
        for field_name, field_stats in sorted(stats['fields'].items()):
            print(f"{prefix}  {field_name}:")
            print(f"{prefix}    Occurrences: {field_stats['count']}")
            print(f"{prefix}    Types: {', '.join(sorted(field_stats['types']))}")
            print_field_stats(field_stats, prefix + "    ")


def print_summary(stats: Dict) -> None:
    """Print a formatted summary of the JSON structure."""
    print("\n=== JSON Structure Summary ===")
    print(f"\nTotal objects processed: {stats['total_objects']} (in {stats['total_files']} files)")
    print(f"Root level types found: {', '.join(sorted(stats['types']))}")

    print("\nField Analysis:")
    for field, field_stats in sorted(stats['fields'].items()):
        print(f"\n{field}:")
        print(f"  Occurrences: {field_stats['count']}")
        print(f"  Types: {', '.join(sorted(field_stats['types']))}")
        print_field_stats(field_stats, "  ")


def main():
    parser = argparse.ArgumentParser(description='Analyze and merge JSON files')
    parser.add_argument('paths', nargs='+', help='Paths to JSON files or directories')
    args = parser.parse_args()

    # Find all JSON files
    json_files = []
    for path in args.paths:
        json_files.extend(find_json_files(path))

    if not json_files:
        print("No JSON files found in the specified paths.")
        return

    # Read and process all JSON files
    objects = []
    for file_path in json_files:
        file_objects = read_json_file(file_path)
        objects.extend(file_objects)

    if not objects:
        print("No valid JSON objects found in the specified files.")
        return

    # Analyze and print summary
    stats = merge_objects(objects, json_files)
    print_summary(stats)


if __name__ == '__main__':
    main()
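
# Example invocation (the script filename and paths below are illustrative
# assumptions, not part of the original code; use whatever names apply):
#
#   python json_structure_summary.py data/exports report.json
#
# This would recursively collect every .json file under data/exports, parse
# report.json directly, merge all parsed objects, and print the structure
# summary to stdout.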