util.py: Make new_context dict-compatible

This is an incremental change towards a strongly-typed util module,
aimed at reducing dependency on the DictObject class. The rough idea is
to annotate everything as Dict, add some tests to codify the existing
behavior, and then start defining dataclasses for the dischovery schema.

We also remove some unused logic & params.
This commit is contained in:
Kyle Gentle
2022-08-08 20:08:13 -04:00
parent 08552c4364
commit 8ba6acb88b
8 changed files with 57 additions and 59 deletions

View File

@@ -1,11 +1,12 @@
import re
import os
from random import (randint, random, choice, seed)
from typing import Any, List, Mapping, Optional, Tuple
import collections
from copy import deepcopy
import re
import subprocess
from dataclasses import dataclass
from random import (randint, random, choice, seed)
from typing import Any, Dict, List, Mapping, Tuple
from copy import deepcopy
seed(1337)
re_linestart = re.compile('^', flags=re.MULTILINE)
@@ -494,8 +495,7 @@ def is_schema_with_optionals(schema_markers):
# -------------------------
## @name Activity Utilities
# @{
# return (category, name|None, method)
def activity_split(fqan: str) -> Tuple[str, Optional[str], str]:
def activity_split(fqan: str) -> Tuple[str, str, str]:
t = fqan.split('.')
mt = t[2:]
if not mt:
@@ -663,31 +663,37 @@ It should be used to handle progress information, and to implement a certain lev
## -- End Activity Utilities -- @}
Context = collections.namedtuple('Context', ['sta_map', 'fqan_map', 'rta_map', 'rtc_map', 'schemas'])
@dataclass
class Context:
sta_map: Dict[str, Any]
fqan_map: Dict[str, Any]
rta_map: Dict[str, Any]
rtc_map: Dict[str, Any]
schemas: Dict[str, Any]
# return a newly build context from the given data
def new_context(schemas, resources, methods):
def new_context(schemas: Dict[str, Dict[str, Any]], resources: Dict[str, Any]) -> Context:
# Returns (A, B) where
# A: { SchemaTypeName -> { fqan -> ['request'|'response', ...]}
# B: { fqan -> activity_method_data }
# fqan = fully qualified activity name
def build_activity_mappings(activities, res = None, fqan = None):
def build_activity_mappings(resources: Dict[str, Any], res = None, fqan = None) -> Tuple[Dict[str, Any], Dict[str, Any]]:
if res is None:
res = dict()
if fqan is None:
fqan = dict()
for k,a in activities.items():
for k,a in resources.items():
if 'resources' in a:
build_activity_mappings(a.resources, res, fqan)
build_activity_mappings(a["resources"], res, fqan)
if 'methods' not in a:
continue
for mn, m in a.methods.items():
assert m.id not in fqan
category, resource, method = activity_split(m.id)
for mn, m in a["methods"].items():
assert m["id"] not in fqan
category, resource, method = activity_split(m["id"])
# This may be another name by which people try to find the method.
# As it has no resource, we put in a 'fake resource' (METHODS_RESOURCE), which
# needs some special treatment only in key-spots
fqan_key = m.id
fqan_key = m["id"]
if resource == METHODS_RESOURCE:
fqan_key = to_fqan(category, resource, method)
fqan[fqan_key] = m
@@ -697,7 +703,7 @@ def new_context(schemas, resources, methods):
continue
tn = to_rust_type(schemas, None, None, t, allow_optionals=False)
info = res.setdefault(tn, dict())
io_info = info.setdefault(m.id, [])
io_info = info.setdefault(m["id"], [])
io_info.append(in_out_type_name)
# end for each io type
@@ -707,8 +713,8 @@ def new_context(schemas, resources, methods):
# the latter is used to deduce the resource name
tn = activity_name_to_type_name(resource)
info = res.setdefault(tn, dict())
if m.id not in info:
info.setdefault(m.id, [])
if m["id"] not in info:
info.setdefault(m["id"], [])
# end handle other cases
# end for each method
# end for each activity
@@ -717,7 +723,7 @@ def new_context(schemas, resources, methods):
# A dict of {s.id -> schema} , with all schemas having the 'parents' key set with [s.id, ...] of all parents
# in order of traversal, [-1] is first parent, [0] is the root of them all
def build_schema_map():
def build_schema_map() -> Dict[str, Any]:
# 'type' in t and t.type == 'object' and 'properties' in t or ('items' in t and 'properties' in t.items)
PARENT = 'parents'
USED_BY = 'used_by'
@@ -729,8 +735,8 @@ def new_context(schemas, resources, methods):
def link_used(s, rs):
if TREF in s:
l = assure_list(all_schemas[s[TREF]], USED_BY)
if rs.id not in l:
l.append(rs.id)
if rs["id"] not in l:
l.append(rs["id"])
def append_unique(l, s):
if s not in l:
@@ -738,14 +744,14 @@ def new_context(schemas, resources, methods):
return l
all_schemas = deepcopy(schemas)
def recurse_properties(prefix, rs, s, parent_ids):
def recurse_properties(prefix: str, rs: Any, s: Any, parent_ids: List[str]):
assure_list(s, USED_BY)
assure_list(s, PARENT).extend(parent_ids)
link_used(s, rs)
if is_nested_type_property(s) and 'id' not in s:
s.id = prefix
all_schemas[s.id] = s
all_schemas[s["id"]] = s
rs = s
# end this is already a perfectly valid type
@@ -756,54 +762,46 @@ def new_context(schemas, resources, methods):
if is_nested_type_property(p):
ns = deepcopy(p)
ns.id = _assure_unique_type_name(schemas, nested_type_name(prefix, pn))
all_schemas[ns.id] = ns
all_schemas[ns["id"]] = ns
# To allow us recursing arrays, we simply put items one level up
if 'items' in p:
ns.update((k, deepcopy(v)) for k, v in p.items.items())
ns.update((k, deepcopy(v)) for k, v in p["items"].items())
recurse_properties(ns.id, ns, ns, append_unique(parent_ids, rs.id))
recurse_properties(ns.id, ns, ns, append_unique(parent_ids, rs["id"]))
elif is_map_prop(p):
recurse_properties(nested_type_name(prefix, pn), rs,
p.additionalProperties, append_unique(parent_ids, rs.id))
p["additionalProperties"], append_unique(parent_ids, rs["id"]))
elif 'items' in p:
recurse_properties(nested_type_name(prefix, pn), rs,
p.items, append_unique(parent_ids, rs.id))
elif 'variant' in p:
for enum in p.variant.map:
recurse_properties(prefix, rs, enum, parent_ids)
p["items"], append_unique(parent_ids, rs["id"]))
# end handle prop itself
# end for each property
# end utility
for s in all_schemas.values():
recurse_properties(s.id, s, s, [])
recurse_properties(s["id"], s, s, [])
# end for each schema
return all_schemas
# end utility
all_schemas = schemas and build_schema_map() or dict()
if not (resources or methods):
if not resources:
return Context(dict(), dict(), dict(), dict(), all_schemas)
rta_map, rtc_map, sta_map, fqan_map = dict(), dict(), dict(), dict()
rta_map: Dict[str, Any] = {}
rtc_map: Dict[str, Any] = {}
sta_map: Dict[str, Any] = {}
fqan_map: Dict[str, Any] = {}
sources = list()
if bool(resources):
sources.append(resources)
if bool(methods):
sources.append({None : type(methods)({'methods' : methods})})
for data_source in sources:
_sta_map, _fqan_map = build_activity_mappings(data_source)
for an in _fqan_map:
category, resource, activity = activity_split(an)
rta_map.setdefault(resource, list()).append(activity)
assert rtc_map.setdefault(resource, category) == category
# end for each fqan
sta_map.update(_sta_map)
fqan_map.update(_fqan_map)
# end for each data source
_sta_map, _fqan_map = build_activity_mappings(resources)
for an in _fqan_map:
category, resource, activity = activity_split(an)
rta_map.setdefault(resource, list()).append(activity)
assert rtc_map.setdefault(resource, category) == category
# end for each fqan
sta_map.update(_sta_map)
fqan_map.update(_fqan_map)
return Context(sta_map, fqan_map, rta_map, rtc_map, all_schemas)
def _is_special_version(v):

