Files
json-summary/main.py
OMGeeky 8e9a5f4df1 Enhance JSON file reading and processing
- Update read_json_file to return a list of objects, handling both single and multiple JSON objects.
- Modify merge_objects to include total_files in the statistics.
- Adjust print_summary to display the number of files processed alongside total objects.
- Refactor JSON file reading loop to accumulate objects from multiple files.
2025-02-22 17:44:19 +01:00

237 lines
8.7 KiB
Python

import argparse
import json
import os
from collections import defaultdict
from typing import Dict, Any, List, Set
def find_json_files(path: str) -> List[str]:
    """Collect the paths of all JSON files reachable from *path*.

    A single file path is returned as a one-element list when it has a
    ``.json`` extension (case-insensitive); a directory is walked
    recursively. Anything else yields an empty list.
    """
    if os.path.isfile(path):
        # Single file: include it only when the extension matches.
        return [path] if path.lower().endswith('.json') else []
    collected: List[str] = []
    for directory, _, filenames in os.walk(path):
        collected.extend(
            os.path.join(directory, name)
            for name in filenames
            if name.lower().endswith('.json')
        )
    return collected
def read_json_file(file_path: str) -> List[Any]:
    """Read and parse a JSON file, returning a list of objects.

    A root-level array contributes its elements; any other root value is
    wrapped in a single-element list. Parse or I/O failures are reported
    on stdout and yield an empty list so callers can keep processing the
    remaining files.

    Args:
        file_path: Path to the JSON file to read.

    Returns:
        The list of parsed root objects, or ``[]`` on any error.
    """
    try:
        # JSON is defined over UTF-8 (RFC 8259); don't rely on the
        # locale-dependent default codec, which breaks on some platforms.
        with open(file_path, 'r', encoding='utf-8') as f:
            content = json.load(f)
        # If root is an array, return its elements; otherwise wrap the
        # single root value in a one-element list.
        return content if isinstance(content, list) else [content]
    except json.JSONDecodeError as e:
        print(f"Error parsing {file_path}: {e}")
        return []
    except Exception as e:
        # Best-effort: unreadable files are skipped rather than aborting the run.
        print(f"Error reading {file_path}: {e}")
        return []
def init_stats_dict() -> Dict:
    """Build an empty statistics node for one level of the JSON structure."""
    def _field_defaults() -> Dict:
        # Per-field record created on demand by the defaultdict below.
        return {
            'count': 0,
            'types': set(),
            'nested_fields': defaultdict(init_stats_dict),
            'examples': set(),
        }

    return {
        'types': set(),
        'fields': defaultdict(_field_defaults),
    }
def analyze_value(value: Any, stats: Dict, depth: int = 0, max_depth: int = 5) -> None:
    """Analyze a value and update statistics.

    Mutates *stats* in place: records the value's type name, numeric
    min/max, per-key field statistics for dicts, and length/item-type
    statistics for lists. Recursion stops once *depth* reaches
    *max_depth* (default 5 levels).
    """
    if depth >= max_depth:
        return
    # Ensure stats has the basic structure; callers may pass a bare dict
    # (e.g. a freshly created 'list_item_stats' entry below).
    if 'types' not in stats:
        stats.update(init_stats_dict())
    value_type = type(value).__name__
    stats['types'].add(value_type)
    if isinstance(value, (int, float)):
        # NOTE(review): bool is a subclass of int, so True/False also land
        # here and participate in min/max — confirm that is intended.
        if 'min_value' not in stats:
            stats['min_value'] = value
            stats['max_value'] = value
        else:
            stats['min_value'] = min(stats['min_value'], value)
            stats['max_value'] = max(stats['max_value'], value)
    if isinstance(value, dict):
        for k, v in value.items():
            if k not in stats['fields']:
                # Deliberately replace the defaultdict factory's record with
                # a full stats node so nested dict/list values can recurse
                # into it; 'count' and 'examples' (absent from
                # init_stats_dict's top level) are re-added by hand.
                stats['fields'][k] = init_stats_dict()
                stats['fields'][k]['count'] = 0
                stats['fields'][k]['examples'] = set()
            field_stats = stats['fields'][k]
            field_stats['count'] += 1
            field_stats['types'].add(type(v).__name__)
            # Store example values (limit to 3) for scalar fields only.
            if not isinstance(v, (dict, list)) and len(field_stats['examples']) < 3:
                field_stats['examples'].add(str(v))
            # Containers recurse into the field's own stats node.
            if isinstance(v, dict):
                analyze_value(v, field_stats, depth + 1, max_depth)
            elif isinstance(v, list):
                analyze_value(v, field_stats, depth + 1, max_depth)
    elif isinstance(value, list):
        if 'list_item_types' not in stats:
            # First list seen at this node: create the list-specific slots.
            stats['list_item_types'] = set()
            stats['list_item_stats'] = init_stats_dict()
            stats['min_length'] = len(value)
            stats['max_length'] = len(value)
        else:
            stats['min_length'] = min(stats['min_length'], len(value))
            stats['max_length'] = max(stats['max_length'], len(value))
        for item in value:
            item_type = type(item).__name__
            stats['list_item_types'].add(item_type)
            # Analyze list items in detail: containers recurse, numbers feed
            # the item min/max, strings become example values.
            if isinstance(item, (dict, list)):
                analyze_value(item, stats['list_item_stats'], depth + 1, max_depth)
            elif isinstance(item, (int, float)):
                if 'min_value' not in stats['list_item_stats']:
                    stats['list_item_stats']['min_value'] = item
                    stats['list_item_stats']['max_value'] = item
                else:
                    stats['list_item_stats']['min_value'] = min(stats['list_item_stats']['min_value'], item)
                    stats['list_item_stats']['max_value'] = max(stats['list_item_stats']['max_value'], item)
            elif isinstance(item, str) and len(stats.get('examples', set())) < 3:
                # String examples live on the list's own node, capped at 3.
                if 'examples' not in stats:
                    stats['examples'] = set()
                stats['examples'].add(str(item))