View File

@@ -1,6 +1,6 @@
<%
from generator.lib.util import (markdown_comment, new_context)
c = new_context(schemas, resources, context.get('methods'))
c = new_context(schemas, resources)
%>\
<%namespace name="lib" file="lib/lib.mako"/>\
<%namespace name="util" file="../../lib/util.mako"/>\

View File

@@ -9,7 +9,7 @@
rb_type_params_s, find_fattest_resource, HUB_TYPE_PARAMETERS, METHODS_RESOURCE,
UNUSED_TYPE_MARKER, schema_markers)
c = new_context(schemas, resources, context.get('methods'))
c = new_context(schemas, resources)
hub_type = hub_type(c.schemas, util.canonical_name())
ht_params = hub_type_params_s()

View File

@@ -3,7 +3,7 @@
<%
from generator.lib.util import (new_context, rust_comment, rust_module_doc_comment)
c = new_context(schemas, resources, context.get('methods'))
c = new_context(schemas, resources)
%>\
<%block filter="rust_comment">\
<%util:gen_info source="${self.uri}" />\
@@ -30,7 +30,7 @@ ${lib.docs(c)}
rb_type_params_s, find_fattest_resource, HUB_TYPE_PARAMETERS, METHODS_RESOURCE,
UNUSED_TYPE_MARKER, schema_markers)
c = new_context(schemas, resources, context.get('methods'))
c = new_context(schemas, resources)
hub_type = hub_type(c.schemas, util.canonical_name())
ht_params = hub_type_params_s()

View File

@@ -2,7 +2,7 @@
from generator.lib.util import (markdown_comment, new_context)
from generator.lib.cli import (CONFIG_DIR, CONFIG_DIR_FLAG, SCOPE_FLAG, application_secret_path, DEBUG_FLAG)
c = new_context(schemas, resources, context.get('methods'))
c = new_context(schemas, resources)
%>\
<%namespace name="util" file="../../lib/util.mako"/>\
<%namespace name="argparse" file="lib/argparse.mako"/>\

View File

@@ -15,7 +15,7 @@
NO_DESC = 'No description provided.'
%>\
<%
c = new_context(schemas, resources, context.get('methods'))
c = new_context(schemas, resources)
%>\
% for resource in sorted(c.rta_map.keys()):
% for method in sorted(c.rta_map[resource]):

View File

@@ -6,7 +6,7 @@
indent_all_but_first_by)
from generator.lib.cli import OUT_ARG, DEBUG_FLAG, opt_value
c = new_context(schemas, resources, context.get('methods'))
c = new_context(schemas, resources)
default_user_agent = "google-cli-rust-client/" + cargo.build_version
%>\
<%block filter="rust_comment">\

View File

@@ -2,7 +2,7 @@
from generator.lib.util import (put_and, new_context)
from generator.lib.cli import (subcommand_md_filename, mangle_subcommand, pretty)
c = new_context(schemas, resources, context.get('methods'))
c = new_context(schemas, resources)
%>\
<%namespace name="util" file="../../lib/util.mako"/>\
site_name: ${util.canonical_name()} v${util.crate_version()}