def merge_objects(objects: List[Any], files: List[str]) -> Dict:
    """Merge multiple JSON objects and analyze their structure.

    Args:
        objects: Parsed JSON root values gathered from every input file.
        files: The file paths the objects came from; only its length is
            recorded (the per-file origin of each object is not tracked).

    Returns:
        The aggregated statistics dict, including the 'total_objects' and
        'total_files' counters consumed by print_summary.
    """
    stats = init_stats_dict()
    stats['total_objects'] = len(objects)
    stats['total_files'] = len(files)
    for obj in objects:
        # None roots (JSON 'null') carry no structure worth analyzing,
        # though they still count toward total_objects.
        if obj is not None:
            analyze_value(obj, stats)
    return stats
def format_value(value: Any) -> str:
    """Format a value for display: floats get two decimal places,
    everything else its plain string form."""
    return f"{value:.2f}" if isinstance(value, float) else str(value)
def print_field_stats(stats: Dict, prefix: str = "") -> None:
    """Helper function to print field statistics recursively.

    *prefix* carries the indentation accumulated by the caller; each
    level of nesting passes a longer prefix when recursing. Only the
    sections whose keys are present in *stats* are printed.
    """
    # Print examples for non-container types
    if 'examples' in stats and stats['examples']:
        # sorted() gives deterministic output despite the set storage.
        print(f"{prefix}Examples: {', '.join(sorted(stats['examples']))}")
    # Print numeric value ranges (collapsed to a single value when equal)
    if 'min_value' in stats:
        min_val = format_value(stats['min_value'])
        max_val = format_value(stats['max_value'])
        if min_val != max_val:
            print(f"{prefix}Value range: {min_val} to {max_val}")
        else:
            print(f"{prefix}Value: {min_val}")
    # Print list properties
    if 'list_item_types' in stats:
        print(f"{prefix}List properties:")
        print(f"{prefix} Length range: {stats['min_length']} to {stats['max_length']}")
        item_types = sorted(stats['list_item_types'])
        print(f"{prefix} Item types: {', '.join(item_types)}")
        # Print list item statistics ('list_item_stats' is created together
        # with 'list_item_types' in analyze_value, so it is present here).
        if 'list_item_stats' in stats:
            item_stats = stats['list_item_stats']
            if 'min_value' in item_stats:
                min_val = format_value(item_stats['min_value'])
                max_val = format_value(item_stats['max_value'])
                if min_val != max_val:
                    print(f"{prefix} Item value range: {min_val} to {max_val}")
                else:
                    print(f"{prefix} Item value: {min_val}")
            if 'examples' in item_stats and item_stats['examples']:
                print(f"{prefix} Item examples: {', '.join(sorted(item_stats['examples']))}")
            if 'fields' in item_stats and item_stats['fields']:
                print(f"{prefix} Item structure:")
                for field_name, field_stats in sorted(item_stats['fields'].items()):
                    print(f"{prefix} {field_name}:")
                    print(f"{prefix} Occurrences: {field_stats['count']}")
                    print(f"{prefix} Types: {', '.join(sorted(field_stats['types']))}")
                    # Recurse one level deeper for each item field.
                    print_field_stats(field_stats, prefix + " ")
    # Print nested fields from the fields dictionary
    if 'fields' in stats and stats['fields']:
        print(f"{prefix}Nested structure:")
        for field_name, field_stats in sorted(stats['fields'].items()):
            print(f"{prefix} {field_name}:")
            print(f"{prefix} Occurrences: {field_stats['count']}")
            print(f"{prefix} Types: {', '.join(sorted(field_stats['types']))}")
            # Recurse one level deeper for each nested field.
            print_field_stats(field_stats, prefix + " ")
def print_summary(stats: Dict) -> None:
    """Print a formatted summary of the JSON structure.

    Expects the dict produced by merge_objects, including the
    'total_objects' and 'total_files' counters.
    """
    print("\n=== JSON Structure Summary ===")
    print(f"\nTotal objects processed: {stats['total_objects']} (in {stats['total_files']} files)")
    # Sets iterate in arbitrary order; sort the type names so repeated runs
    # print identical output (print_field_stats already sorts its output).
    print(f"Root level types found: {', '.join(sorted(stats['types']))}")
    print("\nField Analysis:")
    for field, field_stats in sorted(stats['fields'].items()):
        print(f"\n{field}:")
        print(f" Occurrences: {field_stats['count']}")
        print(f" Types: {', '.join(sorted(field_stats['types']))}")
        print_field_stats(field_stats, " ")
def main():
    """Entry point: gather JSON files from CLI paths, merge, and summarize."""
    parser = argparse.ArgumentParser(description='Analyze and merge JSON files')
    parser.add_argument('paths', nargs='+', help='Paths to JSON files or directories')
    args = parser.parse_args()

    # Expand every argument into the JSON files it contains.
    json_files: List[str] = []
    for target in args.paths:
        json_files.extend(find_json_files(target))
    if not json_files:
        print("No JSON files found in the specified paths.")
        return

    # Accumulate the parsed root objects from every discovered file.
    objects: List[Any] = []
    for json_path in json_files:
        objects.extend(read_json_file(json_path))
    if not objects:
        print("No valid JSON objects found in the specified files.")
        return

    # Analyze the combined object set and report the structure summary.
    print_summary(merge_objects(objects, json_files))
# Run the CLI only when executed as a script, so the module stays importable.
if __name__ == '__main__':
    main()