Skip to content

pyMogwai API Documentation

core

hd_index

Created on 2024-11-07

@author: wf

Based on A. Harth and S. Decker, "Optimized index structures for querying RDF from the Web," Third Latin American Web Congress (LA-WEB'2005), Buenos Aires, Argentina, 2005, pp. 10 pp.-, doi: 10.1109/LAWEB.2005.25. Keywords: {Resource description framework; Data models; Semantic Web; Indexes; Java; Vocabulary; Database systems; Memory; Indexing; Information retrieval}.

Index

A single index in the SPOG matrix (as explained in the Harth/Decker paper cited above), identified by its from/to positions

Source code in mogwai/core/hd_index.py
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
class Index:
    """A single index in the SPOG matrix, identified by its from/to positions.

    The index maps each value found at the ``from`` position of the added
    quads to the set of values found at the ``to`` position of those quads.
    """

    def __init__(self, from_pos: str, to_pos: str):
        """
        Args:
            from_pos: First position (S,P,O,G)
            to_pos: Second position (S,P,O,G)
        """
        self.from_pos = from_pos
        self.to_pos = to_pos
        # from-value -> set of to-values
        self.lookup = {}

    @property
    def name(self) -> str:
        """Index name: the from position directly followed by the to position"""
        return f"{self.from_pos}{self.to_pos}"

    def add_quad(self, quad: Quad) -> None:
        """Add a quad to this index's lookup using quad positions.

        The quad attributes are read by the lower-cased position names.

        Args:
            quad: the quad to index

        Raises:
            TypeError: raised by the dict/set operations if the from or to
                value is not hashable
        """
        from_val = getattr(quad, self.from_pos.lower())
        to_val = getattr(quad, self.to_pos.lower())
        # The former ``isinstance(from_val, Hashable)`` branch only executed
        # ``pass`` and therefore had no effect; it has been removed.
        self.lookup.setdefault(from_val, set()).add(to_val)
name: str property

Full quad index name based on Harth/Decker SPOG ordering

__init__(from_pos, to_pos)

Parameters:

Name Type Description Default
from_pos str

First position (S,P,O,G)

required
to_pos str

Second position (S,P,O,G)

required
Source code in mogwai/core/hd_index.py
78
79
80
81
82
83
84
85
86
def __init__(self, from_pos: str, to_pos: str):
    """
    Set up an empty index for one from/to position pair.

    Args:
        from_pos: First position (S,P,O,G)
        to_pos: Second position (S,P,O,G)
    """
    # begin with an empty lookup table, then remember both positions
    self.lookup = dict()
    self.to_pos = to_pos
    self.from_pos = from_pos
add_quad(quad)

Add a quad to this index's lookup using quad positions

Source code in mogwai/core/hd_index.py
 94
 95
 96
 97
 98
 99
100
101
102
103
def add_quad(self, quad: Quad) -> None:
    """Add a quad to this index's lookup using quad positions.

    The quad attributes are read by the lower-cased position names; the
    from value becomes the lookup key and the to value joins its set.

    Args:
        quad: the quad to index

    Raises:
        TypeError: raised by the dict/set operations if the from or to
            value is not hashable
    """
    from_val = getattr(quad, self.from_pos.lower())
    to_val = getattr(quad, self.to_pos.lower())
    # The former ``isinstance(from_val, Hashable)`` branch only executed
    # ``pass`` and therefore had no effect; it has been removed.
    self.lookup.setdefault(from_val, set()).add(to_val)

IndexConfig dataclass

Configuration of which SPOG indices to use

Source code in mogwai/core/hd_index.py
16
17
18
19
20
@dataclass
class IndexConfig:
    """Selection of the SPOG indices that are in use"""

    # names of the enabled indices (two position letters each)
    active_indices: Set[str]

IndexConfigs

Bases: Enum

Standard index configurations

Source code in mogwai/core/hd_index.py
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
class IndexConfigs(Enum):
    """Predefined selections of SPOG indices"""

    OFF = "off"  # no index is active
    ALL = "all"  # every index with two different positions is active
    MINIMAL = "minimal"  # only a reduced set of indices is active

    def get_config(self) -> IndexConfig:
        """
        Translate this enum member into its IndexConfig

        Returns:
            IndexConfig: the names of the indices that are active for this member

        Raises:
            ValueError: if the member is none of OFF, ALL or MINIMAL
        """
        if self is IndexConfigs.MINIMAL:
            # indices for basic node relationships
            core = (
                "PS",  # predicate -> subject
                "PO",  # predicate -> object
                "SO",  # subject -> object
                "OS",  # object -> subject
            )
            # indices that involve the graph context
            contextual = (
                "PG",  # predicate -> graph
                "SG",  # subject -> graph
                "GO",  # graph -> object
                "GP",  # graph -> predicate
            )
            return IndexConfig({*core, *contextual})

        if self is IndexConfigs.ALL:
            letters = ("S", "P", "O", "G")
            names = set()
            for first in letters:
                for second in letters:
                    # a position is never paired with itself
                    if first != second:
                        names.add(f"{first}{second}")
            return IndexConfig(names)

        if self is IndexConfigs.OFF:
            return IndexConfig(set())

        raise ValueError(f"Unknown index configuration: {self}")
get_config()

Get the index configuration for this enum value

Source code in mogwai/core/hd_index.py
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
def get_config(self) -> IndexConfig:
    """
    Translate this enum member into its IndexConfig

    Returns:
        IndexConfig: the names of the indices that are active for this member

    Raises:
        ValueError: if the member is none of OFF, ALL or MINIMAL
    """
    if self is IndexConfigs.MINIMAL:
        # indices for basic node relationships
        core = (
            "PS",  # predicate -> subject
            "PO",  # predicate -> object
            "SO",  # subject -> object
            "OS",  # object -> subject
        )
        # indices that involve the graph context
        contextual = (
            "PG",  # predicate -> graph
            "SG",  # subject -> graph
            "GO",  # graph -> object
            "GP",  # graph -> predicate
        )
        return IndexConfig({*core, *contextual})

    if self is IndexConfigs.ALL:
        letters = ("S", "P", "O", "G")
        names = set()
        for first in letters:
            for second in letters:
                # a position is never paired with itself
                if first != second:
                    names.add(f"{first}{second}")
        return IndexConfig(names)

    if self is IndexConfigs.OFF:
        return IndexConfig(set())

    raise ValueError(f"Unknown index configuration: {self}")

Quad dataclass

A quad of hashable values (Subject-Predicate-Object-Graph)

Source code in mogwai/core/hd_index.py
64
65
66
67
68
69
70
71
@dataclass(frozen=True)
class Quad:
    """A quad of hashable values (Subject-Predicate-Object-Graph)"""

    s: Hashable  # Subject
    p: Hashable  # Predicate
    o: Hashable  # Object
    g: Hashable | None = None  # Graph context

SPOGIndex

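All pairwise indices based on the SPOG matrix: one index for each of the 12 ordered from/to pairs of distinct positions (the 4 diagonal cells of the 16-cell matrix, where from and to coincide, are not built)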

see http://harth.org/andreas/ YARS and the paper

Source code in mogwai/core/hd_index.py
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
class SPOGIndex:
    """
    Pairwise indices based on the SPOG matrix

    One Index is created for every ordered pair of two different positions
    out of S, P, O and G (12 indices; a position is never paired with
    itself). Only the indices named in the configuration's active_indices
    are filled by add_quad and handed out by get_lookup.

    see http://harth.org/andreas/ YARS and the paper
    """

    def __init__(self, config: IndexConfig):
        """
        Args:
            config: configuration naming the active indices
        """
        self.config = config
        positions = ["S", "P", "O", "G"]
        # index name (e.g. "PO") -> Index; the original assigned this empty
        # dict twice in a row, the redundant assignment has been dropped
        self.indices = {}
        for from_pos in positions:
            for to_pos in positions:
                if from_pos != to_pos:
                    index = Index(from_pos, to_pos)
                    self.indices[index.name] = index

    def get_lookup(self, from_pos: str, to_pos: str) -> dict | None:
        """
        Get lookup dict for from->to positions if active

        Args:
            from_pos: From position (S,P,O,G)
            to_pos: To position (S,P,O,G)
        Returns:
            Lookup dict if index active in current config, None otherwise
        """
        index_name = f"{from_pos}{to_pos}"
        if index_name in self.config.active_indices:
            return self.indices[index_name].lookup
        return None

    def add_quad(self, quad: Quad) -> None:
        """Add quad only to configured active indices"""
        for index_name in self.config.active_indices:
            self.indices[index_name].add_quad(quad)
add_quad(quad)

Add quad only to configured active indices

Source code in mogwai/core/hd_index.py
139
140
141
142
def add_quad(self, quad: Quad) -> None:
    """Forward the quad to each index named in the active configuration"""
    active_names = self.config.active_indices
    for active_name in active_names:
        target_index = self.indices[active_name]
        target_index.add_quad(quad)
get_lookup(from_pos, to_pos)

Get lookup dict for from->to positions if active

Parameters:

Name Type Description Default
from_pos str

From position (S,P,O,G)

required
to_pos str

To position (S,P,O,G)

required

Returns: Lookup dict if index active in current config, None otherwise

Source code in mogwai/core/hd_index.py
124
125
126
127
128
129
130
131
132
133
134
135
136
137
def get_lookup(self, from_pos: str, to_pos: str) -> dict | None:
    """
    Get lookup dict for from->to positions if active

    Args:
        from_pos: From position (S,P,O,G)
        to_pos: To position (S,P,O,G)
    Returns:
        Lookup dict if index active in current config, None otherwise
    """
    index_name = f"{from_pos}{to_pos}"
    if index_name in self.config.active_indices:
        return self.indices[index_name].lookup
    return None

mogwaigraph

MogwaiGraph

Bases: DiGraph

networkx based directed graph see https://networkx.org/documentation/stable/reference/classes/digraph.html

Source code in mogwai/core/mogwaigraph.py
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
class MogwaiGraph(networkx.DiGraph):
    """
    networkx based directed graph
    see https://networkx.org/documentation/stable/reference/classes/digraph.html
    """

    def __init__(
        self, incoming_graph_data=None, config: MogwaiGraphConfig = None, **attr
    ):
        """Create the graph, then attach its configuration and SPOG index.

        Args:
            incoming_graph_data: Graph data in NetworkX compatible format
            config (MogwaiGraphConfig): Configuration for field names and defaults;
                a default MogwaiGraphConfig is created when none is given
            **attr: Graph attributes as key=value pairs
        """
        super().__init__(incoming_graph_data, **attr)
        # counter used to hand out node ids
        self.counter = 0
        if not config:
            config = MogwaiGraphConfig()
        self.config = config
        # the configured name selects the IndexConfigs member by its upper-cased name
        selected = IndexConfigs[self.config.index_config.upper()]
        self.spog_index = SPOGIndex(selected.get_config())

    def get_next_node_id(self) -> str:
        """
        hand out the current counter value as a string and advance the counter by one
        """
        issued = str(self.counter)
        self.counter += 1
        return issued

    def add_to_index(
        self,
        element_type: str,
        subject_id: Hashable,
        label: str,
        name: str,
        properties: dict,
    ):
        """
        Add labels, name, and properties to the SPOG index for a
        given subject and element_type

        Nothing is indexed when the configured index_config is "off".
        Property values that cannot be hashed are indexed by their str() form.

        Args:
            element_type: (str): node or edge
            subject_id (Hashable): The ID of the subject (node or edge).
            label (str): the label for the subject.
            name (str): Name of the subject.
            properties (dict): Dictionary of additional properties to index.
        """
        # only index if the config calls for it
        if self.config.index_config == "off":
            return
        # label quad, graph context "<element_type>-label"
        label_quad = Quad(s=subject_id, p="label", o=label, g=f"{element_type}-label")
        self.spog_index.add_quad(label_quad)

        # name quad, graph context "<element_type>-name"
        name_quad = Quad(s=subject_id, p="name", o=name, g=f"{element_type}-name")
        self.spog_index.add_quad(name_quad)

        # one quad per property, graph context "<element_type>-property"
        for prop_name, prop_value in properties.items():
            try:
                # isinstance(value, Hashable) is not sufficient: a tuple that
                # contains a list passes that check but still cannot be hashed
                hash(prop_value)
            except TypeError:
                prop_value = str(prop_value)  # Ensure property value is hashable
            property_quad = Quad(
                s=subject_id, p=prop_name, o=prop_value, g=f"{element_type}-property"
            )
            self.spog_index.add_quad(property_quad)

    def add_labeled_node(
        self,
        label: str,
        name: str,
        properties: dict = None,
        node_id: Optional[str] = None,
        **kwargs,
    ) -> Any:
        """
        Add a labeled node to the graph.

        we can only insert a node by hashable value and as names and ids
        might occur multiple times we use incremented node ids if no node_id is provided

        Args:
            label (str): The label for the node.
            name (str): The name of the node.
            properties (dict, optional): Additional properties for the node. Defaults to None.
                The passed dict is not modified.
            node_id (Optional[str], optional): The ID for the node. If not provided, a new ID will be generated. Defaults to None.
            kwargs (): further property values; they override equally named entries of properties
        Returns:
            Any: The ID of the newly added node - the string returned by
                get_next_node_id() if node_id was kept as default None

        Raises:
            MogwaiGraphError: If the properties contain the configured name field
                or label field, which are reserved.
        """
        if node_id is None:
            node_id = self.get_next_node_id()
        # merge into a fresh dict so the caller's ``properties`` is never mutated
        properties = {**(properties or {}), **kwargs}
        if self.config.name_field in properties:
            raise MogwaiGraphError(
                f"The '{self.config.name_field}' property is reserved for the node name."
            )
        elif self.config.label_field in properties:
            raise MogwaiGraphError(
                f"The '{self.config.label_field}' property is reserved for the node labels."
            )
        node_props = {
            self.config.name_field: name,
            self.config.label_field: label,
            **properties,
        }
        super().add_node(node_id, **node_props)
        # Use add_to_index to add label, name, and properties as quads
        self.add_to_index("node", node_id, label, name, properties)
        return node_id

    def add_labeled_edge(
        self, srcId: int, destId: int, edgeLabel: str, properties: dict = None, **kwargs
    ):
        """
        add a labeled edge

        Args:
            srcId: id of the source node; must already be in the graph
            destId: id of the target node; must already be in the graph
            edgeLabel (str): label stored under the configured edge label field
            properties (dict, optional): further edge properties; the passed dict is not modified
            kwargs (): further property values; they override equally named entries of properties

        Raises:
            MogwaiGraphError: if one of the nodes is not in the graph or if the
                properties contain the configured edge label field or label field
        """
        if not (self.has_node(srcId) and self.has_node(destId)):
            raise MogwaiGraphError(
                f"Node with srcId {srcId} or destId {destId} is not in the graph."
            )
        # merge into a fresh dict so the caller's ``properties`` is never mutated
        properties = {**(properties or {}), **kwargs}
        if self.config.edge_label_field in properties:
            raise MogwaiGraphError(
                f"The '{self.config.edge_label_field}' property is reserved for the edge label."
            )
        elif self.config.label_field in properties:
            raise MogwaiGraphError(
                f"The '{self.config.label_field}' property is reserved for the node labels."
            )
        edge_props = {self.config.edge_label_field: edgeLabel, **properties}
        super().add_edge(srcId, destId, **edge_props)
        # Add a quad specifically for the edge connection
        edge_quad = Quad(s=srcId, p=edgeLabel, o=destId, g="edge-link")
        self.spog_index.add_quad(edge_quad)

        # Use add_to_index to add label, name, and properties as quads;
        # the source node id serves as subject and the edge label as name
        self.add_to_index("edge", srcId, edgeLabel, edgeLabel, properties)

    def add_node(self, *args, **kwargs):
        """
        Add a node, taking label and name from the keyword arguments

        The first positional argument, if any, is the node id; otherwise a new
        id is generated. "labels" falls back to the configured default node
        label and "name" to the string form of the node id. All remaining
        keyword arguments become node properties.
        """
        node_id = args[0] if args else self.get_next_node_id()
        label = kwargs.pop("labels", self.config.default_node_label)
        name = kwargs.pop("name", str(node_id))
        return self.add_labeled_node(label, name, properties=kwargs, node_id=node_id)

    def add_edge(self, *args, **kwargs):
        """
        Add an edge between the first two positional arguments

        The label is taken from the keyword named by the configured edge label
        field and falls back to the configured default edge label; the
        remaining keyword arguments become edge properties.

        Raises:
            MogwaiGraphError: if fewer than two positional arguments are given
        """
        if len(args) < 2:
            raise MogwaiGraphError("add_edge() requires source and target node ids")
        src, dst = args[0], args[1]
        cfg = self.config
        label = kwargs.pop(cfg.edge_label_field, cfg.default_edge_label)
        return self.add_labeled_edge(src, dst, label, properties=kwargs)

    def _get_nodes_set(self, label: set, name: str):
        """
        Get the nodes whose "labels" value contains all given labels

        Args:
            label (set): labels that must all be contained in a node's "labels"
            name (str): if not None, the node's "name" must also equal this value

        Returns:
            list: (node_id, data) tuples of the matching nodes
        """
        # fixed: the name-less branch called self.nodes(date=True), a typo for
        # data=True that raised a TypeError
        labeled = (n for n in self.nodes(data=True) if label.issubset(n[1]["labels"]))
        if name is None:
            return list(labeled)
        return [n for n in labeled if n[1]["name"] == name]

    def get_nodes(self, label: str, name: str):
        """
        Get nodes filtered by label and/or name

        Args:
            label (str): if not None, must be contained in a node's "labels"
            name (str): if not None, must equal a node's "name"

        Returns:
            a list of (node_id, data) tuples of the matching nodes, or the
            plain node view of the graph if both label and name are None
        """
        if label is None and name is None:
            return self.nodes
        # fixed: the label-only branch called self.nodes(date=True), a typo for
        # data=True that raised a TypeError
        return [
            n
            for n in self.nodes(data=True)
            if (label is None or label in n[1]["labels"])
            and (name is None or n[1]["name"] == name)
        ]

    def merge(
        self, other: "MogwaiGraph", srcId: int, targetId: int, edgeLabel: str
    ):
        """
        Copy another graph into this one and link it with a labeled edge

        Every node of other receives a newly generated node id; the relabeled
        nodes and edges are added together with their data, then an edge is
        added from srcId to the new id of targetId.

        Args:
            other (MogwaiGraph): the graph to copy from
            srcId: node of this graph where the linking edge starts
            targetId: node of other (by its id in other) where the linking edge ends
            edgeLabel (str): label of the linking edge
        """
        mapping = {}
        for foreign_id in other.nodes:
            mapping[foreign_id] = self.get_next_node_id()
        renamed = networkx.relabel_nodes(other, mapping, copy=True)
        self.add_nodes_from(renamed.nodes(data=True))
        self.add_edges_from(renamed.edges(data=True))
        linked_id = mapping[targetId]
        self.add_labeled_edge(srcId=srcId, destId=linked_id, edgeLabel=edgeLabel)

    def join(
        self,
        from_label: str,
        to_label: str,
        join_field: str,
        target_key: str,
        edge_label: str,
    ):
        """Joins two node types by field values and creates edges between them.

        Source candidates are the subjects the O->S index lists for from_label.
        For each of them the node's join_field value is looked up in the O->S
        index again; every subject found there whose "labels" contain to_label
        gets an edge from the source node.

        Args:
            from_label (str): value identifying the source nodes in the O->S index
            to_label (str): label the target nodes must carry
            join_field (str): node property whose value is matched
            target_key (str): must be present in the P->O index
                NOTE(review): only its presence is checked, it does not take
                part in the matching - confirm this is intended
            edge_label (str): label for the created edges

        Raises:
            ValueError: if the P->O or O->S index is not available (or P->O is
                empty), or if join_field or target_key is not in the P->O index
        """
        node_lookup = self.spog_index.get_lookup("P", "O")
        if not node_lookup:
            raise ValueError("No SPOG index available")

        if node_lookup.get(join_field) is None:
            raise ValueError(f"Join field {join_field} not found in index")

        if node_lookup.get(target_key) is None:
            raise ValueError(f"Target key {target_key} not found in index")

        os_lookup = self.spog_index.get_lookup("O", "S")
        if os_lookup is None:
            # previously this surfaced as an AttributeError on None
            raise ValueError("No SPOG index available")

        # tuple(...) snapshots: add_labeled_edge writes to the index, which
        # must not change a set while it is being iterated; a missing entry
        # now yields no candidates instead of iterating None
        for source_id in tuple(os_lookup.get(from_label, ())):
            source_data = self.nodes[source_id]
            if join_field not in source_data:
                # candidate without the join field cannot take part in the join
                continue
            source_value = source_data[join_field]
            for target_id in tuple(os_lookup.get(source_value, ())):
                if to_label in self.nodes[target_id].get("labels", []):
                    self.add_labeled_edge(source_id, target_id, edge_label)

    def draw(self, outputfile, title: str = "MogwaiGraph", **kwargs):
        """
        Render this graph into a file by means of a MogwaiGraphDrawer
        Parameters
        ----------
        outputfile : str
            the file the drawing is saved to
        title : str, default 'MogwaiGraph'
            the title shown for the graph
        kwargs : dict
            further options that are handed to `MogwaiGraphDrawer`
            to configure the drawing style
        """
        drawer = MogwaiGraphDrawer(self, title=title, **kwargs)
        drawer.draw(outputfile)

    @classmethod
    def modern(cls, index_config="off") -> "MogwaiGraph":
        """
        create the modern graph
        see https://tinkerpop.apache.org/docs/current/tutorials/getting-started/

        Args:
            index_config (str): name of the index configuration to use for the graph

        Returns:
            MogwaiGraph: graph with six nodes and six edges
        """
        # Instantiate the configuration: the original bound the
        # MogwaiGraphConfig class itself, so index_config was written onto the
        # shared class object instead of a configuration private to this graph.
        # NOTE(review): assumes MogwaiGraphConfig instances accept attribute
        # assignment - confirm against its definition
        config = MogwaiGraphConfig()
        config.index_config = index_config
        g = MogwaiGraph(config=config)
        marko = g.add_labeled_node("Person", name="marko", age=29)
        vadas = g.add_labeled_node("Person", name="vadas", age=27)
        lop = g.add_labeled_node("Software", name="lop", lang="java")
        josh = g.add_labeled_node("Person", name="josh", age=32)
        ripple = g.add_labeled_node("Software", name="ripple", lang="java")
        peter = g.add_labeled_node("Person", name="peter", age=35)

        g.add_labeled_edge(marko, vadas, "knows", weight=0.5)
        g.add_labeled_edge(marko, josh, "knows", weight=1.0)
        g.add_labeled_edge(marko, lop, "created", weight=0.4)
        g.add_labeled_edge(josh, ripple, "created", weight=1.0)
        g.add_labeled_edge(josh, lop, "created", weight=0.4)
        g.add_labeled_edge(peter, lop, "created", weight=0.2)
        return g

    @classmethod
    def crew(cls) -> "MogwaiGraph":
        """
        build the TheCrew sample graph
        see TinkerFactory.createTheCrew() in https://tinkerpop.apache.org/docs/current/reference/
        """
        g = MogwaiGraph()

        def stay(startTime: int, endTime: int = None):
            """time span as dict; endTime is left out when it is None"""
            span = {"startTime": startTime}
            if endTime is not None:
                span["endTime"] = endTime
            return span

        # persons with their locations, in the order the nodes are created
        people = [
            (
                "marko",
                {
                    "san diego": stay(1997, 2001),
                    "santa cruz": stay(2001, 2004),
                    "brussels": stay(2004, 2005),
                    "santa fe": stay(2005),
                },
            ),
            (
                "stephen",
                {
                    "centreville": stay(1990, 2000),
                    "dulles": stay(2000, 2006),
                    "purcellvilee": stay(2006),
                },
            ),
            (
                "matthias",
                {
                    "bremen": stay(2004, 2007),
                    "baltimore": stay(2007, 2011),
                    "oakland": stay(2011, 2014),
                    "seattle": stay(2014),
                },
            ),
            (
                "daniel",
                {
                    "spremberg": stay(1982, 2005),
                    "kaiserslautern": stay(2005, 2009),
                    "aachen": stay(2009),
                },
            ),
        ]
        marko, stephen, matthias, daniel = [
            g.add_labeled_node("Person", name=who, location=places)
            for who, places in people
        ]
        gremlin = g.add_labeled_node("Software", name="gremlin")
        tinkergraph = g.add_labeled_node("Software", name="tinkergraph")

        # (source, target, label, edge properties), in insertion order
        links = [
            (marko, gremlin, "uses", {"skill": 4}),
            (stephen, gremlin, "uses", {"skill": 5}),
            (matthias, gremlin, "uses", {"skill": 3}),
            (daniel, gremlin, "uses", {"skill": 5}),
            (marko, tinkergraph, "uses", {"skill": 5}),
            (stephen, tinkergraph, "uses", {"skill": 4}),
            (matthias, tinkergraph, "uses", {"skill": 3}),
            (daniel, tinkergraph, "uses", {"skill": 3}),
            (gremlin, tinkergraph, "traverses", {}),
            (marko, tinkergraph, "develops", {"since": 2010}),
            (stephen, tinkergraph, "develops", {"since": 2011}),
            (marko, gremlin, "develops", {"since": 2009}),
            (stephen, gremlin, "develops", {"since": 2010}),
            (matthias, gremlin, "develops", {"since": 2012}),
        ]
        for src, dst, label, props in links:
            g.add_labeled_edge(src, dst, label, **props)
        return g
__init__(incoming_graph_data=None, config=None, **attr)

Initialize a MogwaiGraph with optional data and configuration.

Parameters:

Name Type Description Default
incoming_graph_data

Graph data in NetworkX compatible format

None
config MogwaiGraphConfig

Configuration for field names and defaults

None
**attr

Graph attributes as key=value pairs

{}
Source code in mogwai/core/mogwaigraph.py
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
def __init__(
    self, incoming_graph_data=None, config: MogwaiGraphConfig = None, **attr
):
    """Create the graph, then attach its configuration and SPOG index.

    Args:
        incoming_graph_data: Graph data in NetworkX compatible format
        config (MogwaiGraphConfig): Configuration for field names and defaults;
            a default MogwaiGraphConfig is created when none is given
        **attr: Graph attributes as key=value pairs
    """
    super().__init__(incoming_graph_data, **attr)
    # counter used to hand out node ids
    self.counter = 0
    if not config:
        config = MogwaiGraphConfig()
    self.config = config
    # the configured name selects the IndexConfigs member by its upper-cased name
    selected = IndexConfigs[self.config.index_config.upper()]
    self.spog_index = SPOGIndex(selected.get_config())
add_edge(*args, **kwargs)

Add an edge with default or explicit label

Source code in mogwai/core/mogwaigraph.py
186
187
188
189
190
191
192
def add_edge(self, *args, **kwargs):
    """
    Add an edge between the first two positional arguments

    The label is taken from the keyword named by the configured edge label
    field and falls back to the configured default edge label; the
    remaining keyword arguments become edge properties.

    Raises:
        MogwaiGraphError: if fewer than two positional arguments are given
    """
    if len(args) < 2:
        raise MogwaiGraphError("add_edge() requires source and target node ids")
    src, dst = args[0], args[1]
    cfg = self.config
    label = kwargs.pop(cfg.edge_label_field, cfg.default_edge_label)
    return self.add_labeled_edge(src, dst, label, properties=kwargs)
add_labeled_edge(srcId, destId, edgeLabel, properties=None, **kwargs)

add a labeled edge

Source code in mogwai/core/mogwaigraph.py
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
def add_labeled_edge(
    self, srcId: int, destId: int, edgeLabel: str, properties: dict = None, **kwargs
):
    """
    add a labeled edge

    Args:
        srcId: id of the source node; must already be in the graph
        destId: id of the target node; must already be in the graph
        edgeLabel (str): label stored under the configured edge label field
        properties (dict, optional): further edge properties; the passed dict is not modified
        kwargs (): further property values; they override equally named entries of properties

    Raises:
        MogwaiGraphError: if one of the nodes is not in the graph or if the
            properties contain the configured edge label field or label field
    """
    if not (self.has_node(srcId) and self.has_node(destId)):
        raise MogwaiGraphError(
            f"Node with srcId {srcId} or destId {destId} is not in the graph."
        )
    # merge into a fresh dict so the caller's ``properties`` is never mutated
    properties = {**(properties or {}), **kwargs}
    if self.config.edge_label_field in properties:
        raise MogwaiGraphError(
            f"The '{self.config.edge_label_field}' property is reserved for the edge label."
        )
    elif self.config.label_field in properties:
        raise MogwaiGraphError(
            f"The '{self.config.label_field}' property is reserved for the node labels."
        )
    edge_props = {self.config.edge_label_field: edgeLabel, **properties}
    super().add_edge(srcId, destId, **edge_props)
    # Add a quad specifically for the edge connection
    edge_quad = Quad(s=srcId, p=edgeLabel, o=destId, g="edge-link")
    self.spog_index.add_quad(edge_quad)

    # Use add_to_index to add label, name, and properties as quads;
    # the source node id serves as subject and the edge label as name
    self.add_to_index("edge", srcId, edgeLabel, edgeLabel, properties)
add_labeled_node(label, name, properties=None, node_id=None, **kwargs)

Add a labeled node to the graph.

we can only insert a node by hashable value and as names and ids might occur multiple times we use incremented node ids if no node_id is provided

Parameters:

Name Type Description Default
label str

The label for the node.

required
name str

The name of the node.

required
properties dict

Additional properties for the node. Defaults to None.

None
node_id Optional[int]

The ID for the node. If not provided, a new ID will be generated. Defaults to None.

None
kwargs

further property values

{}

Returns: Any: The ID of the newly added node - the string returned by get_next_node_id() (the internal counter rendered as a string) if node_id was kept as default None

Raises:

Type Description
MogwaiGraphError

If a node with the provided ID already exists in the graph.

Source code in mogwai/core/mogwaigraph.py
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
def add_labeled_node(
    self,
    label: str,
    name: str,
    properties: dict = None,
    node_id: Optional[str] = None,
    **kwargs,
) -> Any:
    """
    Add a labeled node to the graph.

    we can only insert a node by hashable value and as names and ids
    might occur multiple times we use incremented node ids if no node_id is provided

    Args:
        label (str): The label for the node.
        name (str): The name of the node.
        properties (dict, optional): Additional properties for the node. Defaults to None.
            The passed dict is not modified.
        node_id (Optional[str], optional): The ID for the node. If not provided, a new ID will be generated. Defaults to None.
        kwargs (): further property values; they override equally named entries of properties
    Returns:
        Any: The ID of the newly added node - the string returned by
            get_next_node_id() if node_id was kept as default None

    Raises:
        MogwaiGraphError: If the properties contain the configured name field
            or label field, which are reserved.
    """
    if node_id is None:
        node_id = self.get_next_node_id()
    # merge into a fresh dict so the caller's ``properties`` is never mutated
    properties = {**(properties or {}), **kwargs}
    if self.config.name_field in properties:
        raise MogwaiGraphError(
            f"The '{self.config.name_field}' property is reserved for the node name."
        )
    elif self.config.label_field in properties:
        raise MogwaiGraphError(
            f"The '{self.config.label_field}' property is reserved for the node labels."
        )
    node_props = {
        self.config.name_field: name,
        self.config.label_field: label,
        **properties,
    }
    super().add_node(node_id, **node_props)
    # Use add_to_index to add label, name, and properties as quads
    self.add_to_index("node", node_id, label, name, properties)
    return node_id
add_node(*args, **kwargs)

Add a node with default or explicit labels

Source code in mogwai/core/mogwaigraph.py
175
176
177
178
179
180
181
182
183
184
def add_node(self, *args, **kwargs):
    """
    Add a node, taking label and name from the keyword arguments

    The first positional argument, if any, is the node id; otherwise a new
    id is generated. "labels" falls back to the configured default node
    label and "name" to the string form of the node id. All remaining
    keyword arguments become node properties.
    """
    node_id = args[0] if args else self.get_next_node_id()
    label = kwargs.pop("labels", self.config.default_node_label)
    name = kwargs.pop("name", str(node_id))
    return self.add_labeled_node(label, name, properties=kwargs, node_id=node_id)
add_to_index(element_type, subject_id, label, name, properties)

Add labels, name, and properties to the SPOG index for a given subject and element_type

Parameters:

Name Type Description Default
element_type str

(str): node or edge

required
subject_id Hashable

The ID of the subject (node or edge).

required
label str

the label for the subject.

required
name str

Name of the subject.

required
properties dict

Dictionary of additional properties to index.

required
Source code in mogwai/core/mogwaigraph.py
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
def add_to_index(
    self,
    element_type: str,
    subject_id: Hashable,
    label: str,
    name: str,
    properties: dict,
):
    """
    Add labels, name, and properties to the SPOG index for a
    given subject and element_type

    Nothing is indexed when ``self.config.index_config`` is ``"off"``.

    Args:
        element_type: (str): node or edge
        subject_id (Hashable): The ID of the subject (node or edge).
        label (str): the label for the subject.
        name (str): Name of the subject.
        properties (dict): Dictionary of additional properties to index.
            Values that cannot be hashed are indexed by their ``str()`` form.
    """
    # only index if the config calls for it
    if self.config.index_config == "off":
        return

    # label quad, graph position "<element_type>-label"
    label_quad = Quad(s=subject_id, p="label", o=label, g=f"{element_type}-label")
    self.spog_index.add_quad(label_quad)

    # name quad, graph position "<element_type>-name"
    name_quad = Quad(s=subject_id, p="name", o=name, g=f"{element_type}-name")
    self.spog_index.add_quad(name_quad)

    # one quad per property, graph position "<element_type>-property"
    for prop_name, prop_value in properties.items():
        # isinstance(value, Hashable) only checks that __hash__ exists, so a
        # tuple containing a list would pass and fail later; probe with hash().
        try:
            hash(prop_value)
        except TypeError:
            prop_value = str(prop_value)  # Ensure property value is hashable
        property_quad = Quad(
            s=subject_id, p=prop_name, o=prop_value, g=f"{element_type}-property"
        )
        self.spog_index.add_quad(property_quad)
crew() classmethod

create the TheCrew example graph see TinkerFactory.createTheCrew() in https://tinkerpop.apache.org/docs/current/reference/

Source code in mogwai/core/mogwaigraph.py
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
@classmethod
def crew(cls) -> "MogwaiGraph":
    """
    Create the TheCrew example graph.

    see TinkerFactory.createTheCrew() in https://tinkerpop.apache.org/docs/current/reference/

    Returns:
        MogwaiGraph: a new graph with four "Person" nodes (each carrying a
        ``location`` dict of place -> time span), two "Software" nodes and
        "uses" / "develops" / "traverses" edges between them.
    """
    g = MogwaiGraph()

    def t(startTime: int, endTime: int = None):
        """Build a time-span dict; ``endTime`` is left out when not given."""
        d = dict()
        d["startTime"] = startTime
        if endTime is not None:
            d["endTime"] = endTime
        return d

    marko = g.add_labeled_node(
        "Person",
        name="marko",
        location={
            "san diego": t(1997, 2001),
            "santa cruz": t(2001, 2004),
            "brussels": t(2004, 2005),
            "santa fe": t(2005),
        },
    )
    stephen = g.add_labeled_node(
        "Person",
        name="stephen",
        location={
            "centreville": t(1990, 2000),
            "dulles": t(2000, 2006),
            # spelled as in the TinkerPop TheCrew data set
            "purcellville": t(2006),
        },
    )
    matthias = g.add_labeled_node(
        "Person",
        name="matthias",
        location={
            "bremen": t(2004, 2007),
            "baltimore": t(2007, 2011),
            "oakland": t(2011, 2014),
            "seattle": t(2014),
        },
    )
    daniel = g.add_labeled_node(
        "Person",
        name="daniel",
        location={
            "spremberg": t(1982, 2005),
            "kaiserslautern": t(2005, 2009),
            "aachen": t(2009),
        },
    )
    gremlin = g.add_labeled_node("Software", name="gremlin")
    tinkergraph = g.add_labeled_node("Software", name="tinkergraph")

    # who uses which software, with a skill rating
    g.add_labeled_edge(marko, gremlin, "uses", skill=4)
    g.add_labeled_edge(stephen, gremlin, "uses", skill=5)
    g.add_labeled_edge(matthias, gremlin, "uses", skill=3)
    g.add_labeled_edge(daniel, gremlin, "uses", skill=5)
    g.add_labeled_edge(marko, tinkergraph, "uses", skill=5)
    g.add_labeled_edge(stephen, tinkergraph, "uses", skill=4)
    g.add_labeled_edge(matthias, tinkergraph, "uses", skill=3)
    g.add_labeled_edge(daniel, tinkergraph, "uses", skill=3)
    g.add_labeled_edge(gremlin, tinkergraph, "traverses")
    # who develops which software, and since when
    g.add_labeled_edge(marko, tinkergraph, "develops", since=2010)
    g.add_labeled_edge(stephen, tinkergraph, "develops", since=2011)
    g.add_labeled_edge(marko, gremlin, "develops", since=2009)
    g.add_labeled_edge(stephen, gremlin, "develops", since=2010)
    g.add_labeled_edge(matthias, gremlin, "develops", since=2012)
    return g
draw(outputfile, title='MogwaiGraph', **kwargs)

Draw the graph using graphviz.

Parameters:

- outputfile (str): the file to save the graph to
- title (str, default 'MogwaiGraph'): the title of the graph
- kwargs (dict): additional parameters used to configure the drawing style. For more details see MogwaiGraphDrawer

Source code in mogwai/core/mogwaigraph.py
263
264
265
266
267
268
269
270
271
272
273
274
275
276
def draw(self, outputfile, title: str = "MogwaiGraph", **kwargs):
    """
    Render this graph to a file via graphviz.

    Parameters
    ----------
    outputfile : str
        path of the file the drawing is written to
    title : str, default 'MogwaiGraph'
        title handed to the drawer
    kwargs : dict
        drawing-style options, forwarded unchanged to `MogwaiGraphDrawer`
    """
    drawer = MogwaiGraphDrawer(self, title=title, **kwargs)
    drawer.draw(outputfile)
get_next_node_id()

get the next node_id

Source code in mogwai/core/mogwaigraph.py
49
50
51
52
53
54
55
56
def get_next_node_id(self) -> str:
    """
    Hand out the next node id.

    Returns the current value of ``self.counter`` as a string and then
    advances the counter by one.
    """
    issued = str(self.counter)
    self.counter += 1
    return issued
get_nodes(label, name)

@FIXME - this is ugly code

Source code in mogwai/core/mogwaigraph.py
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
def get_nodes(self, label: str, name: str):
    """
    Select nodes by label and/or name.

    Args:
        label (str): label that must be contained in the node's ``"labels"``
            attribute, or None to accept any label.
        name (str): value the node's ``"name"`` attribute must equal,
            or None to accept any name.

    Returns:
        A list of ``(node_id, attributes)`` tuples for the matching nodes.
        When both ``label`` and ``name`` are None, ``self.nodes`` itself is
        returned unchanged (kept for backward compatibility).
    """
    if label is None and name is None:
        return self.nodes
    # A single pass covers all three filtered cases; the original label-only
    # branch called nodes(date=True), a typo for data=True, and always raised.
    return [
        n
        for n in self.nodes(data=True)
        if (label is None or label in n[1]["labels"])
        and (name is None or n[1]["name"] == name)
    ]
join(from_label, to_label, join_field, target_key, edge_label)

Joins two node types by field values and creates edges between them.

Source code in mogwai/core/mogwaigraph.py
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
def join(
    self,
    from_label: str,
    to_label: str,
    join_field: str,
    target_key: str,
    edge_label: str,
):
    """
    Joins two node types by field values and creates edges between them.

    For every subject indexed under ``from_label`` the value of its
    ``join_field`` attribute is looked up in the object->subject index;
    each subject found there whose ``"labels"`` contain ``to_label``
    receives an edge labeled ``edge_label`` from the source.

    Args:
        from_label (str): label of the source nodes.
        to_label (str): label the target nodes must carry.
        join_field (str): attribute of the source nodes holding the join value.
        target_key (str): predicate that must be present in the index.
        edge_label (str): label for the created edges.

    Raises:
        ValueError: if the P->O or O->S lookup is unavailable or empty, or
            if ``join_field`` / ``target_key`` are not indexed predicates.
    """
    node_lookup = self.spog_index.get_lookup("P", "O")
    if not node_lookup:
        raise ValueError("No SPOG index available")

    if node_lookup.get(join_field) is None:
        raise ValueError(f"Join field {join_field} not found in index")

    if node_lookup.get(target_key) is None:
        raise ValueError(f"Target key {target_key} not found in index")

    os_lookup = self.spog_index.get_lookup("O", "S")
    if not os_lookup:
        raise ValueError("No SPOG index available")

    # Iterate over snapshots in case add_labeled_edge updates the index sets
    # (its body is not visible here); a missing entry means "no matches".
    for source_id in list(os_lookup.get(from_label) or ()):
        source_node = self.nodes[source_id]
        if join_field not in source_node:
            # a source without the join field has nothing to join on
            continue
        target_ids = os_lookup.get(source_node[join_field])
        for target_id in list(target_ids or ()):
            if to_label in self.nodes[target_id].get("labels", []):
                self.add_labeled_edge(source_id, target_id, edge_label)
modern(index_config='off') classmethod

create the modern graph see https://tinkerpop.apache.org/docs/current/tutorials/getting-started/

Source code in mogwai/core/mogwaigraph.py
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
@classmethod
def modern(cls, index_config="off") -> "MogwaiGraph":
    """
    create the modern graph
    see https://tinkerpop.apache.org/docs/current/tutorials/getting-started/

    Args:
        index_config: value for ``MogwaiGraphConfig.index_config`` of the
            new graph; the default ``"off"`` disables SPOG indexing.

    Returns:
        MogwaiGraph: a new graph with four "Person" and two "Software"
        nodes connected by "knows" and "created" edges.
    """
    # Build a fresh config instance: assigning to the MogwaiGraphConfig class
    # itself would change index_config for every graph sharing that class.
    config = MogwaiGraphConfig(index_config=index_config)
    g = MogwaiGraph(config=config)
    marko = g.add_labeled_node("Person", name="marko", age=29)
    vadas = g.add_labeled_node("Person", name="vadas", age=27)
    lop = g.add_labeled_node("Software", name="lop", lang="java")
    josh = g.add_labeled_node("Person", name="josh", age=32)
    ripple = g.add_labeled_node("Software", name="ripple", lang="java")
    peter = g.add_labeled_node("Person", name="peter", age=35)

    g.add_labeled_edge(marko, vadas, "knows", weight=0.5)
    g.add_labeled_edge(marko, josh, "knows", weight=1.0)
    g.add_labeled_edge(marko, lop, "created", weight=0.4)
    g.add_labeled_edge(josh, ripple, "created", weight=1.0)
    g.add_labeled_edge(josh, lop, "created", weight=0.4)
    g.add_labeled_edge(peter, lop, "created", weight=0.2)
    return g

MogwaiGraphConfig dataclass

configuration of a MogwaiGraph

Source code in mogwai/core/mogwaigraph.py
11
12
13
14
15
16
17
18
19
20
21
22
23
@dataclass
class MogwaiGraphConfig:
    """
    configuration of a MogwaiGraph
    """

    # attribute key under which add_labeled_node stores a node's name
    name_field: str = "name"
    # attribute key under which add_labeled_node stores a node's label(s)
    label_field: str = "labels"
    # presumably the edge counterpart of label_field -- TODO confirm against add_labeled_edge
    edge_label_field: str = "labels"
    # label used by add_node when the caller passes no "labels" keyword
    default_node_label: str = "Node"
    # presumably the label used for edges added without one -- TODO confirm
    default_edge_label: str = "Edge"
    # "off" makes add_to_index return without indexing; other values are not visible here
    index_config: str = "off"
    # NOTE(review): not read by any code visible here; presumably restricts elements to one label -- confirm
    single_label: bool = True

MogwaiGraphDrawer

helper class to draw MogwaiGraphs

Source code in mogwai/core/mogwaigraph.py
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
class MogwaiGraphDrawer:
    """
    helper class to draw MogwaiGraphs
    """

    def __init__(self, g: "MogwaiGraph", title: str, **kwargs):
        """
        Parameters
        ----------
        g : MogwaiGraph
            the graph to draw
        title : str
            the title of the graph
        kwargs : dict
            additional parameters used to configure the drawing style
            * *fontname* : str, default 'arial'
                the font to use
            * *fillcolor* : str, default '#ADE1FE'
                the fill color of the vertices
            * *edge_line_width* : int, default 3
                the width of the edges
            * *dash_width* : int, default 5
                number of dashes in the head/properties delimiter
            * *v_limit* : int, default 10
                the maximum number of vertices to show
            * *e_limit* : int, default 10
                the maximum number of edges to show
            * *vertex_properties* : list, default None
                the properties to display for vertices, if `None` all properties are shown
            * *edge_properties* : list, default None
                the properties to display for edges, if `None` all properties are shown
            * *prog* : str, default 'dot'
                the layout program to use
        """
        self.g = g
        self.title = title
        self.config = kwargs or {}
        self.vertex_keys = self.config.get("vertex_properties", None)
        self.edge_keys = self.config.get("edge_properties", None)

        # ids of the vertices / (source, target) pairs of the edges drawn so far
        self.v_drawn = set()
        self.e_drawn = set()

    def _make_label(self, head: str, properties: dict) -> str:
        """Join head and "key: value" property lines, separated by a dash line."""
        body = "\n".join([f"{k}: {v}" for k, v in properties.items()])
        return f"{head}\n" + ("-" * self.config.get("dash_width", 5)) + f"\n{body}"

    def _draw_vertex(self, n):
        """
        Style one vertex given as an ``(id, attributes)`` pair.

        Returns False when the vertex limit is reached, None when the vertex
        was already drawn and True when it was drawn now.
        """
        if len(self.v_drawn) >= self.config.get("v_limit", 10):
            return False
        if n[0] in self.v_drawn:
            return None
        node_id, node_data = n
        # work on a copy: popping from the live attribute dict would strip
        # name and labels from the graph itself
        properties = dict(node_data)
        head = f"{node_id}, {properties.pop('name')}\n{', '.join(properties.pop('labels'))}"
        if self.vertex_keys:
            properties = {k: v for k, v in properties.items() if k in self.vertex_keys}

        self.gviz.add_node(
            node_id,
            label=self._make_label(head, properties),
            fillcolor=self.config.get("fillcolor", "#ADE1FE"),
            style="filled",
            fontname=self.config.get("fontname", "arial"),
        )
        self.v_drawn.add(node_id)
        return True

    def _draw_edge(self, e, with_vertices: bool = True):
        """
        Style one edge given as a ``(source, target, attributes)`` triple.

        Returns False when the edge limit is reached, None when the edge
        was already drawn and True when it was drawn now.
        """
        # ">=" so that at most e_limit edges are drawn, matching v_limit
        if len(self.e_drawn) >= self.config.get("e_limit", 10):
            return False
        edge_key = e[:-1]
        if edge_key in self.e_drawn:
            return None
        if with_vertices:
            self._draw_vertex((e[0], self.g.nodes[e[0]]))
            self._draw_vertex((e[1], self.g.nodes[e[1]]))
        # copy for the same reason as in _draw_vertex: keep the graph intact
        properties = dict(e[2])
        head = f"{properties.pop('labels')}"
        if self.edge_keys:
            # honor the documented edge_properties option
            properties = {k: v for k, v in properties.items() if k in self.edge_keys}

        self.gviz.add_edge(
            e[0],
            e[1],
            label=self._make_label(head, properties),
            style=f"setlinewidth({self.config.get('edge_line_width', 3)})",
            fontname=self.config.get("fontname", "arial"),
        )
        self.e_drawn.add(edge_key)
        return True

    def draw(self, outputfile: str):
        """
        draw the graph to the given output file using
        the graphviz layout program (default "dot")

        Raises:
            ImportError: if pygraphviz is not installed
        """
        try:
            import pygraphviz
        except ImportError as err:
            raise ImportError("Please install pygraphviz to draw graphs.") from err

        self.gviz: pygraphviz.AGraph = networkx.nx_agraph.to_agraph(self.g)
        for n in self.g.nodes(data=True):
            # False signals "limit reached"; None (already drawn) must not stop the loop
            if self._draw_vertex(n) is False:
                break
        for e in self.g.edges(data=True):
            if self._draw_edge(e) is False:
                break
        self.gviz.layout(prog=self.config.get("prog", "dot"))
        self.gviz.draw(outputfile)
__init__(g, title, **kwargs)
Parameters

g : MogwaiGraph the graph to draw title : str the title of the graph kwargs : dict additional parameters used to configure the drawing style * fontname : str, default 'arial' the font to use * fillcolor : str, default '#ADE1FE' the fill color of the vertices * edge_line_width : int, default 3 the width of the edges * dash_width : int, default 5 number of dashes in the head/properties delimiter * v_limit : int, default 10 the maximum number of vertices to show * e_limit : int, default 10 the maximum number of edges to show * vertex_properties : list, default None the properties to display for vertices, if None all properties are shown * edge_properties : list, default None the properties to display for edges, if None all properties are shown * prog : str, default 'dot' the layout program to use

Source code in mogwai/core/mogwaigraph.py
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
def __init__(self, g: MogwaiGraph, title: str, **kwargs):
    """
    Set up a drawer for one graph.

    Parameters
    ----------
    g : MogwaiGraph
        the graph to draw
    title : str
        the title of the graph
    kwargs : dict
        drawing-style options, kept as ``self.config``
        * *fontname* : str, default 'arial'
            the font to use
        * *fillcolor* : str, default '#ADE1FE'
            the fill color of the vertices
        * *edge_line_width* : int, default 3
            the width of the edges
        * *dash_width* : int, default 5
            number of dashes in the head/properties delimiter
        * *v_limit* : int, default 10
            the maximum number of vertices to show
        * *e_limit* : int, default 10
            the maximum number of edges to show
        * *vertex_properties* : list, default None
            the properties to display for vertices, if `None` all properties are shown
        * *edge_properties* : list, default None
            the properties to display for edges, if `None` all properties are shown
        * *prog* : str, default 'dot'
            the layout program to use
    """
    # bookkeeping of what has been drawn so far
    self.v_drawn = set()
    self.e_drawn = set()

    self.g = g
    self.title = title

    options = kwargs if kwargs else {}
    self.config = options
    self.vertex_keys = options.get("vertex_properties")
    self.edge_keys = options.get("edge_properties")
draw(outputfile)

draw the graph to the given output file using the graphviz "dot" software

Source code in mogwai/core/mogwaigraph.py
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
def draw(self, outputfile: str):
    """
    draw the graph to the given output file using
    the graphviz layout program (default "dot")

    Vertices and then edges are styled one by one until the respective
    helper reports ``False``; afterwards the layout is computed and the
    result written to ``outputfile``.

    Raises:
        ImportError: if pygraphviz is not installed
    """
    try:
        import pygraphviz
    except ImportError as err:
        # chain explicitly so the original import failure stays attached
        raise ImportError("Please install pygraphviz to draw graphs.") from err

    self.gviz: pygraphviz.AGraph = networkx.nx_agraph.to_agraph(self.g)
    for n in self.g.nodes(data=True):
        # identity test: a None result must not stop the loop, only False does
        if self._draw_vertex(n) is False:
            break
    for e in self.g.edges(data=True):
        if self._draw_edge(e) is False:
            break
    self.gviz.layout(prog=self.config.get("prog", "dot"))
    self.gviz.draw(outputfile)

steps

base_steps

branch_steps

filter_steps

HasWithin

Bases: FilterStep

Similar to Has, but with multiple options for the value

Source code in mogwai/core/steps/filter_steps.py
80
81
82
83
84
85
86
87
88
89
class HasWithin(FilterStep):
    """
    Variant of `Has` that accepts any one of several candidate values.

    A traverser passes when the value selected by ``key`` is contained
    in ``valueOptions``.
    """
    def __init__(self, traversal:Traversal, key:str|List[str], valueOptions:List|Tuple):
        super().__init__(traversal)
        self.key = key
        self.valueOptions = valueOptions

        # NOTE(review): for key "id" the indexer returns the element's `get`
        # attribute -- confirm this yields the id for the elements passed in.
        if key == "id":
            indexer = lambda t: t.get
        else:
            indexer = tu.get_dict_indexer(key, _NA)

        def _accepts(t):
            element = self.traversal._get_element(t)
            return indexer(element) in self.valueOptions

        self._filter = _accepts

flatmap_steps

io_step

map_steps

Fold

Bases: MapStep

Source code in mogwai/core/steps/map_steps.py
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
class Fold(MapStep):
    def __init__(self, traversal:Traversal, seed:Any=None, foldfunc:Callable[[Any,Any], Any]=None):
        """
        Combine all traversers into a single Value containing a list of all traversers.
        If `seed` and `foldfunc` are provided, the traversers will be reduced into a single value using the provided function.
        For example, `g.V().values('age').fold(0, lambda x,y: x+y).next()` will result in the sum of all ages.

        Parameters
        -----------
        seed: Any, optional
            The initial value to start the fold with.
        foldfunc: Callable[[Any,Any], Any], optional
            The bi-function to use to reduce the traversers into a single value.

        Raises
        -----------
        QueryError
            If only one of `seed` and `foldfunc` is given.
        """
        super().__init__(traversal)
        self.seed=seed
        self.foldfunc=foldfunc
        if((seed is None) ^ (foldfunc is None)):
            raise QueryError("`seed` and `foldfunc` should be both None or both not None.")

    def __call__(self, traversers:Iterable[Traverser]) -> List[Traverser]:
        """
        Fold the incoming traversers into a one-element list holding a `TravValue`.

        With a seed, values/properties are reduced with `foldfunc`; any other
        traverser raises `GraphTraversalError`. Without a seed, the unwrapped
        traversers are collected into a list.
        """
        if self.seed is not None:
            from functools import reduce
            #if the traversers are values, we need to extract the values first
            def _map(t:Traverser|TravValue|Property):
                if isinstance(t, TravValue):
                    return t.val
                elif isinstance(t, Property):
                    return t.to_dict()
                else:
                    raise GraphTraversalError("Cannot reduce fold of non-value/property traversers.")
            values = list(map(_map, traversers))
            return [TravValue(reduce(self.foldfunc, values, self.seed))]

        # no seed: collect everything, unwrapping values and properties
        def _map(t:Traverser|TravValue|Property):
            if isinstance(t, TravValue):
                return t.val
            elif isinstance(t, Property):
                return t.to_dict()
            else:
                return t.get
        return [TravValue(list(map(_map, traversers)))]

    def print_query(self) -> str:
        """Return the step's textual form, including seed and function when folding."""
        base = super().print_query()
        # compare against None: a falsy seed such as 0 is still a real seed
        if self.seed is not None:
            return f"{base}({self.seed}, {self.foldfunc})"
        else:
            return base
__init__(traversal, seed=None, foldfunc=None)

Combine all traversers into a single Value containing a list of all traversers. If seed and foldfunc are provided, the traversers will be reduced into a single value using the provided function. For example, g.V().values('age').fold(0, lambda x,y: x+y).next() will result in the sum of all ages.

Parameters

seed: Any, optional The initial value to start the fold with. foldfunc: Callable[[Any,Any], Any], optional The bi-function to use to reduce the traversers into a single value.

Source code in mogwai/core/steps/map_steps.py
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
def __init__(self, traversal:Traversal, seed:Any=None, foldfunc:Callable[[Any,Any], Any]=None):
    """
    Collect every traverser into one value holding a list of them.
    When both `seed` and `foldfunc` are supplied, the traversers are instead reduced to a single value with that function.
    E.g. `g.V().values('age').fold(0, lambda x,y: x+y).next()` yields the sum of all ages.

    Parameters
    -----------
    seed: Any, optional
        Start value of the reduction.
    foldfunc: Callable[[Any,Any], Any], optional
        Two-argument function used to reduce the traversers to one value.
    """
    super().__init__(traversal)
    self.seed = seed
    self.foldfunc = foldfunc
    # either both are given or neither
    has_seed = seed is not None
    has_func = foldfunc is not None
    if has_seed != has_func:
        raise QueryError("`seed` and `foldfunc` should be both None or both not None.")

modulation_steps

start_steps

statics

add_camel_case_aliases(module_globals)

Add camelCase aliases for all snake_case callables in the module's globals.

Source code in mogwai/core/steps/statics.py
13
14
15
16
17
18
19
20
21
22
23
24
def add_camel_case_aliases(module_globals):
    """
    Register camelCase aliases for the snake_case callables of a module.

    Every callable in ``module_globals`` whose name contains an underscore
    gets an extra entry under its camelCase spelling (a trailing underscore
    is kept). Names whose camelCase form equals the original are skipped.
    The mapping is updated in place.
    """
    def to_camel(snake_name):
        first, *rest = snake_name.split('_')
        camel = first + ''.join(map(str.capitalize, rest))
        return camel + '_' if snake_name.endswith('_') else camel

    # collect first, update afterwards: the mapping must not change while iterated
    aliases = {
        alias: obj
        for name, obj in module_globals.items()
        if callable(obj) and '_' in name and (alias := to_camel(name)) != name
    }
    module_globals.update(aliases)

terminal_steps

traversal

AnonymousTraversal

Bases: Traversal

This class implements Anonymous traversals. These are traversals that are not directly bound to a source. They are used as subqueries in other traversals, and are not meant to be run on their own. As input, they receive a set of traversers from the parent traversal, and they return a set of traversers to the parent traversal.

Importantly, some steps require information from the source traversal (like the graph's configuration) to be able to construct themselves. Therefore, we cannot construct an anonymous traversal at the time it is created, but we need to build it when it is added to a parent traversal. This inheritance structure is a bit of a hack to allow for this behavior and will inherently cause some issues with type hinting.

This behavior is implemented in the following way. 1. When an anonymous traversal is created, it is empty, and it has a list of step templates. 2. When a step method is retrieved from the anonymous traversal, this __getattribute__ call to obtain the method is intercepted. Instead of returning the actual step method, which would run immediately and construct the step, a deferred step method is returned. This deferred step method stores the step method and its arguments in the step templates. 3. When the anonymous traversal is built by a parent traversal, the parent traversal constructs the anonymous traversal. In the anonymous traversal's build method, it constructs all the steps from the step templates as if the step methods are only called at this point. 4. The anonymous traversal is now ready to be run.

Source code in mogwai/core/traversal.py
 875
 876
 877
 878
 879
 880
 881
 882
 883
 884
 885
 886
 887
 888
 889
 890
 891
 892
 893
 894
 895
 896
 897
 898
 899
 900
 901
 902
 903
 904
 905
 906
 907
 908
 909
 910
 911
 912
 913
 914
 915
 916
 917
 918
 919
 920
 921
 922
 923
 924
 925
 926
 927
 928
 929
 930
 931
 932
 933
 934
 935
 936
 937
 938
 939
 940
 941
 942
 943
 944
 945
 946
 947
 948
 949
 950
 951
 952
 953
 954
 955
 956
 957
 958
 959
 960
 961
 962
 963
 964
 965
 966
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
class AnonymousTraversal(Traversal):
    """
    This class implements Anonymous traversals. These are traversals that are not directly bound to a source.
    They are used as subqueries in other traversals, and are not meant to be run on their own.
    As input, they receive a set of traversers from the parent traversal, and they return a set of traversers to the parent traversal.

    Importantly, some steps require information from the source traversal (like the graph's configuration) to be able to construct themselves.
    Therefore, we cannot construct an anonymous traversal at the time it is created, but we need to build it when it is added to a parent traversal.
    This inheritance structure is a bit of a hack to allow for this behavior and will inherently cause some issues with type hinting.

    This behavior is implemented in the following way.
    1. When an anonymous traversal is created, it is empty, and it has a list of step templates.
    2. When a step method is retrieved from the anonymous traversal, this `__getattribute__` call to obtain the method
        is intercepted. Instead of returning the actual step method, which would run immediately and construct the step,
        a deferred step method is returned. This deferred step method stores the step method and its arguments in the step templates.
    3. When the anonymous traversal is built by a parent traversal, the parent traversal constructs the anonymous traversal.
        In the anonymous traversal's build method, it constructs all the steps from the step templates as if the step methods are only called at this point.
    4. The anonymous traversal is now ready to be run.
    """
    def __init__(self, initial_deferred_step:Tuple[Callable[..., Step], Tuple, Dict]):
        """
        Args:
            initial_deferred_step: ``(step_factory, args, kwargs)`` triple describing
                the first step; it is only invoked later, in ``_build``.
        """
        self.query_steps:list[Step] = None #we want to make sure that an error is raised if this is accessed before the anonymous traversal is built.
        self.graph = None
        self.terminated = False
        self._needs_path = False
        self._initial_step = initial_deferred_step
        # (step_method, args, kwargs) triples recorded by __getattribute__
        self._step_templates = []

    # we need this since anonymous traversals need to check this before they're run.
    # this is a very tricky part, since `query_steps` is undefined until the anonymous traversal is built.
    @property
    def needs_path(self):
        """True if the flag was set explicitly or any step in ``query_steps`` needs a path."""
        # when the flag is False, query_steps is iterated, which fails while it is still None
        return self._needs_path or any((s.needs_path for s in self.query_steps))

    @needs_path.setter
    def needs_path(self, value):
        self._needs_path = value

    def number_of_steps(self, recursive = False):
        """
        Count the steps of this traversal.

        Before the traversal is built this is the initial step plus the number
        of recorded templates (a warning is logged if ``recursive`` was asked
        for); afterwards the count is delegated to the parent class.
        """
        if self.query_steps is None:
            if recursive:
                logger.warning("Anonymous traversal has not been built yet, cannot count steps.")
            return 1+len(self._step_templates)
        else:
            return super().number_of_steps(recursive)

    def run(self):
        """Always raises ValueError: anonymous traversals cannot be run on their own."""
        raise ValueError("Cannot run anonymous traversals")   

    def _build(self, traversal: Traversal):
        """
        Bind this anonymous traversal to its parent ``traversal`` and construct its steps.

        Copies graph and settings from the parent, creates the initial step,
        replays the recorded step templates, builds every step and finally
        optimizes / verifies the query depending on the copied settings.
        """
        # first, set the necessary fields
        self.graph = traversal.graph
        self.eager = traversal.eager
        self.use_mp = traversal.use_mp
        self.verify_query = traversal.verify_query
        self.optimize = traversal.optimize
        # then, build the steps
        self.query_steps = []
        init_step_func, init_args, init_kwargs = self._initial_step
        init_step = init_step_func(*init_args, **init_kwargs)
        init_step.traversal = self
        self.query_steps.append(init_step)
        # replay the recorded calls on the real step methods; presumably each
        # one appends its step to self.query_steps -- confirm in Traversal
        for step, args, kwargs in self._step_templates:
            step(*args, **kwargs) #this is the step function
        for step in self.query_steps:
            step.build()
        self.needs_path = any([s.needs_path for s in self.query_steps])
        if traversal.optimize:
            self._optimize_query()
        if self.verify_query:
            self._verify_query()
        if self.query_steps[0].isstart:
            self.query_steps[0].set_traversal(self)
        super()._build()

    def __call__(self, traversers: Iterable["Traverser"]) -> Iterable["Traverser"]:
        """
        Feed ``traversers`` through all steps and return the result.

        In eager mode the output of every step is turned into a list and any
        exception is re-raised as GraphTraversalError naming the failing step;
        otherwise the steps are simply chained.
        """
        # if this traversal is empty, just reflect back the incoming traversers
        if len(self.query_steps) == 0:
            return traversers
        self.traversers = traversers
        if self.eager:
            try:
                for step in self.query_steps:
                    logger.debug("Running step in anonymous traversal:" + str(step))
                    self.traversers = step(self.traversers)
                    if not type(self.traversers) is list:
                        self.traversers = list(self.traversers)
            except Exception as e:
                raise GraphTraversalError(
                    f"Something went wrong in step {step.print_query()}"
                )
        else:
            for step in self.query_steps:
                logger.debug("Running step:" + str(step))
                self.traversers = step(self.traversers)
            # TODO: Try to do some fancy error handling
        return self.traversers

    def __getattribute__(self, name):
        """
        Intercept access to step methods.

        Callables flagged with ``_is_step_method`` are replaced by a recorder
        that stores the call in ``_step_templates`` and returns ``self`` so
        calls can be chained; step methods whose ``_anonymous`` flag is falsy
        raise QueryError. Every other attribute is returned unchanged.
        """
        attr = super().__getattribute__(name)
        if callable(attr) and getattr(attr, "_is_step_method", False):
            # a missing `_anonymous` flag counts as "allowed"
            if getattr(attr, "_anonymous", True):
                logger.debug("Returning lambda for anonymous step " + attr.__name__)
                def deferred_step(*args, **kwargs):
                    self._step_templates.append((attr, args, kwargs))
                    return self
                return deferred_step
            else:
                raise QueryError(f"Step {name} is not allowed in anonymous traversals")
        return attr

    def print_query(self):
        """Render the initial step and all recorded templates as ``name(args) -> name(args)``."""
        def format_args(step, args, kwargs):
            # step name followed by its positional and keyword arguments, if any
            step_str = f"{step.__name__}"
            args_str, kwarg_str = None, None
            if len(args) > 0:
                args_str = ", ".join(str(x) for x in args)
            if len(kwargs) > 0:
                kwarg_str = ", ".join(f"{key}={val}" for key, val in kwargs.items())
            if args_str and kwarg_str:
                step_str += f"({args_str}, {kwarg_str})"
            elif args_str:
                step_str += f"({args_str})"
            elif kwarg_str:
                step_str += f"({kwarg_str})"
            return step_str
        texts = [format_args(*self._initial_step)]
        texts += [format_args(step, args, kwargs) for step, args, kwargs in self._step_templates]
        return " -> ".join(texts)

MogwaiGraphTraversalSource

see https://tinkerpop.apache.org/javadocs/current/full/org/apache/tinkerpop/gremlin/process/traversal/dsl/graph/GraphTraversalSource.html

A GraphTraversalSource is the primary DSL of the Gremlin traversal machine. It provides access to all the configurations and steps for Turing complete graph computing. Any DSL can be constructed based on the methods of both GraphTraversalSource and GraphTraversal.

Source code in mogwai/core/traversal.py
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
class MogwaiGraphTraversalSource:
    """
    Factory for traversals over a MogwaiGraph.

    see https://tinkerpop.apache.org/javadocs/current/full/org/apache/tinkerpop/gremlin/process/traversal/dsl/graph/GraphTraversalSource.html

    Each start method (`E`, `V`, `addE`, `addV`) wraps the matching start step
    in a new `Traversal` and forwards the options given to the constructor.
    """

    def __init__(
        self,
        connector: MogwaiGraph,
        eager: bool = False,
        optimize: bool = True,
        use_mp: bool = USE_MULTIPROCESSING,
    ):
        """
        Args:
            connector: the graph handed to every start step
            eager: forwarded to each created Traversal as `eager`
            optimize: forwarded to each created Traversal as `optimize`
            use_mp: forwarded to each created Traversal as `use_mp`
        """
        self.connector = connector
        # query verification is always requested for traversals created here
        self.traversal_args = {
            "optimize": optimize,
            "eager": eager,
            "query_verify": True,
            "use_mp": use_mp,
        }

    def E(self, *init:Tuple[str]|List[Tuple[str]]) -> 'Traversal':
        """Start a traversal with the `E` start step.

        No argument passes None, a single argument is passed as-is and
        several arguments are passed as the tuple of all of them.
        """
        from .steps.start_steps import E
        if not init:
            selection = None
        elif len(init) == 1:
            selection = init[0]
        else:
            selection = init
        start_step = E(self.connector, selection)
        return Traversal(self, start=start_step, **self.traversal_args)

    def V(self, *init:str) -> 'Traversal':
        """Start a traversal with the `V` start step.

        No argument passes None, a single argument is passed as-is and
        several arguments are passed as the tuple of all of them.
        """
        from .steps.start_steps import V
        if not init:
            selection = None
        elif len(init) == 1:
            selection = init[0]
        else:
            selection = init
        start_step = V(self.connector, selection)
        return Traversal(self, start=start_step, **self.traversal_args)

    def addE(
        self, relation: str, from_: str = None, to_: str = None, **kwargs
    ) -> "Traversal":
        """Start a traversal with an `AddE` step built from the given arguments."""
        from .steps.start_steps import AddE

        start_step = AddE(self.connector, relation, from_=from_, to_=to_, **kwargs)
        return Traversal(self, start=start_step, **self.traversal_args)

    def addV(self, label: str | Set[str], name: str = "", **kwargs) -> "Traversal":
        """Start a traversal with an `AddV` step built from the given arguments."""
        from .steps.start_steps import AddV

        start_step = AddV(self.connector, label, name, **kwargs)
        return Traversal(self, start=start_step, **self.traversal_args)

Traversal

see https://tinkerpop.apache.org/javadocs/3.7.3/core/org/apache/tinkerpop/gremlin/process/traversal/Traversal.html

This class represents the base class for all traversals. Each traversal is a directed walk over a graph, executed using an iterator-based traversal. You shouldn't create instances of this class directly; instead, use a Traversal Source (e.g. the MogwaiGraphTraversalSource) to create a new traversal. Then, you can chain traversal steps to create a query that will be executed when you call the run() method.

Source code in mogwai/core/traversal.py
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
@add_camel_case_methods
class Traversal:
    """
    see https://tinkerpop.apache.org/javadocs/3.7.3/core/org/apache/tinkerpop/gremlin/process/traversal/Traversal.html

    This class represents the base class for all traversals. Each traversal is a directed walk over a graph, executed
    using an iterator-based traversal.
    You shouldn't create instances of this class directly, but instead use a Traversal Source, (e.g. the `MogwaiGraphTraversalSource`)
    to create a new traversal.
    Then, you can chain traversal steps to create a query that will be executed when you call the `run()` method.
    """

    #the following comment is misleading as it does not apply to this specific Traversal class
    """
    see https://tinkerpop.apache.org/javadocs/3.7.3/core/org/apache/tinkerpop/gremlin/process/traversal/Traversal.html

    A Traversal represents a directed walk over a Graph.
    This is the base interface for all traversals,
    where each extending interface is seen as a domain
    specific language. For example, GraphTraversal
    is a domain specific language for traversing a graph
    using "graph concepts" (e.g. vertices, edges).

    A Traversal is evaluated in one of two ways:
    iterator-based OLTP or GraphComputer-based OLAP.
    OLTP traversals leverage an iterator and are executed
    within a single execution environment (e.g. JVM)
    (with data access allowed to be remote).

    OLAP traversals leverage GraphComputer and are executed
    between multiple execution environments (e.g.JVMs) (and/or cores).
    """
    __steps__ = set()

    def __init__(
        self,
        source: "MogwaiGraphTraversalSource",
        start: "Step",
        optimize: bool = True,
        eager: bool = False,
        query_verify: bool = False,
        use_mp: bool = False,
    ):
        """
        Set up a traversal that begins with the given start step.

        Args:
            source: traversal source; its `connector` becomes `self.graph`
            start: first step of the query, must report `isstart`
            optimize: stored as `self.optimize`
            eager: stored as `self.eager`
            query_verify: stored as `self.verify_query`
            use_mp: stored as `self.use_mp`

        Raises:
            QueryError: if `start` is None or is not a start step
        """
        if start is None:
            raise QueryError("start step cannot be None")
        self.query_steps = [start]
        first_step = self.query_steps[0]
        if not first_step.isstart:
            raise QueryError(
                "The first step should be a start-step, got " + str(first_step)
            )
        self.graph = source.connector
        # a traversal becomes terminated once a terminal step is added
        self.terminated = False
        # execution options
        self.optimize = optimize
        self.eager = eager
        self.use_mp = use_mp
        self.verify_query = query_verify
        self.max_iteration_depth = DEFAULT_ITERATION_DEPTH

    def number_of_steps(self, recursive:bool=False) -> int:
        """
        Count the steps of this traversal.

        Args:
            recursive: when True, add up `number_of_steps(recursive=True)` of
                every step instead of counting the entries of `query_steps`.

        Returns:
            int: the number of steps
        """
        if not recursive:
            return len(self.query_steps)
        total = 0
        for step in self.query_steps:
            total += step.number_of_steps(recursive=True)
        return total

    def _add_step(self, step: "Step"):
        """
        Append a step to the query.

        Adding a terminal step marks the traversal as terminated.

        Raises:
            QueryError: if the traversal is already terminated
        """
        if not self.terminated:
            self.query_steps.append(step)
            if step.isterminal:
                self.terminated = True
            return
        raise QueryError("Cannot add steps to a terminated traversal.")

    ## ===== FILTER STEPS ======
    @step_method()
    def filter_(self, condition: "AnonymousTraversal") -> "Traversal":
        from .steps.filter_steps import Filter
        self._add_step(Filter(self, condition))
        return self

    @step_method()
    def has(self, *args) -> "Traversal":
        """
        Add a `Has` filter step; the meaning of the arguments depends on their number.

        * `has(key)`: the step is built with the key and a value of None.
        * `has(key, value)`: the step is built with the key and the value.
        * `has(label, key, value)`: as above, with the label passed as `label`.

        Raises:
            QueryError: for any other number of arguments
        """
        # if `key` is a list, like ['a', 'b'], the value will be compared to data['a']['b']
        from .steps.filter_steps import Has

        count = len(args)
        if count == 3:
            label, key, value = args
            step = Has(self, key, value, label=label)
        elif count == 2:
            key, value = args
            step = Has(self, key, value)
        elif count == 1:
            step = Has(self, args[0], None)
        else:
            raise QueryError("Invalid number of arguments for `has`")
        self._add_step(step)
        return self

    @step_method()
    def has_not(self, key: str):
        from .steps.filter_steps import HasNot

        self._add_step(HasNot(self, key))
        return self

    @step_method()
    def has_key(self, *keys: str):
        from .steps.filter_steps import HasKey

        self._add_step(HasKey(self, *keys))
        return self

    @step_method()
    def has_value(self, *values: Any) -> "Traversal":
        from .steps.filter_steps import HasValue

        self._add_step(HasValue(self, *values))
        return self

    @step_method()
    def has_id(self, *ids: str | tuple) -> "Traversal":
        from .steps.filter_steps import HasId

        self._add_step(HasId(self, *ids))
        return self

    @step_method()
    def has_name(self, *name: str) -> "Traversal":
        """
        Filter on the "name" property.

        One name delegates to `has("name", value)`; several names add a
        `HasWithin` step over all of them.

        Raises:
            QueryError: if no name is given
        """
        if not name:
            raise QueryError("No name provided for `has_name`")
        if len(name) == 1:
            return self.has("name", name[0])

        from .steps.filter_steps import HasWithin

        self._add_step(HasWithin(self, "name", name))
        return self

    @step_method()
    def has_label(self, label: str | Set[str]) -> "Traversal":
        if isinstance(label, set):
            from .steps.filter_steps import ContainsAll

            self._add_step(ContainsAll(self, "labels", label))
        else:
            from .steps.filter_steps import Contains

            self._add_step(Contains(self, "labels", label))
        return self

    @step_method()
    def is_(self, condition: Any) -> "Traversal":
        from .steps.filter_steps import Is

        self._add_step(Is(self, condition))
        return self

    @step_method()
    def contains(self, key: str | List[str], value: Any) -> "Traversal":
        if isinstance(value, list):
            from .steps.filter_steps import ContainsAll

            self._add_step(ContainsAll(self, key, value))
        else:
            from .steps.filter_steps import Contains

            self._add_step(Contains(self, key, value))
        return self

    @step_method()
    def within(self, key: str | List[str], options: List[Any]) -> "Traversal":
        from .steps.filter_steps import Within

        self._add_step(Within(self, key, options))
        return self

    @step_method()
    def simple_path(self, by: str | List[str] = None) -> "Traversal":
        from .steps.filter_steps import SimplePath

        self._add_step(SimplePath(self, by=by))
        return self

    @step_method()
    def limit(self, n: int) -> "Traversal":
        from .steps.filter_steps import Range

        self._add_step(Range(self, 0, n))
        return self

    @step_method()
    def range(self, start: int, end: int) -> "Traversal":
        from .steps.filter_steps import Range

        self._add_step(Range(self, start, end))
        return self

    @step_method()
    def skip(self, n: int) -> "Traversal":
        from .steps.filter_steps import Range

        self._add_step(Range(self, n, -1))
        return self

    @step_method()
    def dedup(self, by: str | List[str] = None) -> "Traversal":
        from .steps.filter_steps import Dedup

        self._add_step(Dedup(self, by=by))
        return self

    @step_method()
    def not_(self, condition: "AnonymousTraversal") -> "Traversal":
        from .steps.filter_steps import Not

        self._add_step(Not(self, condition))
        return self

    @step_method()
    def and_(self, A: "AnonymousTraversal", B: "AnonymousTraversal") -> "Traversal":
        from .steps.filter_steps import And

        self._add_step(And(self, A, B))
        return self

    @step_method()
    def or_(self, A: "AnonymousTraversal", B: "AnonymousTraversal") -> "Traversal":
        from .steps.filter_steps import Or

        self._add_step(Or(self, A, B))
        return self

    ## ===== MAP STEPS ======
    @step_method()
    def identity(self) -> "Traversal":  # required for math reasons
        return self

    @step_method()
    def id_(self) -> 'Traversal':
        from .steps.map_steps import Id
        self._add_step(Id(self))
        return self

    # Important: `value` extract values from *Property's*
    # `values` extracts values from *elements*!
    # So, .properties(key).value() is the same as .values(key)
    @step_method()
    def value(self) -> "Traversal":
        from .steps.map_steps import Value

        self._add_step(Value(self))
        return self

    @step_method()
    def key(self) -> "Traversal":
        from .steps.map_steps import Key

        self._add_step(Key(self))
        return self

    @step_method()
    def values(self, *keys: str | List[str]) -> "Traversal":
        from .steps.map_steps import Values

        self._add_step(Values(self, *keys))
        return self

    @step_method()
    def name(self) -> "Traversal":
        return self.values("name")

    @step_method()
    def label(self) -> "Traversal":
        return self.values("labels")

    @step_method()
    def properties(self, *keys: str | List[str]) -> "Traversal":
        from .steps.map_steps import Properties

        self._add_step(Properties(self, *keys))
        return self

    @step_method()
    def select(self, *args: str, by: str = None) -> "Traversal":
        from .steps.map_steps import Select

        self._add_step(
            Select(self, keys=args[0] if len(args) == 1 else list(args), by=by)
        )
        return self

    @step_method()
    def order(
        self,
        by: str | List[str] | EnumOrder | "AnonymousTraversal" = None,
        asc: bool | None = None,
        **kwargs,
    ) -> "Traversal":
        """
        Add an `Order` step.

        When `by` is an `EnumOrder` it is passed to the step as `order`;
        otherwise `by` and `asc` are passed through unchanged.

        Raises:
            QueryError: if `asc` is given together with an `EnumOrder`
        """
        from .steps.map_steps import Order
        if not isinstance(by, EnumOrder):
            self._add_step(Order(self, by, asc=asc, **kwargs))
            return self
        if asc is not None:
            raise QueryError("Cannot provide `asc` argument when `by` is an Order")
        self._add_step(Order(self, order=by, **kwargs))
        return self

    @step_method()
    def fold(self, seed:Any=None, foldfunc:Callable[[Any,Any],Any]=None):
        from .steps.map_steps import Fold
        self._add_step(Fold(self, seed, foldfunc))
        return self

    @step_method()
    def count(self, scope: Scope = Scope.global_) -> "Traversal":
        from .steps.map_steps import Count

        self._add_step(Count(self, scope))
        return self

    @step_method()
    def path(self, by: str | List[str] = None) -> "Traversal":
        from .steps.map_steps import Path

        self._add_step(Path(self, by=by))
        return self

    @step_method()
    def max_(self, scope: Scope = Scope.global_) -> "Traversal":
        from .steps.map_steps import Max

        self._add_step(Max(self, scope))
        return self

    @step_method()
    def min_(self, scope: Scope = Scope.global_) -> "Traversal":
        from .steps.map_steps import Min

        self._add_step(Min(self, scope))
        return self

    @step_method()
    def sum_(self, scope: Scope = Scope.global_) -> "Traversal":
        from .steps.map_steps import Aggregate

        self._add_step(Aggregate(self, "sum", scope))
        return self

    @step_method()
    def mean(self, scope: Scope = Scope.global_) -> "Traversal":
        from .steps.map_steps import Aggregate

        self._add_step(Aggregate(self, "mean", scope))
        return self

    @step_method()
    def element_map(self, *keys: str) -> "Traversal":
        from .steps.map_steps import ElementMap

        if len(keys) == 1:
            keys = keys[0]
        elif len(keys) == 0:
            keys = None
        self._add_step(ElementMap(self, keys))
        return self

    ## ===== FLATMAP STEPS ======
    @step_method()
    def out(self, direction: str = None) -> "Traversal":
        from .steps.flatmap_steps import Out

        self._add_step(Out(self, direction))
        return self

    @step_method()
    def outE(self, direction: str = None) -> "Traversal":
        from .steps.flatmap_steps import OutE

        self._add_step(OutE(self, direction))
        return self

    @step_method()
    def outV(self) -> "Traversal":
        from .steps.flatmap_steps import OutV

        self._add_step(OutV(self))
        return self

    @step_method()
    def in_(self, direction: str = None) -> "Traversal":
        from .steps.flatmap_steps import In

        self._add_step(In(self, direction))
        return self

    @step_method()
    def inE(self, direction: str = None) -> "Traversal":
        from .steps.flatmap_steps import InE

        self._add_step(InE(self, direction))
        return self

    @step_method()
    def inV(self) -> "Traversal":
        from .steps.flatmap_steps import InV

        self._add_step(InV(self))
        return self

    @step_method()
    def both(self, direction: str = None) -> "Traversal":
        from .steps.flatmap_steps import Both

        self._add_step(Both(self, direction))
        return self

    @step_method()
    def bothE(self, direction: str = None) -> "Traversal":
        from .steps.flatmap_steps import BothE

        self._add_step(BothE(self, direction))
        return self

    @step_method()
    def bothV(self) -> "Traversal":
        from .steps.flatmap_steps import BothV

        self._add_step(BothV(self))
        return self

    ## ===== BRANCH STEPS =====
    @step_method()
    @with_call_order
    def repeat(
        self,
        do: "Traversal",
        times: int | None = None,
        until: "AnonymousTraversal|None" = None,
        **kwargs,
    ) -> "Traversal":
        """
        Add a `Repeat` step, absorbing any pending `emit`/`until` modulators.

        Args:
            do: the traversal handed to `Repeat` as its body
            times: passed to `Repeat` as `times`
            until: passed to `Repeat` as `until`
            **kwargs: only `_order` is read here (presumably supplied by the
                `with_call_order` decorator - TODO confirm)

        Raises:
            QueryError: if a pending `until` modulator is found although
                `until` or `times` was passed to this call
        """
        from .steps.branch_steps import Repeat
        from .steps.modulation_steps import Temp

        # until_do is True only when "until" is the first entry of `_order`
        if until is not None:
            until_do = (
                len(kwargs.get("_order", [])) > 0 and kwargs["_order"][0] == "until"
            )
        else:
            until_do = None

        step = Repeat(self, do, times=times, until=until, until_do=until_do)
        # Temp steps are placeholders added by `emit()`/`until()` when they are
        # called before `repeat()`; fold them into the Repeat step, newest first
        while isinstance((prevstep := self.query_steps[-1]), Temp):
            if prevstep.kwargs["type"] == "emit":
                step.emit = prevstep.kwargs["filter"]
                step.emit_before = True
            elif prevstep.kwargs["type"] == "until":
                if until is not None or times is not None:
                    raise QueryError(
                        "Provided `until` to repeat when `times` or `until` was already set."
                    )
                step.until = prevstep.kwargs["cond"]
                step.until_do = True
            else:
                # unknown placeholder type: leave it in the query and stop
                break
            self.query_steps.pop(-1)  # remove the temporary step
        self._add_step(step)
        return self

    @step_method()
    def local(self, localTrav: "AnonymousTraversal") -> "Traversal":
        from .steps.branch_steps import Local

        self._add_step(Local(self, localTrav))
        return self

    @step_method()
    def branch(self, branchFunc: "AnonymousTraversal") -> "Traversal":
        from .steps.branch_steps import Branch
        if branchFunc.number_of_steps() == 0:
            raise QueryError("No steps provided for branch function")
        self._add_step(Branch(self, branchFunc))
        return self

    @step_method()
    def union(self, *traversals: "AnonymousTraversal") -> "Traversal":
        from .steps.branch_steps import Union

        self._add_step(Union(self, *traversals))
        return self

    ## ===== SIDE EFFECT STEPS ======
    @step_method()
    def side_effect(
        self, side_effect: "AnonymousTraversal|Callable[[Traverser], None]"
    ) -> "Traversal":
        from .steps.base_steps import SideEffectStep

        self._add_step(SideEffectStep(self, side_effect))
        return self

    @step_method()
    def property(self, *args) -> 'Traversal':
        """
        Add a side-effect step that writes a value onto each traversed element.

        Accepted call shapes:
        * `property(key, value)`
        * `property(cardinality, key, value)`

        Raises:
            ValueError: for an unsupported cardinality or any other number
                of arguments
        """
        from .steps.base_steps import SideEffectStep
        if len(args)==2:
            key, value = args
            # the label pseudo-key is stored under "labels" and always as a set
            if key==Cardinality.label:
                key = "labels"
                value = set(value) if isinstance(value, (list,tuple, set, dict)) else {value}
        elif len(args)==3:
            cardinality, key, value = args
            # coerce the value into the container requested by the cardinality;
            # note that set()/list() of a dict keeps only its keys
            match cardinality:
                case Cardinality.set_: value = set(value) if isinstance(value, (list,tuple, set, dict)) else {value}
                case Cardinality.list_: value = list(value) if isinstance(value, (list,tuple, set, dict)) else [value]
                case Cardinality.map_ | Cardinality.dict_: pass #we keep the value as a value
                case _:
                    raise ValueError("Invalid cardinality for property, expected `set`, `list`, `map` or `dict`")
        else:
            raise ValueError("Invalid number of arguments for `property`, expected signature (cardinality, key, value) or (key, value)")
        # a tuple/list key is split: all but the last entry go to the indexer,
        # the last entry is the key that is finally assigned
        # (presumably the indexer walks into nested dicts - TODO confirm)
        if isinstance(key, (tuple, list)):
            indexer = tu.get_dict_indexer(key[:-1])
            key = key[-1]
        else:
            indexer = lambda x: x

        # executed per traverser when the step runs; captures key/value/indexer
        def effect(t: "Traverser"):
            indexer(self._get_element(t))[key] = value

        self._add_step(SideEffectStep(self, side_effect=effect))
        return self

     ## ===== IO =====
    @step_method(not_anonymous=True)
    def io(self, file_path: str, read: bool = None, write: bool = None) -> "Traversal":
        """
        Add an `IO` step for the given file path.

        Args:
            file_path: passed to the `IO` step
            read: passed to the `IO` step as `read`
            write: passed to the `IO` step as `write`

        Raises:
            QueryError: if both flags are given and they are equal
        """
        from .steps.io_step import IO
        both_given = read is not None and write is not None
        # when both flags are set explicitly, exactly one of them must be true
        if both_given and not read ^ write:
            raise QueryError("read and write cannot be both true or both false")
        self._add_step(IO(self, file_path, read=read, write=write))
        return self

    ## ===== MODULATION STEPS ======
    @step_method()
    def option(self, branchKey, option_trav: "AnonymousTraversal") -> "Traversal":
        """
        Attach an option to the directly preceding `Branch` step.

        A `branchKey` of None sets the default option; any other key
        registers the traversal under that key.

        Raises:
            QueryError: if the last step is not exactly a `Branch`
        """
        from .steps.branch_steps import Branch

        last_step = self.query_steps[-1]
        if type(last_step) is not Branch:
            raise QueryError("Option can only be used after Branch step")
        if branchKey is None:
            last_step.set_default(option_trav)
        else:
            last_step.add_option(branchKey, option_trav)
        return self

    @step_method()
    def until(self, cond: "AnonymousTraversal"):
        from .steps.branch_steps import Repeat

        prevstep = self.query_steps[-1]
        if isinstance(prevstep, Repeat):
            if prevstep.until is None and prevstep.times is None:
                prevstep.until = cond
                prevstep.until_do = False
            else:
                raise QueryError(
                    "Provided `until` to repeat when `times` or `until` was already set."
                )
        else:
            from .steps.modulation_steps import Temp

            self._add_step(Temp(self, type="until", cond=cond))
        return self

    @step_method()
    def times(self, reps: int):
        from .steps.branch_steps import Repeat

        prevstep = self.query_steps[-1]
        if isinstance(prevstep, Repeat):
            if prevstep.times is None and prevstep.until is None:
                prevstep.times = reps
            else:
                raise QueryError(
                    "Provided `times` to repeat when `times` or `until` was already set."
                )
        else:
            raise QueryError(
                f"`times` modulation is not supported by step {prevstep.print_query()}"
            )
        return self

    @step_method()
    def emit(self, filter: "AnonymousTraversal|None" = None):
        from .steps.branch_steps import Repeat

        prevstep = self.query_steps[-1]
        if isinstance(prevstep, Repeat):
            if prevstep.emit is None:
                prevstep.emit = filter or True
                prevstep.emit_before = False
            else:
                raise QueryError(
                    "Provided `emit` to repeat when `emit` was already set."
                )
        else:
            from .steps.modulation_steps import Temp

            self._add_step(Temp(self, type="emit", filter=filter or True))
        return self

    @step_method()
    def as_(self, name: str) -> "Traversal":
        from .steps.modulation_steps import As

        self._add_step(As(self, name))
        return self

    @step_method()
    def by(self, key: str | List[str] | "AnonymousTraversal", *args) -> "Traversal":
        prev_step = self.query_steps[-1]
        if prev_step.supports_by:
            if isinstance(key, AnonymousTraversal):
                if not prev_step.supports_anonymous_by:
                    raise QueryError(
                        f"Step `{prev_step.print_query()}` does not support anonymous traversals as by-modulations."
                    )
            elif type(key) is not str:
                if isinstance(key, EnumOrder) and prev_step.__class__.__name__=="Order":
                    pass
                else:
                    raise QueryError("Invalid key type for by-modulation")

            if prev_step.supports_multiple_by:
                prev_step.by.append(key)
            elif prev_step.by is None:
                prev_step.by = key if len(args)==0 else (key, *args)
            else:
                raise QueryError(
                    f"Step `{prev_step.print_query()}` does not support multiple by-modulations."
                )
        else:
            raise QueryError(
                f"Step `{prev_step.print_query()}` does not support by-modulation."
            )
        return self

    @step_method()
    def from_(self, src: str) -> "Traversal":
        """
        Set the from-modulation on the previous step.

        Raises:
            QueryError: if the previous step does not support from/to
                modulation, if `src` is not exactly a `str`, or if a
                from-modulation was already set
        """
        prev_step = self.query_steps[-1]
        if not prev_step.supports_fromto:
            raise QueryError(
                f"Step `{prev_step.print_query()}` does not support from-modulation."
            )
        if type(src) is not str:
            raise QueryError(f"Invalid source type `{type(src)}` for from-modulation: str needed!")
        if prev_step.from_ is not None:
            raise QueryError(
                f"Step `{prev_step.print_query()}` does not support multiple from-modulations."
            )
        prev_step.from_ = src
        return self

    @step_method()
    def to_(self, dest: str) -> "Traversal":
        """
        Set the to-modulation on the previous step.

        Args:
            dest: the value stored as `to_` on the previous step

        Raises:
            QueryError: if the previous step does not support from/to
                modulation, if `dest` is not exactly a `str`, or if a
                to-modulation was already set
        """
        prev_step = self.query_steps[-1]
        if not prev_step.supports_fromto:
            raise QueryError(
                f"Step `{prev_step.print_query()}` does not support to-modulation."
            )
        if type(dest) is not str:
            # the message names the destination and reports the offending type,
            # mirroring the wording used by `from_`
            raise QueryError(
                f"Invalid destination type `{type(dest)}` for to-modulation: str needed!"
            )
        if prev_step.to_ is not None:
            raise QueryError(
                f"Step `{prev_step.print_query()}` does not support multiple to-modulations."
            )
        prev_step.to_ = dest
        return self

    @step_method()
    def with_(self, *args) -> "Traversal":
        """
        Store the given arguments as the with-modulation of the previous step.

        Raises:
            QueryError: if the previous step lacks the `SUPPORTS_WITH` flag
                or already has a with-modulation
        """
        prev_step = self.query_steps[-1]
        if not prev_step.flags & Step.SUPPORTS_WITH:
            raise QueryError(
                f"Step `{prev_step.print_query()}` does not support with-modulation."
            )
        if prev_step.with_ is not None:
            raise QueryError(
                f"Step `{prev_step.print_query()}` does not support multiple with-modulations."
            )
        prev_step.with_ = args
        return self

    @step_method(not_anonymous=True)
    def read(self) ->  "Traversal":
        from .steps.io_step import IO
        prev_step = self.query_steps[-1]
        if isinstance(prev_step, IO):
            prev_step.read = True
        else:
            raise QueryError(f"the read() step can only be used after an IO step.")
        return self

    @step_method(not_anonymous=True)
    def write(self) ->  "Traversal":
        from .steps.io_step import IO
        prev_step = self.query_steps[-1]
        if isinstance(prev_step, IO):
            prev_step.write = True
        else:
            raise QueryError(f"the write() step can only be used after an IO step.")
        return self

    ## ===== TERMINAL STEPS ======
    @step_method(not_anonymous=True)
    def to_list(
        self, by: List[str] | str = None, include_data: bool = False
    ) -> "Traversal":
        # terminal step
        from .steps.terminal_steps import ToList

        self._add_step(ToList(self, by=by, include_data=include_data))
        return self

    @step_method(not_anonymous=True)
    def as_path(self, by: List[str] | str = None) -> "Traversal":
        # terminal step
        from .steps.terminal_steps import AsPath

        self._add_step(AsPath(self, by=by))
        return self

    @step_method(not_anonymous=True)
    def has_next(self) -> "Traversal":
        from .steps.terminal_steps import HasNext

        self._add_step(HasNext(self))
        return self

    @step_method(not_anonymous=True)
    def next(self, n:int=1) -> 'Traversal':
        from .steps.terminal_steps import Next
        self._add_step(Next(self, amount=n))
        return self

    @step_method(not_anonymous=True)
    def iter(
        self, by: str | List[str] = None, include_data: bool = False
    ) -> "Traversal":
        from .steps.terminal_steps import AsGenerator

        self._add_step(AsGenerator(self, by=by, include_data=include_data))
        return self

    @step_method(not_anonymous=True)
    def iterate(self) -> "Traversal":
        from .steps.terminal_steps import Iterate

        self._add_step(Iterate(self))
        return self

    def _optimize_query(self):
        pass

    def _verify_query(self):
        """
        Check that no temporary modulation step is left in the query.

        `Temp` steps are placeholders added by `until()`/`emit()` when no
        `Repeat` step precedes them; `repeat()` removes them again. One that
        is still present was never consumed.

        Returns:
            bool: True when the query passes the check

        Raises:
            QueryError: if a `Temp` step remains in `query_steps`
        """
        from .steps.modulation_steps import Temp

        for step in self.query_steps:
            if isinstance(step, Temp):
                # the modulation type lives in the step's kwargs, as read in `repeat()`
                raise QueryError(
                    f"Remaining modulation step of type `{step.kwargs['type']}`"
                )
        return True

    def _build(self):
        for step in self.query_steps:
            step.build()

    def run(self) -> Any:
        """
        Build, verify and execute the query.

        The start step receives this traversal, every step is built, and the
        steps are then called in order, each one receiving the output of the
        previous one.

        In eager mode the output of every non-terminal step is turned into a
        list before the next step runs, and any failure is re-raised as a
        `GraphTraversalError` naming the failing step, with the original
        exception attached as its cause. Otherwise the step outputs are
        passed on as they are and exceptions propagate unchanged.

        Returns:
            Any: whatever the last step produced

        Raises:
            QueryError: if query verification fails
            GraphTraversalError: in eager mode, if a step raises
        """
        # first, provide the start step with this traversal
        self.traversers = []
        self.query_steps[0].set_traversal(self)
        self._build()
        self.needs_path = any([s.needs_path for s in self.query_steps])
        if self.optimize:
            self._optimize_query()
        self._verify_query()
        if self.eager:
            try:
                for step in self.query_steps:
                    logger.debug("Running step:" + str(step))
                    self.traversers = step(self.traversers)
                    # terminal steps could produce any type of output
                    if type(self.traversers) is not list and not step.isterminal:
                        self.traversers = list(self.traversers)
            except Exception as e:
                # chain explicitly so the root cause stays visible in the traceback
                raise GraphTraversalError(
                    f"Something went wrong in step {step.print_query()}"
                ) from e
        else:
            for step in self.query_steps:
                logger.debug("Running step:" + str(step))
                self.traversers = step(self.traversers)
            # TODO: Try to do some fancy error handling
        return self.traversers

    def _get_element(self, traverser: "Traverser", data: bool = False):
        """
        Look up the graph element a traverser currently points at.

        Args:
            traverser: must be exactly of type `Traverser`
            data: when true, nodes are looked up through `nodes(data=data)`

        Returns:
            the entry of `graph.edges` (for edge traversers) or of the node
            view for `traverser.get`

        Raises:
            GraphTraversalError: if `traverser` is not exactly a `Traverser`
        """
        if not type(traverser) == Traverser:
            raise GraphTraversalError(
                "Cannot get element from value or property traverser."
                + " Probably you are performing a step that can only be executed on graph elements on a value or property traverser."
            )
        # edges are read from the same view regardless of `data`
        if traverser.is_edge:
            return self.graph.edges[traverser.get]
        node_view = self.graph.nodes(data=data) if data else self.graph.nodes
        return node_view[traverser.get]

    def _get_element_from_id(self, element_id: str | tuple):
        """
        Look up a graph element by its id.

        Tuple ids are looked up in `graph.edges`, all other ids in `graph.nodes`.
        """
        container = self.graph.edges if isinstance(element_id, tuple) else self.graph.nodes
        return container[element_id]

    def print_query(self) -> str:
        """Return the `print_query()` text of every step, joined by " -> "."""
        rendered_steps = [step.print_query() for step in self.query_steps]
        return " -> ".join(rendered_steps)

    def __str__(self) -> str:
        return self.print_query()
has(*args)

Filter traversers based on whether they have the given properties.

- If one argument is given, it is assumed to be a key, and the step checks if a property with that key exists, regardless of its value.
- If two arguments are given, they are assumed to be a key and a value, and the step checks if a property with that key exists and has the given value.
- If three arguments are given, the first argument is assumed to be a label, and the step checks if a property with the given key and value exists on an element with that label.

Source code in mogwai/core/traversal.py
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
@step_method()
def has(self, *args) -> "Traversal":
    """
    Append a `Has` filter step; the meaning of the arguments depends on their count.

    * `has(key)`: keep elements that have a property `key`, whatever its value.
    * `has(key, value)`: keep elements whose property `key` has the given value.
    * `has(label, key, value)`: as above, restricted to elements with that label.

    Returns:
        this traversal, so calls can be chained.

    Raises:
        QueryError: if the number of arguments is not 1, 2 or 3.
    """
    # if `key` is a list, like ['a', 'b'], the value will be compared to data['a']['b']
    from .steps.filter_steps import Has

    arg_count = len(args)
    if arg_count not in (1, 2, 3):
        raise QueryError("Invalid number of arguments for `has`")

    if arg_count == 3:
        label, key, value = args
        step = Has(self, key, value, label=label)
    else:
        key = args[0]
        # a lone key means "property exists": no value to compare against
        value = args[1] if arg_count == 2 else None
        step = Has(self, key, value)
    self._add_step(step)
    return self

traverser

Created on 2024

@author: Joep Geuskens

Traverser

Bases: BaseTraverser

see https://tinkerpop.apache.org/javadocs/3.7.3/core/org/apache/tinkerpop/gremlin/process/traversal/Traverser.html

A Traverser represents the current state of an object flowing through a Traversal.

A traverser maintains a reference to the current object, a traverser-local "sack", a traversal-global sideEffect, a bulk count, and a path history.

Different types of traversers can exist depending on the semantics of the traversal and the desire for space/time optimizations of the developer.

Source code in mogwai/core/traverser.py
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
class Traverser(BaseTraverser):
    """
    see https://tinkerpop.apache.org/javadocs/3.7.3/core/org/apache/tinkerpop/gremlin/process/traversal/Traverser.html

    A Traverser represents the current state of an object
    flowing through a Traversal.

    A traverser maintains a reference to the current object,
    a traverser-local "sack",
    a traversal-global sideEffect, a bulk count,
    and a path history.

    Different types of traversers can exist
    depending on the semantics of the traversal
    and the desire for space/time optimizations of
    the developer.

    In this class the position is `node_id` plus an optional `target`:
    with `target` set to None the traverser sits on the node `node_id`,
    otherwise it sits on the edge `(node_id, target)`.
    """

    def __init__(
        self, node_id: str, other_node_id: str = None, track_path: bool = False
    ):
        """
        Args:
            node_id: id of the current node (or of the edge's source node)
            other_node_id: id of the edge's target node; None for a node position
            track_path: if True, the visited positions are recorded in `path`
        """
        super().__init__(track_path=track_path)
        self.node_id = node_id
        self.track_path = track_path
        self.target = other_node_id
        # the path starts at the initial position; None means "not tracked"
        self.path = [self.get] if track_path else None

    def move_to_edge(self, source: str, target: str) -> None:
        """Move this traverser onto the edge `(source, target)`, in place."""
        self.node_id = source
        self.target = target
        if self.track_path:
            self.path.append((source, target))

    @property
    def get(self) -> str | tuple:
        """The current position: `(node_id, target)` on an edge, else `node_id`."""
        return (self.node_id, self.target) if self.is_edge else self.node_id

    @property
    def source(self) -> str:
        """The current node id, i.e. the source node when on an edge."""
        return self.node_id

    @property
    def is_edge(self) -> bool:
        """True if a target is set, i.e. the traverser sits on an edge."""
        return self.target is not None

    def save(self, key):
        """
        Store a snapshot of the current position under `key`.

        The snapshot is a new Traverser at the same position; it is placed
        in `self.cache["__store__"]` and can be fetched again with `load`.
        """
        if self.is_edge:
            to_store = Traverser(*self.get, track_path=self.track_path)
        else:
            to_store = Traverser(self.get, track_path=self.track_path)

        # shallow copy (original author's note: no need for deep copies)
        to_store.cache = self.cache.copy()
        # assumes BaseTraverser provides cache["__store__"] - TODO confirm
        self.cache["__store__"][key] = to_store

    def load(self, key):
        """
        Return the snapshot stored under `key` by `save`.

        Raises:
            ValueError: if nothing was saved under `key`.
        """
        # logger.debug(f"Cache: {self.cache['__store__'].keys()}")
        try:
            return self.cache["__store__"][key]
        except KeyError as err:
            raise ValueError(
                f"No object `{key}` was saved in this traverser. Use .as('{key}') to save traversal steps."
            ) from err

    def get_cache(self, key):
        """Return the cache entry for `key`, or None if there is none."""
        return self.cache.get(key, None)

    def set(self, key: str, val: Any):
        """Set the cache entry `key` to `val`; `__store__` is reserved for `save`."""
        assert key != "__store__", "`__store__` is a reserved key"
        self.cache[key] = val

    def move_to(self, node_id: str) -> "Traverser":
        """Move this traverser onto the node `node_id`, in place, and return it."""
        # logging.debug("Moving traverser from", self.get, "to", node_id)
        self.node_id = node_id
        self.target = None
        if self.track_path:
            self.path.append(node_id)
        return self

    def copy(self):
        """Return a new Traverser at the same position with a deep-copied cache."""
        t = Traverser(
            node_id=self.node_id, other_node_id=self.target, track_path=self.track_path
        )
        t.cache = deepcopy(self.cache)
        # test against None, not truthiness: an empty (but tracked) path must
        # stay a list, otherwise the next move would call append on None
        t.path = self.path.copy() if self.path is not None else None
        return t

    def copy_to(self, node_id: str, other_node_id: str = None) -> "Traverser":
        """
        Return a copy of this traverser that has been moved to a new position.

        Args:
            node_id: the node to move to (or the source node of the edge)
            other_node_id: the target node of the edge; None moves to a node
        """
        t = self.copy()
        # same "is not None" test as `is_edge`, so a falsy id such as 0
        # is still recognised as an edge target
        if other_node_id is not None:
            t.move_to_edge(node_id, other_node_id)
        else:
            t.move_to(node_id)
        return t

    def to_value(self, val, dtype=None):
        """Wrap `val` in a `Value` carrying a deep copy of the cache and a copy of the path."""
        val = Value(val, dtype=dtype)
        val.cache = deepcopy(self.cache)
        if self.track_path:
            val.path = self.path.copy()
        return val

    def to_property(self, key, val, dtype=None):
        """Wrap `key`/`val` in a `Property` carrying a deep copy of the cache and a copy of the path."""
        p = Property(key, val, dtype=dtype)
        p.cache = deepcopy(self.cache)
        if self.track_path:
            p.path = self.path.copy()
        return p

    def __str__(self):
        return f"<{self.__class__.__name__}[get={self.get}, is_edge={self.is_edge}]>"

decorators

decorators

traversal_step_doc(cls)

Decorator to copy the docstring from the init method of a class.

Source code in mogwai/decorators/decorators.py
40
41
42
43
44
45
def traversal_step_doc(cls):
    """
    Build a decorator that gives a function the docstring of `cls.__init__`.

    Args:
        cls: the class whose `__init__` docstring is copied.

    Returns:
        a decorator that overwrites `__doc__` of the function it receives
        and returns that same function.
    """

    def copy_init_doc(target):
        # the docstring is read when the decorator is applied, not earlier
        setattr(target, "__doc__", cls.__init__.__doc__)
        return target

    return copy_init_doc

examples

modern

Created on 2024-11-14

@author: wf

Person

a person

Source code in mogwai/examples/modern.py
12
13
14
15
16
17
18
19
@lod_storable
class Person:
    """
    a person with a name and an age
    """

    # the person's name
    name: str
    # the person's age
    age: int

Software

a software

Source code in mogwai/examples/modern.py
22
23
24
25
26
27
28
29
@lod_storable
class Software:
    """
    a software with a name and an optional language
    """

    # the name of the software
    name: str
    # optional; defaults to None when not given
    lang: Optional[str] = None

schema

Created on 15.11.2024

@author: wf

MogwaiExampleSchema

the mogwai examples schema

Source code in mogwai/examples/schema.py
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
class MogwaiExampleSchema:
    """
    the mogwai examples schema
    """

    @classmethod
    def get_yaml_path(cls) -> str:
        """
        Return the normalized path of `modern-schema.yaml`.

        The file is expected in the `mogwai_examples` directory two levels
        above the directory that contains this module.
        """
        module_dir = os.path.dirname(os.path.abspath(__file__))
        raw_path = os.path.join(
            module_dir, "../..", "mogwai_examples", "modern-schema.yaml"
        )
        # normpath collapses the "../.." components
        return os.path.normpath(raw_path)

graph_config

Created on 2024-08-17

@author: wf

GraphConfig

Configuration for a graph in the Examples class

Source code in mogwai/graph_config.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
@lod_storable
class GraphConfig:
    """
    Configuration for a graph in the Examples class
    """

    name: str
    file_path: Optional[str] = None
    is_default: bool = False
    node_label_key: str = "labelV"
    edge_label_key: str = "labelE"
    node_name_key: Optional[str] = None
    custom_loader: Optional[str] = None  # a str so that a function name can be stored

    def get_node_name_key(self) -> Callable[[Dict[str, Any]], Any]:
        """
        Build the callable that derives a node name from a node's data dict.

        Returns:
            the identity function if `node_name_key` is None; otherwise a
            function that pops `node_name_key` from the dict it is given
            (removing it from that dict) and returns the popped value, or
            None if the key is absent.

        Raises:
            ValueError: if `node_name_key` is neither None nor a str.
        """
        if self.node_name_key is None:
            # no key configured: hand the input back unchanged
            return lambda x: x
        if not isinstance(self.node_name_key, str):
            raise ValueError(f"Invalid node_name_key for graph {self.name}")

        def pop_name(node_data):
            # node_name_key is read at call time, as a closure over self
            return node_data.pop(self.node_name_key, None)

        return pop_name

GraphConfigs

Manages a collection of GraphConfig instances

Source code in mogwai/graph_config.py
36
37
38
39
40
@lod_storable
class GraphConfigs:
    """Manages a collection of GraphConfig instances"""

    # maps a string key to its GraphConfig; a fresh empty dict per instance
    configs: Dict[str, GraphConfig] = field(default_factory=dict)

graphs

Created on 2024-08-17

@author: wf

Graphs

Manage MogwaiGraphs

Source code in mogwai/graphs.py
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
class Graphs:
    """
    Manage MogwaiGraphs

    Graph configurations are read from a YAML file; the graphs themselves
    are loaded on first access and then kept in `self.graphs`.
    """

    def __init__(
        self, config_file: str = None, lazy: bool = False, debug: bool = False
    ):
        """
        Args:
            config_file: path of the YAML file with the graph configurations;
                None selects `example_graph_configs.yaml` in the examples directory
            lazy: if True, no graph is loaded up front
            debug: if True, `log` forwards its messages to the logger
        """
        self.debug = debug
        self.logger = self.get_logger()
        module_dir = os.path.dirname(__file__)
        self.examples_dir = os.path.join(module_dir, "..", "mogwai_examples")
        self.config_file = (
            os.path.join(self.examples_dir, "example_graph_configs.yaml")
            if config_file is None
            else config_file
        )
        self.graphs: Dict[str, MogwaiGraph] = {}

        self.log(f"Loading configurations from: {self.config_file}")
        self.configs = GraphConfigs.load_from_yaml_file(self.config_file)
        self.log(f"Loaded configurations: {self.configs.configs}")

        if lazy:
            return
        self.load_examples()

    def get_logger(self):
        """Return the logger named after this class."""
        logger_name = self.__class__.__name__
        return logging.getLogger(logger_name)

    def log(self, msg: str):
        """Emit `msg` at debug level, but only when `self.debug` is set."""
        if not self.debug:
            return
        self.logger.debug(msg)

    def load_examples(self):
        """Load every configured graph that is flagged as default"""
        self.log("Loading default examples")
        for name, config in self.configs.configs.items():
            if not config.is_default:
                continue
            self.log(f"Loading default graph: {name}")
            # get() does the actual loading and caching
            self.get(name)

    def _load_graph(self, file_path: str, config: GraphConfig) -> MogwaiGraph:
        """Load one graph from a .graphml file, using the keys given in `config`"""
        self.log(f"Loading graph from file: {file_path}")
        graph = graphml_to_mogwaigraph(
            file_path,
            node_label_key=config.node_label_key,
            edge_label_key=config.edge_label_key,
            node_name_key=config.get_node_name_key(),
        )
        return graph

    def get_names(self) -> List[str]:
        """Return the names of all configured graphs"""
        names = list(self.configs.configs)
        self.log(f"Available graph names: {names}")
        return names

    def get(self, name: str) -> MogwaiGraph:
        """
        Return the graph called `name`, loading it on first access.

        Raises:
            ValueError: if `name` is not configured, if its custom loader is
                not a callable attribute of MogwaiGraph, or if the
                configuration has neither a custom loader nor a file path.
        """
        known_configs = self.configs.configs
        if name not in known_configs:
            error_msg = f"Graph '{name}' not found in configurations"
            self.log(error_msg)
            raise ValueError(error_msg)

        if name in self.graphs:
            # already loaded earlier
            return self.graphs[name]

        config = known_configs[name]
        if config.custom_loader:
            self.log(f"Using custom loader for graph '{name}'")
            # custom_loader is the name of an attribute looked up on MogwaiGraph
            loader = getattr(MogwaiGraph, config.custom_loader, None)
            if not (loader and callable(loader)):
                error_msg = f"Invalid custom loader {config.custom_loader} for graph '{name}'"
                self.log(error_msg)
                raise ValueError(error_msg)
            self.graphs[name] = loader()
        elif config.file_path:
            file_path = os.path.join(self.examples_dir, config.file_path)
            self.log(f"Loading graph '{name}' from file: {file_path}")
            self.graphs[name] = self._load_graph(file_path, config)
        else:
            error_msg = f"No loader or file path specified for graph '{name}'"
            self.log(error_msg)
            raise ValueError(error_msg)

        return self.graphs[name]

get(name)

Get a graph by name, loading it if necessary

Source code in mogwai/graphs.py
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
def get(self, name: str) -> MogwaiGraph:
    """
    Return the graph called `name`, loading it on first access.

    Raises:
        ValueError: if `name` is not configured, if its custom loader is
            not a callable attribute of MogwaiGraph, or if the configuration
            has neither a custom loader nor a file path.
    """
    known_configs = self.configs.configs
    if name not in known_configs:
        error_msg = f"Graph '{name}' not found in configurations"
        self.log(error_msg)
        raise ValueError(error_msg)

    if name in self.graphs:
        # already loaded earlier
        return self.graphs[name]

    config = known_configs[name]
    if config.custom_loader:
        self.log(f"Using custom loader for graph '{name}'")
        # custom_loader is the name of an attribute looked up on MogwaiGraph
        loader = getattr(MogwaiGraph, config.custom_loader, None)
        if not (loader and callable(loader)):
            error_msg = f"Invalid custom loader {config.custom_loader} for graph '{name}'"
            self.log(error_msg)
            raise ValueError(error_msg)
        self.graphs[name] = loader()
    elif config.file_path:
        file_path = os.path.join(self.examples_dir, config.file_path)
        self.log(f"Loading graph '{name}' from file: {file_path}")
        self.graphs[name] = self._load_graph(file_path, config)
    else:
        error_msg = f"No loader or file path specified for graph '{name}'"
        self.log(error_msg)
        raise ValueError(error_msg)

    return self.graphs[name]

get_names()

Get a list of available graph names

Source code in mogwai/graphs.py
66
67
68
69
70
def get_names(self) -> List[str]:
    """Return the names of all configured graphs"""
    # iterating the configs mapping yields its keys, i.e. the graph names
    graph_names = list(self.configs.configs)
    self.log(f"Available graph names: {graph_names}")
    return graph_names

load_examples()

Load all example graphs based on configurations

Source code in mogwai/graphs.py
48
49
50
51
52
53
54
def load_examples(self):
    """Load every configured graph that is flagged as default"""
    self.log("Loading default examples")
    for graph_name, graph_config in self.configs.configs.items():
        if not graph_config.is_default:
            continue
        self.log(f"Loading default graph: {graph_name}")
        # get() does the actual loading
        self.get(graph_name)

io

mogwai_io

GraphSON

Bases: IOBackend

GraphSON is a JSON-based file format for representing graphs. https://tinkerpop.apache.org/docs/3.7.2/dev/io/#graphson

Source code in mogwai/io/mogwai_io.py
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
class GraphSON(IOBackend):
    """
    GraphSON is a JSON-based file format for representing graphs.
    https://tinkerpop.apache.org/docs/3.7.2/dev/io/#graphson
    """

    def add_node(self, node:dict, graph:MogwaiGraph):
        pass

    def read(self, file_path:str, config:MogwaiGraphConfig=None):
        import json
        config = config or MogwaiGraphConfig()
        g = MogwaiGraph(config=config)
        try:
            logger.info("Reading GraphSON file (assuming wrapped in 'vertices' key)")
            with open(file_path, 'r') as f:
                data = json.load(f)
                if "vertices" in data:
                    for v in data["vertices"]:
                        self.add_node(v, g)
                else:
                    raise IOError(f"Invalid GraphSON file {file_path}: missing 'vertices' key")
        except Exception:
            logger.info("Nope, it's not wrapped in 'vertices' key")
            logger.info("Reading GraphSON file line by line")
            try:
                with open(file_path, 'r') as f:
                    for line in f:
                        data = json.loads(line)
                        self.add_node(data, g)
            except Exception as e:
                logger.error(f"Error reading GraphSON file {file_path}: {e}", exc_info=True)
                raise IOError(f"Error reading GraphSON file {file_path}: {e}")

    def wrap_type(self, val, dtype=None):
        if dtype is None:
            dtype = type(val)
        match dtype:
            case builtins.int:
                if val > 2**31:
                    return {"@type": "g:Int64", "@value": int(val)}
                return {"@type": "g:Int32", "@value": int(val)}
            case builtins.float:
                return {"@type": "g:Double", "@value": float(val)}
            case builtins.bool | builtins.str:
                return val
            case dt.datetime:
                return {"@type": "g:Timestamp", "@value": int(val.timestamp())}
            case dt.date:
                return {"@type": "g:Timestamp", "@value": int(dt.datetime(val.year, val.month, val.day).timestamp())}
            case builtins.list | builtins.tuple:
                return {"@type": "g:List", "@value": [self.wrap_type(v) for v in val]}
            case builtins.set:
                return {"@type": "g:Set", "@value": [self.wrap_type(v) for v in val]}
            case builtins.dict:
                d = []
                for k, v in val.items():
                    d.append(self.wrap_type(k))
                    d.append(self.wrap_type(v))
                return {"@type": "g:Map", "@value": d}
            case _:
                try:
                    import numpy as np
                    if isinstance(val, (np.int16, np.int32)):
                        return {"@type": "g:Int32", "@value": int(val)}
                    if isinstance(val, np.int64):
                        return {"@type": "g:Int64", "@value": int(val)}
                    if isinstance(val, np.float32):
                        return {"@type": "g:Float", "@value": float(val)}
                    if isinstance(val, np.float64):
                        return {"@type": "g:Double", "@value": float(val)}
                    if isinstance(val, np.bool):
                        return bool(val)
                except:
                    pass
                raise ValueError(f"Unsupported type {dtype} for value {str(val)}")

    def unwrap_type(self, val:dict|Any):
        if isinstance(val, (int, float, bool, str)):
            return val
        if isinstance(val, dict):
            dtype = val.get("@type")
            val = val.get("@value")        
        match dtype:
            case "g:Int32" | "g:Int64":
                return int(val)
            case "g:Float" | "g:Double":
                return float(val)
            case "g:Date":
                return dt.date.fromtimestamp(val)
            case "g:Timestamp":
                return dt.datetime.fromtimestamp(val)
            case "g:List":
                return [self.unwrap_type(v) for v in val]
            case "g:Set":
                return {self.unwrap_type(v) for v in val}
            case builtins.dict:
                d = []
                for k, v in val.items():
                    d.append(self.wrap_type(k))
                    d.append(self.wrap_type(v))
                return {"@type": "g:Map", "@value": d}
            case _:
                raise ValueError(f"Unsupported type {dtype} for value {str(val)}")

    def create_list(self, graph):
        l = []
        for node in graph.nodes:
            n = {"id": node.id, "label": node.label}
            for k, v in node.properties.items():
                n[k] = v
            l.append(n)
        return l

    def write(self, file_path:str, graph:MogwaiGraph):
        import json
        l = self.create_list(graph)
        with open(file_path, 'w') as f:
            for node in l:
                f.write(json.dumps(node) + "\n")

lod

yamlable

Created on 2023-12-08, Extended on 2023-12-16 and 2024-01-25

redundant mogwai copy; the original is at https://github.com/WolfgangFahl/pyLoDStorage/blob/master/lodstorage/yamlable.py

@author: wf, ChatGPT

Prompts for the development and extension of the 'YamlAble' class within the 'yamable' module:

  1. Develop 'YamlAble' class in 'yamable' module. It should convert dataclass instances to/from YAML.
  2. Implement methods for YAML block scalar style and exclude None values in 'YamlAble' class.
  3. Add functionality to remove None values from dataclass instances before YAML conversion.
  4. Ensure 'YamlAble' processes only dataclass instances, with error handling for non-dataclass objects.
  5. Extend 'YamlAble' for JSON serialization and deserialization.
  6. Add methods for saving/loading dataclass instances to/from YAML and JSON files in 'YamlAble'.
  7. Implement loading of dataclass instances from URLs for both YAML and JSON in 'YamlAble'.
  8. Write tests for 'YamlAble' within the pyLodStorage context. Use 'samples 2' example from pyLoDStorage https://github.com/WolfgangFahl/pyLoDStorage/blob/master/lodstorage/sample2.py as a reference.
  9. Ensure tests cover YAML/JSON serialization, deserialization, and file I/O operations, using the sample-based approach..
  10. Use Google-style docstrings, comments, and type hints in 'YamlAble' class and tests.
  11. Adhere to instructions and seek clarification for any uncertainties.
  12. Add @lod_storable annotation support that will automatically YamlAble support and add @dataclass and @dataclass_json prerequisite behavior to a class

DateConvert

date converter

Source code in mogwai/lod/yamlable.py
78
79
80
81
82
83
84
85
86
class DateConvert:
    """
    date converter
    """

    @classmethod
    def iso_date_to_datetime(cls, iso_date: str) -> datetime.date:
        """
        Parse a `YYYY-MM-DD` string into a date.

        Args:
            iso_date: the text to parse; None or an empty string is allowed

        Returns:
            the parsed date, or None if `iso_date` is empty or None.
        """
        if not iso_date:
            return None
        parsed = datetime.strptime(iso_date, "%Y-%m-%d")
        return parsed.date()

YamlAble

Bases: Generic[T]

An extended YAML handler class for converting dataclass objects to and from YAML format, and handling loading from and saving to files and URLs.

Source code in mogwai/lod/yamlable.py
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
class YamlAble(Generic[T]):
    """
    An extended YAML handler class for converting dataclass objects to and from YAML format,
    and handling loading from and saving to files and URLs.
    """

    def _yaml_setup(self):
        """
        Initializes the YamlAble handler, setting up custom representers and preparing it for various operations.

        Raises:
            ValueError: if this object is not a dataclass instance.
        """
        if not is_dataclass(self):
            raise ValueError("I must be a dataclass instance.")
        if not hasattr(self, "_yaml_dumper"):
            # NOTE(review): this is the shared yaml.Dumper class itself, so the
            # settings below also affect other users of yaml.Dumper in the
            # process - confirm that this is intended
            self._yaml_dumper = yaml.Dumper
            self._yaml_dumper.ignore_aliases = lambda *_args: True
            self._yaml_dumper.add_representer(type(None), self.represent_none)
            self._yaml_dumper.add_representer(str, self.represent_literal)

    def represent_none(self, _, __) -> yaml.Node:
        """
        Custom representer for ignoring None values in the YAML output.

        Args:
            _: the dumper instance PyYAML passes to the representer
            __: the value being represented (always None)

        Returns:
            a null scalar node with empty text.
        """
        # represent_scalar is an instance method: it has to be called on the
        # dumper instance handed in, not on the Dumper class
        dumper = _
        return dumper.represent_scalar("tag:yaml.org,2002:null", "")

    def represent_literal(self, dumper: yaml.Dumper, data: str) -> yaml.Node:
        """
        Custom representer for block scalar style for strings.

        Strings that contain a newline use the literal block style `|`,
        all other strings use the default style.
        """
        if "\n" in data:
            return dumper.represent_scalar("tag:yaml.org,2002:str", data, style="|")
        return dumper.represent_scalar("tag:yaml.org,2002:str", data)

    def to_yaml(
        self,
        ignore_none: bool = True,
        ignore_underscore: bool = True,
        allow_unicode: bool = True,
        sort_keys: bool = False,
    ) -> str:
        """
        Converts this dataclass object to a YAML string, with options to omit None values and/or underscore-prefixed variables,
        and using block scalar style for strings.

        Args:
            ignore_none: Flag to indicate whether None values should be removed from the YAML output.
            ignore_underscore: Flag to indicate whether attributes starting with an underscore should be excluded from the YAML output.
            allow_unicode: Flag to indicate whether to allow unicode characters in the output.
            sort_keys: Flag to indicate whether to sort the dictionary keys in the output.

        Returns:
            A string representation of the dataclass object in YAML format.
        """
        obj_dict = asdict(self)
        self._yaml_setup()
        clean_dict = self.remove_ignored_values(
            obj_dict, ignore_none, ignore_underscore
        )
        yaml_str = yaml.dump(
            clean_dict,
            Dumper=self._yaml_dumper,
            default_flow_style=False,
            allow_unicode=allow_unicode,
            sort_keys=sort_keys,
        )
        return yaml_str

    @classmethod
    def from_yaml(cls: Type[T], yaml_str: str) -> T:
        """
        Deserializes a YAML string to a dataclass instance.

        Args:
            yaml_str (str): A string containing YAML formatted data.

        Returns:
            T: An instance of the dataclass.
        """
        data: dict[str, Any] = yaml.safe_load(yaml_str)
        instance: T = cls.from_dict(data)
        return instance

    @classmethod
    def load_from_yaml_file(cls: Type[T], filename: str) -> T:
        """
        Loads a dataclass instance from a YAML file.

        Args:
            filename (str): The path to the YAML file.

        Returns:
            T: An instance of the dataclass.
        """
        # read as UTF-8, matching what save_to_yaml_file writes
        with open(filename, "r", encoding="utf-8") as file:
            yaml_str: str = file.read()
        instance: T = cls.from_yaml(yaml_str)
        return instance

    @classmethod
    def load_from_yaml_url(cls: Type[T], url: str) -> T:
        """
        Loads a dataclass instance from a YAML string obtained from a URL.

        Args:
            url (str): The URL pointing to the YAML data.

        Returns:
            T: An instance of the dataclass.
        """
        yaml_str: str = cls.read_from_url(url)
        instance: T = cls.from_yaml(yaml_str)
        return instance

    def save_to_yaml_file(self, filename: str):
        """
        Saves the current dataclass instance to a YAML file.

        Args:
            filename (str): The path where the YAML file will be saved.
        """
        yaml_content: str = self.to_yaml()
        # to_yaml allows unicode output, so do not depend on the platform encoding
        with open(filename, "w", encoding="utf-8") as file:
            file.write(yaml_content)

    @classmethod
    def load_from_json_file(cls: Type[T], filename: str) -> T:
        """
        Loads a dataclass instance from a JSON file.

        Args:
            filename (str): The path to the JSON file.

        Returns:
            T: An instance of the dataclass.
        """
        # JSON text is UTF-8; do not depend on the platform default encoding
        with open(filename, "r", encoding="utf-8") as file:
            json_str: str = file.read()
        instance: T = cls.from_json(json_str)
        return instance

    @classmethod
    def load_from_json_url(cls: Type[T], url: str) -> T:
        """
        Loads a dataclass instance from a JSON string obtained from a URL.

        Args:
            url (str): The URL pointing to the JSON data.

        Returns:
            T: An instance of the dataclass.
        """
        json_str: str = cls.read_from_url(url)
        instance: T = cls.from_json(json_str)
        return instance

    def save_to_json_file(self, filename: str, **kwargs):
        """
        Saves the current dataclass instance to a JSON file.

        Args:
            filename (str): The path where the JSON file will be saved.
            **kwargs: Additional keyword arguments for the `to_json` method.
        """
        json_content: str = self.to_json(**kwargs)
        with open(filename, "w", encoding="utf-8") as file:
            file.write(json_content)

    @classmethod
    def read_from_url(cls, url: str) -> str:
        """
        Helper method to fetch content from a URL.

        Args:
            url: the URL to read from.

        Returns:
            the decoded response body.

        Raises:
            Exception: if the response status is not 200.
        """
        with urllib.request.urlopen(url) as response:
            if response.status == 200:
                return response.read().decode()
            else:
                raise Exception(f"Unable to load data from URL: {url}")

    @classmethod
    def remove_ignored_values(
        cls,
        value: Any,
        ignore_none: bool = True,
        ignore_underscore: bool = False,
        ignore_empty: bool = True,
    ) -> Any:
        """
        Recursively removes specified types of values from a dictionary or list.
        By default, it removes keys with None values. Optionally, it can also remove keys starting with an underscore.

        Args:
            value: The value to process (dictionary, list, or other).
            ignore_none: Flag to indicate whether None values should be removed.
            ignore_underscore: Flag to indicate whether keys starting with an underscore should be removed.
            ignore_empty: Flag to indicate whether empty collections should be removed.

        Returns:
            The cleaned value: mappings come back as dict, other non-string
            iterables as list, everything else unchanged.
        """

        def is_valid(v):
            """Check if the value is valid based on the specified flags."""
            if ignore_none and v is None:
                return False
            if ignore_empty:
                if isinstance(v, Mapping) and not v:
                    return False  # Empty dictionary
                if (
                    isinstance(v, Iterable)
                    and not isinstance(v, (str, bytes))
                    and not v
                ):
                    return (
                        False  # Empty list, set, tuple, etc., but not string or bytes
                    )
            return True

        def is_hidden_key(k):
            """Check if the key has to be dropped because of a leading underscore."""
            # only string keys can carry an underscore prefix; other keys are kept
            return ignore_underscore and isinstance(k, str) and k.startswith("_")

        if isinstance(value, Mapping):
            value = {
                k: YamlAble.remove_ignored_values(
                    v, ignore_none, ignore_underscore, ignore_empty
                )
                for k, v in value.items()
                if is_valid(v) and not is_hidden_key(k)
            }
        elif isinstance(value, Iterable) and not isinstance(value, (str, bytes)):
            value = [
                YamlAble.remove_ignored_values(
                    v, ignore_none, ignore_underscore, ignore_empty
                )
                for v in value
                if is_valid(v)
            ]
        return value

    @classmethod
    def from_dict2(cls: Type[T], data: dict) -> T:
        """
        Creates an instance of a dataclass from a dictionary, typically used in deserialization.

        Returns:
            the new instance, or None if `data` is empty or None.
        """
        if not data:
            return None
        instance = from_dict(data_class=cls, data=data)
        return instance
from_dict2(data) classmethod

Creates an instance of a dataclass from a dictionary, typically used in deserialization.

Source code in mogwai/lod/yamlable.py
320
321
322
323
324
325
326
327
328
@classmethod
def from_dict2(cls: Type[T], data: dict) -> T:
    """
    Build an instance of this dataclass from a plain dictionary.

    Args:
        data: the dictionary holding the field values.

    Returns:
        The instance created by ``from_dict``, or None when ``data``
        is falsy (None or an empty dict).
    """
    return from_dict(data_class=cls, data=data) if data else None
from_yaml(yaml_str) classmethod

Deserializes a YAML string to a dataclass instance.

Parameters:

Name Type Description Default
yaml_str str

A string containing YAML formatted data.

required

Returns:

Name Type Description
T T

An instance of the dataclass.

Source code in mogwai/lod/yamlable.py
155
156
157
158
159
160
161
162
163
164
165
166
167
168
@classmethod
def from_yaml(cls: Type[T], yaml_str: str) -> T:
    """
    Create a dataclass instance from YAML text.

    The text is parsed with ``yaml.safe_load`` and the parsed result is
    handed to ``cls.from_dict``.

    Args:
        yaml_str (str): YAML formatted data.

    Returns:
        T: the instance returned by ``cls.from_dict``.
    """
    parsed: dict[str, Any] = yaml.safe_load(yaml_str)
    return cls.from_dict(parsed)
load_from_json_file(filename) classmethod

Loads a dataclass instance from a JSON file.

Parameters:

Name Type Description Default
filename str

The path to the JSON file.

required

Returns:

Name Type Description
T T

An instance of the dataclass.

Source code in mogwai/lod/yamlable.py
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
@classmethod
def load_from_json_file(cls: Type[T], filename: str) -> T:
    """
    Read a JSON file and deserialize it via ``cls.from_json``.

    Args:
        filename (str): path of the JSON file to read.

    Returns:
        T: the instance returned by ``cls.from_json``.
    """
    with open(filename, "r") as handle:
        content: str = handle.read()
    # parse only after the file has been closed
    return cls.from_json(content)
load_from_json_url(url) classmethod

Loads a dataclass instance from a JSON string obtained from a URL.

Parameters:

Name Type Description Default
url str

The URL pointing to the JSON data.

required

Returns:

Name Type Description
T T

An instance of the dataclass.

Source code in mogwai/lod/yamlable.py
228
229
230
231
232
233
234
235
236
237
238
239
240
241
@classmethod
def load_from_json_url(cls: Type[T], url: str) -> T:
    """
    Fetch JSON text from a URL and deserialize it via ``cls.from_json``.

    Args:
        url (str): location of the JSON data.

    Returns:
        T: the instance returned by ``cls.from_json``.
    """
    fetched: str = cls.read_from_url(url)
    return cls.from_json(fetched)
load_from_yaml_file(filename) classmethod

Loads a dataclass instance from a YAML file.

Parameters:

Name Type Description Default
filename str

The path to the YAML file.

required

Returns:

Name Type Description
T T

An instance of the dataclass.

Source code in mogwai/lod/yamlable.py
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
@classmethod
def load_from_yaml_file(cls: Type[T], filename: str) -> T:
    """
    Read a YAML file and deserialize it via ``cls.from_yaml``.

    Args:
        filename (str): path of the YAML file to read.

    Returns:
        T: the instance returned by ``cls.from_yaml``.
    """
    with open(filename, "r") as handle:
        content: str = handle.read()
    # parse only after the file has been closed
    return cls.from_yaml(content)
load_from_yaml_url(url) classmethod

Loads a dataclass instance from a YAML string obtained from a URL.

Parameters:

Name Type Description Default
url str

The URL pointing to the YAML data.

required

Returns:

Name Type Description
T T

An instance of the dataclass.

Source code in mogwai/lod/yamlable.py
186
187
188
189
190
191
192
193
194
195
196
197
198
199
@classmethod
def load_from_yaml_url(cls: Type[T], url: str) -> T:
    """
    Fetch YAML text from a URL and deserialize it via ``cls.from_yaml``.

    Args:
        url (str): location of the YAML data.

    Returns:
        T: the instance returned by ``cls.from_yaml``.
    """
    fetched: str = cls.read_from_url(url)
    return cls.from_yaml(fetched)
read_from_url(url) classmethod

Helper method to fetch content from a URL.

Source code in mogwai/lod/yamlable.py
255
256
257
258
259
260
261
262
263
264
@classmethod
def read_from_url(cls, url: str) -> str:
    """
    Fetch the content behind a URL and return it as decoded text.

    Args:
        url: the URL to open.

    Returns:
        The response body decoded with the default codec of ``bytes.decode``.

    Raises:
        Exception: if the response status is not 200.
    """
    with urllib.request.urlopen(url) as reply:
        if reply.status != 200:
            raise Exception(f"Unable to load data from URL: {url}")
        return reply.read().decode()
remove_ignored_values(value, ignore_none=True, ignore_underscore=False, ignore_empty=True) classmethod

Recursively removes specified types of values from a dictionary or list. By default, it removes None values and empty collections. Optionally, it can also remove keys starting with an underscore.

Parameters:

Name Type Description Default
value Any

The value to process (dictionary, list, or other).

required
ignore_none bool

Flag to indicate whether None values should be removed.

True
ignore_underscore bool

Flag to indicate whether keys starting with an underscore should be removed.

False
ignore_empty bool

Flag to indicate whether empty collections should be removed.

True
Source code in mogwai/lod/yamlable.py
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
@classmethod
def remove_ignored_values(
    cls,
    value: Any,
    ignore_none: bool = True,
    ignore_underscore: bool = False,
    ignore_empty: bool = True,
) -> Any:
    """
    Recursively filter unwanted entries out of nested mappings and iterables.

    Mappings are rebuilt as dicts and every other non-string iterable is
    rebuilt as a list; any other value is returned unchanged. Filtering is
    applied to the entries as they are found, before they are cleaned
    recursively.

    Args:
        value: The value to process (dictionary, list, or other).
        ignore_none: drop entries whose value is None.
        ignore_underscore: drop mapping keys that start with an underscore.
        ignore_empty: drop entries that are empty collections
            (strings and bytes are never treated as collections).

    Returns:
        The cleaned copy of ``value``.
    """

    def keep(item) -> bool:
        """Tell whether an entry survives the None / empty filters."""
        if item is None and ignore_none:
            return False
        if not ignore_empty:
            return True
        # Mapping is itself Iterable; both checks are kept to mirror the contract
        is_collection = isinstance(item, Mapping) or (
            isinstance(item, Iterable) and not isinstance(item, (str, bytes))
        )
        return not (is_collection and not item)

    def clean(item):
        """Recurse with the same flag settings."""
        return YamlAble.remove_ignored_values(
            item, ignore_none, ignore_underscore, ignore_empty
        )

    if isinstance(value, Mapping):
        return {
            key: clean(entry)
            for key, entry in value.items()
            if keep(entry) and not (ignore_underscore and key.startswith("_"))
        }
    if isinstance(value, Iterable) and not isinstance(value, (str, bytes)):
        return [clean(entry) for entry in value if keep(entry)]
    return value
represent_literal(dumper, data)

Custom representer for block scalar style for strings.

Source code in mogwai/lod/yamlable.py
113
114
115
116
117
118
119
def represent_literal(self, dumper: yaml.Dumper, data: str) -> yaml.Node:
    """
    Represent a string, using block scalar style ("|") for multi-line text.

    Args:
        dumper: the dumper used to create the scalar node.
        data: the string to represent.

    Returns:
        The scalar node produced by ``dumper.represent_scalar``.
    """
    str_tag = "tag:yaml.org,2002:str"
    if "\n" not in data:
        return dumper.represent_scalar(str_tag, data)
    return dumper.represent_scalar(str_tag, data, style="|")
represent_none(_, __)

Custom representer that renders None values as an empty scalar in the YAML output.

Source code in mogwai/lod/yamlable.py
107
108
109
110
111
def represent_none(self, _, __) -> yaml.Node:
    """
    Represent None as an empty scalar carrying the YAML null tag.

    Both positional arguments are ignored; the node is created by
    this object's own ``_yaml_dumper``.
    """
    null_tag = "tag:yaml.org,2002:null"
    return self._yaml_dumper.represent_scalar(null_tag, "")
save_to_json_file(filename, **kwargs)

Saves the current dataclass instance to a JSON file.

Parameters:

Name Type Description Default
filename str

The path where the JSON file will be saved.

required
**kwargs

Additional keyword arguments for the to_json method.

{}
Source code in mogwai/lod/yamlable.py
243
244
245
246
247
248
249
250
251
252
253
def save_to_json_file(self, filename: str, **kwargs):
    """
    Write the JSON form of this instance to a file.

    Args:
        filename (str): path of the file to (over)write.
        **kwargs: passed through unchanged to ``to_json``.
    """
    # serialize before opening so a failing to_json leaves the file untouched
    serialized: str = self.to_json(**kwargs)
    with open(filename, "w") as sink:
        sink.write(serialized)
save_to_yaml_file(filename)

Saves the current dataclass instance to a YAML file.

Parameters:

Name Type Description Default
filename str

The path where the YAML file will be saved.

required
Source code in mogwai/lod/yamlable.py
201
202
203
204
205
206
207
208
209
210
def save_to_yaml_file(self, filename: str):
    """
    Write the YAML form of this instance to a file.

    Args:
        filename (str): path of the file to (over)write.
    """
    # serialize before opening so a failing to_yaml leaves the file untouched
    serialized: str = self.to_yaml()
    with open(filename, "w") as sink:
        sink.write(serialized)
to_yaml(ignore_none=True, ignore_underscore=True, allow_unicode=True, sort_keys=False)

Converts this dataclass object to a YAML string, with options to omit None values and/or underscore-prefixed variables, and using block scalar style for strings.

Parameters:

Name Type Description Default
ignore_none bool

Flag to indicate whether None values should be removed from the YAML output.

True
ignore_underscore bool

Flag to indicate whether attributes starting with an underscore should be excluded from the YAML output.

True
allow_unicode bool

Flag to indicate whether to allow unicode characters in the output.

True
sort_keys bool

Flag to indicate whether to sort the dictionary keys in the output.

False

Returns:

Type Description
str

A string representation of the dataclass object in YAML format.

Source code in mogwai/lod/yamlable.py
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
def to_yaml(
    self,
    ignore_none: bool = True,
    ignore_underscore: bool = True,
    allow_unicode: bool = True,
    sort_keys: bool = False,
) -> str:
    """
    Serialize this dataclass instance to a YAML string.

    The instance is converted with ``asdict``, filtered with
    ``remove_ignored_values`` and dumped in block (non-flow) style using
    this object's ``_yaml_dumper``.

    Args:
        ignore_none: drop None values from the output.
        ignore_underscore: drop attributes whose name starts with an underscore.
        allow_unicode: passed to ``yaml.dump`` as ``allow_unicode``.
        sort_keys: passed to ``yaml.dump`` as ``sort_keys``.

    Returns:
        The YAML text.
    """
    raw = asdict(self)
    self._yaml_setup()
    cleaned = self.remove_ignored_values(raw, ignore_none, ignore_underscore)
    dump_options = {
        "Dumper": self._yaml_dumper,
        "default_flow_style": False,
        "allow_unicode": allow_unicode,
        "sort_keys": sort_keys,
    }
    return yaml.dump(cleaned, **dump_options)

lod_storable(cls)

Decorator to make a class LoDStorable by inheriting from YamlAble. This decorator also ensures the class is a dataclass and has JSON serialization/deserialization capabilities.

Source code in mogwai/lod/yamlable.py
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
def lod_storable(cls):
    """
    Class decorator that turns ``cls`` into a YamlAble dataclass.

    ``cls`` is passed through ``dataclass`` and then ``dataclass_json``;
    the returned class derives from both YamlAble and the decorated class
    and takes over its ``__name__``, ``__qualname__`` and ``__doc__``.
    """
    # apply @dataclass first, then @dataclass_json on the result
    base = dataclass_json(dataclass(cls))

    class LoDStorable(YamlAble, base):
        """
        decorator class
        """

        __qualname__ = base.__qualname__

    # make the wrapper indistinguishable by name and docstring
    LoDStorable.__name__ = base.__name__
    LoDStorable.__doc__ = base.__doc__
    return LoDStorable

mogwai_cmd

Created on 2024-08-15

@author: wf

MogwaiCmd

Bases: WebserverCmd

command line handling for pyMogwai

Source code in mogwai/mogwai_cmd.py
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
class MogwaiCmd(WebserverCmd):
    """
    Command line entry point for the Mogwai web server.
    """

    def __init__(self):
        """
        Initialize the base command with the MogwaiWebServer configuration,
        the MogwaiWebServer class and the module's DEBUG flag.
        """
        WebserverCmd.__init__(
            self, MogwaiWebServer.get_config(), MogwaiWebServer, DEBUG
        )

    def getArgParser(self, description: str, version_msg) -> ArgumentParser:
        """
        Return the argument parser built by the base class, unchanged.

        Args:
            description: passed through to the base class.
            version_msg: passed through to the base class.
        """
        return super().getArgParser(description, version_msg)

__init__()

constructor

Source code in mogwai/mogwai_cmd.py
20
21
22
23
24
25
def __init__(self):
    """
    Initialize the base command with the MogwaiWebServer configuration,
    the MogwaiWebServer class and the module's DEBUG flag.
    """
    WebserverCmd.__init__(
        self, MogwaiWebServer.get_config(), MogwaiWebServer, DEBUG
    )

getArgParser(description, version_msg)

override the default argparser call

Source code in mogwai/mogwai_cmd.py
27
28
29
30
31
32
33
def getArgParser(self, description: str, version_msg) -> ArgumentParser:
    """
    Return the argument parser built by the base class, unchanged.

    Args:
        description: passed through to the base class.
        version_msg: passed through to the base class.
    """
    return super().getArgParser(description, version_msg)

main(argv=None)

main call

Source code in mogwai/mogwai_cmd.py
36
37
38
39
40
41
42
def main(argv: list = None):
    """
    Run the Mogwai command line.

    Args:
        argv: argument list handed to ``MogwaiCmd.cmd_main``.

    Returns:
        The value returned by ``cmd_main``.
    """
    return MogwaiCmd().cmd_main(argv)

parser

excel_converter

filesystem

graphml_converter

graphml_to_mogwaigraph(file, node_label_key, node_name_key, edge_label_key=None, default_node_label='Na', default_edge_label='Na', default_node_name='Na', include_id=False, keep=True)

Converts GraphML file to MogwaiGraph object.

Parameters

file : str Path to the GraphML file. node_label_key : str or Callable[[dict],str] Key to use for the node label. If a string, the value of the key is used as the label. If a function, it should take a dictionary of node data and return a string. node_name_key : str or Callable[[dict],str] Key to use for the node name. If a string, the value of the key is used as the name. If a function, it should take a dictionary of node data and return a string. edge_label_key : str or Callable[[dict],str], optional Key to use for the edge label. If a string, the value of the key is used as the label. If a function, it should take a dictionary of edge data and return a string. If None, the node_label_key is used. default_node_label : str, optional Default label to use for nodes that do not have a property corresponding to node_label_key. default_edge_label : str, optional Default label to use for edges that do not have a property corresponding to edge_label_key. default_node_name : str, optional Default name to use for nodes that do not have a property corresponding to node_name_key. include_id : bool or str, optional If True, the node id is included in the data dictionary of each node. If a string, the node id is included in the data dictionary with the given key. keep : bool, optional If True, the labels and names are kept as properties in the node data dictionary. If False, they are removed.

Returns

MogwaiGraph The graph object

Source code in mogwai/parser/graphml_converter.py
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
def graphml_to_mogwaigraph(
    file: str,
    node_label_key: str | Callable[[dict], str],
    node_name_key: str | Callable[[dict], str],
    edge_label_key: str | Callable[[dict], str] = None,
    default_node_label: str = "Na",
    default_edge_label: str = "Na",
    default_node_name: str = "Na",
    include_id: bool | str = False,
    keep: bool = True,
) -> MogwaiGraph:
    """
    Read a directed GraphML file and rebuild it as a MogwaiGraph.

    Parameters
    ----------
    file : str
        Path to the GraphML file.
    node_label_key : str or Callable[[dict],str]
        A string names the node property used as label; a callable
        receives the node data dictionary and returns the label.
    node_name_key : str or Callable[[dict],str]
        A string names the node property used as name; a callable
        receives the node data dictionary and returns the name.
    edge_label_key : str or Callable[[dict],str], optional
        A string names the edge property used as label; a callable
        receives the edge data dictionary and returns the label.
        When falsy, `node_label_key` is used instead.
    default_node_label : str, optional
        Label for nodes lacking the `node_label_key` property.
    default_edge_label : str, optional
        Label for edges lacking the `edge_label_key` property.
    default_node_name : str, optional
        Name for nodes lacking the `node_name_key` property.
    include_id : bool or str, optional
        If True, the GraphML node id is stored in the node data under "id".
        If a string, it is stored under that key.
    keep : bool, optional
        If True, label and name properties stay in the data dictionary.
        If False, they are popped from it.

    Returns
    -------
    MogwaiGraph
        The graph object

    Raises
    ------
    MogwaiGraphError
        If the GraphML graph is undirected.
    """
    source = nx.read_graphml(file)
    if not source.is_directed():
        raise MogwaiGraphError("Can not import undirected graphml graph")
    g = MogwaiGraph()
    edge_label_key = edge_label_key or node_label_key
    if include_id == True:
        include_id = "id"  # use 'id' as the default key

    def make_getter(key, fallback):
        """Return (getter, miss_counter) for a string key or a callable."""
        misses = count()
        if type(key) is not str:
            # a callable is used as-is; its counter is never advanced
            return key, misses

        # Note: the getter may pop from the data dictionary it is given.
        # This is harmless because the source graph is discarded afterwards.
        def getter(data: dict):
            if key not in data:
                next(misses)
                return fallback
            return data[key] if keep else data.pop(key)

        return getter, misses

    node_label_func, label_misses = make_getter(node_label_key, default_node_label)
    node_name_func, name_misses = make_getter(node_name_key, default_node_name)
    edge_label_func, edge_misses = make_getter(edge_label_key, default_edge_label)

    assigned_ids = {}
    for node, data in source.nodes(data=True):
        if include_id:
            data[include_id] = node
        # label first, then name: both may pop from data before it is unpacked
        label = node_label_func(data)
        name = node_name_func(data)
        assigned_ids[node] = g.add_labeled_node(label=label, name=name, **data)
    for head, tail, data in source.edges(data=True):
        edge_label = edge_label_func(data)
        g.add_labeled_edge(
            srcId=assigned_ids[head],
            destId=assigned_ids[tail],
            edgeLabel=edge_label,
            **data,
        )

    # the next value of a counter equals the number of misses recorded so far
    missing_edge_count = next(edge_misses)
    missing_name_count = next(name_misses)
    missing_label_count = next(label_misses)
    if missing_edge_count > 0:
        logger.warning(f"Encountered {missing_edge_count} edges without label")
    if missing_name_count > 0:
        logger.warning(f"Encountered {missing_name_count} nodes without name")
    if missing_label_count > 0:
        logger.warning(f"Encountered {missing_label_count} nodes without label")
    return g

json_converter

Created on 2024-10-09

@author: wf

JsonGraph

json graph handling library

Source code in mogwai/parser/json_converter.py
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
class JsonGraph:
    """
    json graph handling library

    Collects JSON records as nodes of a MogwaiGraph and offers a
    console dump of the graph for investigation.
    """

    def __init__(self):
        self.graph = MogwaiGraph()

    def add_nodes_from_json(self, file_name, file_content):
        """
        Add nodes from JSON data to the graph

        Args:
            file_name: name of the source file; the part before the first
                "." becomes the node type.
            file_content: iterable of text lines, one JSON object per line.
                Blank lines are skipped; undecodable lines are reported
                on stdout and skipped.
        """
        table_name = file_name.split(".")[0]  # Remove the .json extension

        for line in file_content:
            line = line.strip()
            if line:  # Skip empty lines
                try:
                    json_data = json.loads(line)
                    self.add_node(table_name, json_data)
                except json.JSONDecodeError as e:
                    print(f"Error decoding JSON object in file {file_name}: {e}")

    def add_node(self, table_name, data):
        """
        Add a single node to the graph

        Args:
            table_name: stored as the node's "type" attribute.
            data: node attributes; its "id" entry is used as node id,
                otherwise the current graph size (as string) is used.
        """
        node_id = data.get("id", str(len(self.graph)))
        self.graph.add_node(node_id, type=table_name, **data)

    def dump(self, node_types=None, limit: int = 10):
        """
        Dump the content of the graph for investigation.

        Args:
            node_types (list): List of node types to dump. If None, dump all types.
            limit (int): Maximum number of nodes to dump for each node type. Default is 10.
        """
        self._print_graph_summary()
        node_types = self._get_node_types_to_dump(node_types)
        self._dump_nodes(node_types, limit)
        self._dump_edges(limit)

    def _all_node_types(self) -> set:
        """Collect the "type" of every node ("Unknown" when missing)."""
        return set(
            data.get("type", "Unknown") for _, data in self.graph.nodes(data=True)
        )

    def _print_graph_summary(self):
        """Print node count, edge count and the set of node types."""
        print(f"Total nodes in graph: {len(self.graph.nodes)}")
        print(f"Total edges in graph: {len(self.graph.edges)}")
        print(f"All node types: {self._all_node_types()}")

    def _get_node_types_to_dump(self, requested_types):
        """Return the requested types that exist in the graph (all if None)."""
        all_node_types = self._all_node_types()
        if requested_types is None:
            return all_node_types
        # Ensure we only dump existing types
        return set(requested_types) & all_node_types

    def _dump_nodes(self, node_types, limit):
        """Dump up to ``limit`` nodes for each of the given node types."""
        print(f"Dumping node types: {node_types}")
        for node_type in node_types:
            self._dump_nodes_of_type(node_type, limit)

    def _dump_nodes_of_type(self, node_type, limit):
        """
        Print at most ``limit`` nodes of one type, followed by a note
        about how many further nodes of that type were not shown.
        """
        print(f"\nDumping up to {limit} nodes of type '{node_type}':")
        shown = 0
        total = 0
        # single pass: print the first `limit` matches, count all of them
        for node, data in self.graph.nodes(data=True):
            if data.get("type") != node_type:
                continue
            total += 1
            if shown < limit:
                self._print_node(node, data)
                shown += 1
        remaining = total - shown
        # only mention leftovers when there actually are some
        if remaining > 0:
            print(f"  ... and {remaining} more")

    def _print_node(self, node, data):
        """Print a node id and all of its attributes except "type"."""
        print(f"  Node: {node}")
        for key, value in data.items():
            if key != "type":
                print(f"    {key}: {value}")

    def _dump_edges(self, limit):
        """
        Print at most ``limit`` edges with their attributes, followed by
        a note about how many further edges were not shown.
        """
        if not self.graph.edges:
            print("\nNo edges in the graph.")
            return
        print("\nSample of edges:")
        shown = 0
        for i, (u, v, data) in enumerate(self.graph.edges(data=True)):
            if i >= limit:
                break
            print(f"  Edge {i}: {u} -> {v}")
            for key, value in data.items():
                print(f"    {key}: {value}")
            shown += 1
        remaining = len(self.graph.edges) - shown
        if remaining > 0:
            print(f"  ... and {remaining} more")
add_node(table_name, data)

Add a single node to the graph

Source code in mogwai/parser/json_converter.py
35
36
37
38
39
40
def add_node(self, table_name, data):
    """
    Insert one record as a node of the graph.

    The record's "id" entry is used as node id; without it the current
    graph size (as string) is used. ``table_name`` is stored as the
    node's "type" attribute next to the record's own entries.
    """
    fallback_id = str(len(self.graph))
    chosen_id = data.get("id", fallback_id)
    self.graph.add_node(chosen_id, type=table_name, **data)
add_nodes_from_json(file_name, file_content)

Add nodes from JSON data to the graph

Source code in mogwai/parser/json_converter.py
20
21
22
23
24
25
26
27
28
29
30
31
32
33
def add_nodes_from_json(self, file_name, file_content):
    """
    Turn JSON lines into graph nodes.

    The part of ``file_name`` before the first "." becomes the node type.
    Each non-blank line of ``file_content`` is decoded as one JSON object
    and passed to ``add_node``; lines that fail to decode are reported on
    stdout and skipped.
    """
    table_name = file_name.partition(".")[0]  # text before the first dot

    for raw_line in file_content:
        record_text = raw_line.strip()
        if not record_text:
            continue  # nothing to decode on a blank line
        try:
            self.add_node(table_name, json.loads(record_text))
        except json.JSONDecodeError as e:
            print(f"Error decoding JSON object in file {file_name}: {e}")
dump(node_types=None, limit=10)

Dump the content of the graph for investigation.

Parameters:

Name Type Description Default
node_types list

List of node types to dump. If None, dump all types.

None
limit int

Maximum number of nodes to dump for each node type. Default is 10.

10
Source code in mogwai/parser/json_converter.py
42
43
44
45
46
47
48
49
50
51
52
53
def dump(self, node_types=None, limit: int = 10):
    """
    Print the graph content for inspection: summary, nodes, then edges.

    Args:
        node_types (list): node types to dump; None selects all types.
        limit (int): maximum number of nodes per type, also used as the
            maximum number of edges. Default is 10.
    """
    self._print_graph_summary()
    selected_types = self._get_node_types_to_dump(node_types)
    self._dump_nodes(selected_types, limit)
    self._dump_edges(limit)

pdfgraph

powerpoint_converter

schema

graph_schema

Created on 2024-10-22

@author: wf

GraphSchema

registry of node types and their configurations

Source code in mogwai/schema/graph_schema.py
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
@lod_storable
class GraphSchema:
    """registry of node types and their configurations"""

    base_uri: Optional[str] = "http://example.org"
    # NOTE(review): annotated as Type with default `int`, while node_id_type
    # resolves the value by name in builtins — confirm whether a str is intended
    node_id_type_name: Type = int
    node_type_configs: Dict[str, NodeTypeConfig] = field(default_factory=dict)

    @property
    def node_id_type(self) -> Type:
        """
        Property to convert the node_id_type_name to an actual Python type.

        A value that already is a type (such as the default ``int``) is
        returned as-is; otherwise it is looked up by name in ``builtins``.
        """
        type_ref = self.node_id_type_name
        if isinstance(type_ref, type):
            # getattr() would raise TypeError for a non-string attribute name
            return type_ref
        return getattr(builtins, type_ref)

    def get_node_config(self, node_data: dict) -> NodeTypeConfig | None:
        """
        Get the NodeTypeConfig for the node based on its labels.

        Args:
            node_data (dict): The data of the node containing labels

        Returns:
            NodeTypeConfig or None: The NodeTypeConfig registered for the
            node's first label, or None when there is no node data, no
            label, or no configuration for that label.
        """
        if not node_data:
            return None
        labels = node_data.get("labels", set())
        if not labels:
            # without a label there is nothing to look up
            return None
        node_label = next(iter(labels))  # Get the first label
        return self.node_type_configs.get(node_label)

    def add_to_graph(self, graph: MogwaiGraph):
        """
        add my node type configurations to the given graph

        Each configuration becomes a node labeled "NodeTypeConfig" that is
        named after the configuration's label and carries a copy of the
        configuration's attributes as properties.

        Args:
            graph (MogwaiGraph): the graph to add the configurations to
        """
        for node_type in self.node_type_configs.values():
            props = node_type.__dict__.copy()
            graph.add_labeled_node(
                "NodeTypeConfig", name=node_type.label, properties=props
            )

    @classmethod
    def yaml_path(cls) -> str:
        """Default path for schema YAML file"""
        module_path = os.path.dirname(os.path.abspath(__file__))
        yaml_path = os.path.join(module_path, "resources", "schema.yaml")
        return yaml_path

    @classmethod
    def load(cls, yaml_path: str = None) -> "GraphSchema":
        """
        Load schema from a YAML file, ensuring the file exists.

        Args:
            yaml_path: Optional path to YAML file. If None, uses default path.

        Returns:
            GraphSchema: Loaded schema or empty schema if file doesn't exist
            (the missing file is reported via ``logging.error``).
        """
        if yaml_path is None:
            yaml_path = cls.yaml_path()

        if not Path(yaml_path).exists():
            err_msg = f"Schema YAML file not found: {yaml_path}"
            logging.error(err_msg)
            return cls()

        return cls.load_from_yaml_file(yaml_path)
node_id_type: Type property

Property to convert the node_id_type_name to an actual Python type.

add_to_graph(graph)

add my node type configurations to the given graph

Parameters:

Name Type Description Default
graph MogwaiGraph

the graph to add the configurations to

required
Source code in mogwai/schema/graph_schema.py
106
107
108
109
110
111
112
113
114
115
116
117
def add_to_graph(self, graph: MogwaiGraph):
    """
    Register every node type configuration as a node of the given graph.

    Each configuration becomes a node labeled "NodeTypeConfig" that is
    named after the configuration's label and carries a copy of the
    configuration's attributes as properties.

    Args:
        graph (MogwaiGraph): the graph to add the configurations to
    """
    for config in self.node_type_configs.values():
        attributes = dict(vars(config))  # shallow copy of the instance dict
        graph.add_labeled_node(
            "NodeTypeConfig", name=config.label, properties=attributes
        )
get_node_config(node_data)

Get the NodeTypeConfig for the node based on its labels.

Parameters:

Name Type Description Default
node_data dict

The data of the node containing labels

required

Returns:

Type Description
NodeTypeConfig | None

NodeTypeConfig or None: The NodeTypeConfig for the given node if found, otherwise None.

Source code in mogwai/schema/graph_schema.py
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
def get_node_config(self, node_data: dict) -> "NodeTypeConfig | None":
    """
    Get the NodeTypeConfig for the node based on its labels.

    Only the first label produced by iterating ``node_data["labels"]`` is
    used for the lookup in ``self.node_type_configs``.

    Args:
        node_data (dict): The data of the node containing labels

    Returns:
        NodeTypeConfig or None: The NodeTypeConfig for the given node if found,
            otherwise None. None is also returned when ``node_data`` is empty
            or has no (or empty) ``labels``.
    """
    if not node_data:
        return None
    labels = node_data.get("labels")
    if not labels:
        # Previously this path fell through to the lookup with an unbound
        # ``node_label`` and raised UnboundLocalError.
        return None
    node_label = next(iter(labels))  # Get the first label
    return self.node_type_configs.get(node_label)
load(yaml_path=None) classmethod

Load schema from a YAML file, ensuring the file exists.

Parameters:

Name Type Description Default
yaml_path str

Optional path to YAML file. If None, uses default path.

None

Returns:

Name Type Description
GraphSchema GraphSchema

Loaded schema or empty schema if file doesn't exist

Source code in mogwai/schema/graph_schema.py
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
@classmethod
def load(cls, yaml_path: str = None) -> "GraphSchema":
    """
    Read a schema from a YAML file, falling back to an empty schema.

    Args:
        yaml_path: Optional path to YAML file. If None, the path
            returned by ``cls.yaml_path()`` is used.

    Returns:
        GraphSchema: the schema loaded from the file, or ``cls()`` when
            the file does not exist (an error is logged in that case).
    """
    schema_file = cls.yaml_path() if yaml_path is None else yaml_path

    if Path(schema_file).exists():
        return cls.load_from_yaml_file(schema_file)

    # Missing file: report it and hand back a default-constructed schema
    err_msg = f"Schema YAML file not found: {schema_file}"
    logging.error(err_msg)
    return cls()
yaml_path() classmethod

Default path for schema YAML file

Source code in mogwai/schema/graph_schema.py
119
120
121
122
123
124
@classmethod
def yaml_path(cls) -> str:
    """Return the default schema file: ``resources/schema.yaml`` next to this module."""
    this_file = os.path.abspath(__file__)
    return os.path.join(os.path.dirname(this_file), "resources", "schema.yaml")

NodeTypeConfig

Configuration for a node type in the graph

Source code in mogwai/schema/graph_schema.py
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
@lod_storable
class NodeTypeConfig:
    """
    Configuration for a node type in the graph

    The ``dataclass_name`` and ``viewclass_name`` strings are resolved to
    classes in ``__post_init__`` and stored in ``_dataclass`` and
    ``_viewclass``.
    """

    label: str  # Label used in the graph database
    # https://fonts.google.com/icons?icon.set=Material+Icons
    icon: str  # Material icon name
    key_field: str  # Primary identifier field
    dataclass_name: (
        str  # module.class name string for the dataclass to be used for this node type
    )
    display_name: str  # Human-readable name for UI
    display_order: int = 1000  # order for display - default to high number
    viewclass_name: Optional[str] = (
        None  # module.class name string for the viewclass to be used for this node type
    )
    description: Optional[str] = None
    _dataclass: Type = field(init=False)
    _viewclass: Type = field(init=False)

    def get_class(self, class_name_attr: str) -> Optional[Type]:
        """
        Retrieve a class from its module path string.

        Args:
            class_name_attr: The attribute name containing the class path string

        Returns:
            The attribute named by the last dotted component, looked up on the
            imported module, or None if the path attribute is empty or None.

        Raises:
            ValueError: If the path cannot be split, the module cannot be
                imported or the module has no such attribute. The original
                exception is attached as ``__cause__``.
        """
        class_path = getattr(self, class_name_attr)
        if not class_path:
            return None
        try:
            module_path, class_name = class_path.rsplit(".", 1)
            module = importlib.import_module(module_path)
            return getattr(module, class_name)
        except Exception as ex:
            # chain the cause so the import/attribute failure stays visible
            raise ValueError(
                f"Invalid {class_name_attr}: {class_path}: {str(ex)}"
            ) from ex

    def __post_init__(self):
        """Initialize the dataclass and view class types"""
        class_configs = [
            ("dataclass_name", "_dataclass"),
            ("viewclass_name", "_viewclass"),
        ]
        for class_name_attr, target_attr in class_configs:
            clazz = self.get_class(class_name_attr)
            setattr(self, target_attr, clazz)

    def as_view_dict(self) -> Dict:
        """Return the description and icon of this node type as a dict."""
        view_dict = {"description": self.description, "icon": self.icon}
        return view_dict
__post_init__()

Initialize the dataclass and view class types

Source code in mogwai/schema/graph_schema.py
59
60
61
62
63
64
65
66
67
def __post_init__(self):
    """Resolve the configured class path strings and cache the resulting classes."""
    # maps the attribute holding a class path to the attribute receiving the class
    targets = {
        "dataclass_name": "_dataclass",
        "viewclass_name": "_viewclass",
    }
    for source_attr, target_attr in targets.items():
        setattr(self, target_attr, self.get_class(source_attr))
get_class(class_name_attr)

Retrieve a class from its module path string.

Parameters:

Name Type Description Default
class_name_attr str

The attribute name containing the class path string

required

Raises:

Type Description
ValueError

If the class initialization fails

Source code in mogwai/schema/graph_schema.py
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
def get_class(self, class_name_attr: str) -> Optional[Type]:
    """
    Retrieve a class from its module path string.

    Args:
        class_name_attr: The attribute name containing the class path string

    Returns:
        The attribute named by the last dotted component, looked up on the
        imported module, or None if the path attribute is empty or None.

    Raises:
        ValueError: If the path cannot be split, the module cannot be
            imported or the module has no such attribute. The original
            exception is attached as ``__cause__``.
    """
    class_path = getattr(self, class_name_attr)
    if not class_path:
        return None
    try:
        module_path, class_name = class_path.rsplit(".", 1)
        module = importlib.import_module(module_path)
        return getattr(module, class_name)
    except Exception as ex:
        # chain the cause so the import/attribute failure stays visible
        raise ValueError(
            f"Invalid {class_name_attr}: {class_path}: {str(ex)}"
        ) from ex

nx_to_rdf

Created on 2024-10-22

@author: wf

NetworkXToRDFConverter

A converter for converting a NetworkX graph to RDF based on the given GraphSchema.

Source code in mogwai/schema/nx_to_rdf.py
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
class NetworkXToRDFConverter:
    """
    Translate a NetworkX graph into an RDFLib graph, using the base URI of a GraphSchema.
    """

    def __init__(self, schema: GraphSchema, namespaces: List[str]):
        """
        Set up an empty RDF graph and register the namespace prefixes.

        Args:
            schema (GraphSchema): The schema whose ``base_uri`` prefixes every generated URI.
            namespaces (List[str]): Prefixes to bind; each one is bound to the base URI namespace.
        """
        self.schema = schema
        self.base_uri = schema.base_uri
        self.rdf_graph = RDFGraph()
        self.ns = Namespace(self.base_uri)

        # Register the prefixes (plus xsd) for prettier output
        bindings = [(prefix, self.ns) for prefix in namespaces]
        bindings.append(("xsd", XSD))
        for prefix, target in bindings:
            self.rdf_graph.bind(prefix, target)

    def _get_rdf_literal(self, value):
        """
        Wrap a Python value in an RDF literal.

        bool, int and datetime values get an explicit XSD datatype;
        anything else is stored as the literal of its ``str()`` form.
        """
        # bool must be tested before int because bool is a subclass of int
        if isinstance(value, bool):
            return Literal(value, datatype=XSD.boolean)
        if isinstance(value, int):
            return Literal(value, datatype=XSD.integer)
        if isinstance(value, datetime):
            return Literal(value.isoformat(), datatype=XSD.dateTime)
        return Literal(str(value))

    def convert_node(self, node_id, node_data):
        """
        Add one triple per non-None attribute of a NetworkX node.

        Args:
            node_id: The node identifier (appended to the base URI)
            node_data: The attribute mapping of the node
        """
        subject = URIRef(f"{self.base_uri}{node_id}")

        for attr_name, attr_value in node_data.items():
            if attr_value is None:
                continue  # attributes without a value are not exported
            predicate = URIRef(f"{self.base_uri}{attr_name}")
            literal = self._get_rdf_literal(attr_value)
            self.rdf_graph.add((subject, predicate, literal))

    def convert_edge(self, source_id, target_id, edge_data):
        """
        Add a triple for a NetworkX edge if it carries a "labels" entry.

        Args:
            source_id: The source node identifier
            target_id: The target node identifier
            edge_data: The data associated with the edge
        """
        subject = URIRef(f"{self.base_uri}{source_id}")
        target = URIRef(f"{self.base_uri}{target_id}")

        if not edge_data or "labels" not in edge_data:
            return
        edge_label = edge_data["labels"]
        predicate = URIRef(f"{self.base_uri}{edge_label}")
        self.rdf_graph.add((subject, predicate, target))

    def convert_graph(self, nx_graph: nx.Graph):
        """
        Convert all nodes, then all edges, of a NetworkX graph.

        Args:
            nx_graph (nx.Graph): The NetworkX graph to convert
        """
        for node_entry in nx_graph.nodes(data=True):
            node_id, node_data = node_entry
            self.convert_node(node_id, node_data)

        for edge_entry in nx_graph.edges(data=True):
            source_id, target_id, edge_data = edge_entry
            self.convert_edge(source_id, target_id, edge_data)

    def serialize(self, rdf_format: str = "turtle") -> str:
        """
        Serialize the collected RDF graph.

        Args:
            rdf_format (str): The RDF format to serialize to (e.g., 'turtle', 'xml').

        Returns:
            str: The serialized RDF graph.
        """
        serialized = self.rdf_graph.serialize(format=rdf_format)
        return serialized
__init__(schema, namespaces)

Initialize the converter with the given schema.

Parameters:

Name Type Description Default
schema GraphSchema

The graph schema containing the node type configurations and base URI.

required
namespaces List[str]

A list of namespaces used for managing prefixes in the graph.

required
Source code in mogwai/schema/nx_to_rdf.py
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
def __init__(self, schema: GraphSchema, namespaces: List[str]):
    """
    Set up an empty RDF graph and register the namespace prefixes.

    Args:
        schema (GraphSchema): The schema whose ``base_uri`` prefixes every generated URI.
        namespaces (List[str]): Prefixes to bind; each one is bound to the base URI namespace.
    """
    self.schema = schema
    self.base_uri = schema.base_uri
    self.rdf_graph = RDFGraph()
    self.ns = Namespace(self.base_uri)

    # Register the prefixes (plus xsd) for prettier output
    bindings = [(prefix, self.ns) for prefix in namespaces]
    bindings.append(("xsd", XSD))
    for prefix, target in bindings:
        self.rdf_graph.bind(prefix, target)
convert_edge(source_id, target_id, edge_data)

Convert a NetworkX edge to RDF and add it to the RDFLib graph.

Parameters:

Name Type Description Default
source_id

The source node identifier

required
target_id

The target node identifier

required
edge_data

The data associated with the edge

required
Source code in mogwai/schema/nx_to_rdf.py
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
def convert_edge(self, source_id, target_id, edge_data):
    """
    Add a triple for a NetworkX edge if it carries a "labels" entry.

    Args:
        source_id: The source node identifier
        target_id: The target node identifier
        edge_data: The data associated with the edge
    """
    subject = URIRef(f"{self.base_uri}{source_id}")
    target = URIRef(f"{self.base_uri}{target_id}")

    if not edge_data or "labels" not in edge_data:
        return
    edge_label = edge_data["labels"]
    predicate = URIRef(f"{self.base_uri}{edge_label}")
    self.rdf_graph.add((subject, predicate, target))
convert_graph(nx_graph)

Convert the entire NetworkX graph to RDF.

Parameters:

Name Type Description Default
nx_graph Graph

The NetworkX graph to convert

required
Source code in mogwai/schema/nx_to_rdf.py
86
87
88
89
90
91
92
93
94
95
96
97
98
99
def convert_graph(self, nx_graph: nx.Graph):
    """
    Convert all nodes, then all edges, of a NetworkX graph.

    Args:
        nx_graph (nx.Graph): The NetworkX graph to convert
    """
    for node_entry in nx_graph.nodes(data=True):
        node_id, node_data = node_entry
        self.convert_node(node_id, node_data)

    for edge_entry in nx_graph.edges(data=True):
        source_id, target_id, edge_data = edge_entry
        self.convert_edge(source_id, target_id, edge_data)
convert_node(node_id, node_data)

Convert a NetworkX node to RDF and add it to the RDFLib graph.

Parameters:

Name Type Description Default
node_id

The node identifier

required
node_data

The data associated with the node

required
Source code in mogwai/schema/nx_to_rdf.py
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
def convert_node(self, node_id, node_data):
    """
    Add one triple per non-None attribute of a NetworkX node.

    Args:
        node_id: The node identifier (appended to the base URI)
        node_data: The attribute mapping of the node
    """
    subject = URIRef(f"{self.base_uri}{node_id}")

    for attr_name, attr_value in node_data.items():
        if attr_value is None:
            continue  # attributes without a value are not exported
        predicate = URIRef(f"{self.base_uri}{attr_name}")
        literal = self._get_rdf_literal(attr_value)
        self.rdf_graph.add((subject, predicate, literal))
serialize(rdf_format='turtle')

Serialize the RDF graph to the specified format.

Parameters:

Name Type Description Default
rdf_format str

The RDF format to serialize to (e.g., 'turtle', 'xml').

'turtle'

Returns:

Name Type Description
str str

The serialized RDF graph.

Source code in mogwai/schema/nx_to_rdf.py
101
102
103
104
105
106
107
108
109
110
111
def serialize(self, rdf_format: str = "turtle") -> str:
    """
    Serialize the collected RDF graph.

    Args:
        rdf_format (str): The RDF format to serialize to (e.g., 'turtle', 'xml').

    Returns:
        str: The serialized RDF graph.
    """
    serialized = self.rdf_graph.serialize(format=rdf_format)
    return serialized

utils

graph_summary

GraphSummary

A class to generate formatted summaries of graph structures.

Source code in mogwai/utils/graph_summary.py
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
class GraphSummary:
    """A class to generate formatted summaries of graph structures."""

    def __init__(
        self,
        graph,
        fmt: str = "github",
        section_formats: Optional[Dict[str, List[SectionFormat]]] = None,
    ):
        """
        Create a summary generator for a graph.

        Args:
            graph: the graph to summarize; its ``nodes`` and ``edges`` are
                used with ``len()`` and called with ``data=True``.
            fmt: output format; used as the tabulate table format and as the
                key into ``section_formats``.
            section_formats: section header formats per output format. If
                omitted (or empty), defaults for "github", "mediawiki" and
                "latex" with header levels 1 and 2 are used.
        """
        self.graph = graph
        self.fmt = fmt
        # Level 2 is required by _get_nodes_summary; defaults with only
        # level 1 made dump() raise for every graph that has nodes.
        self.section_formats = section_formats or {
            "github": [
                SectionFormat(1, "# {header}"),
                SectionFormat(2, "## {header}"),
            ],
            "mediawiki": [
                SectionFormat(1, "= {header} ="),
                SectionFormat(2, "== {header} =="),
            ],
            "latex": [
                SectionFormat(1, "\\section{{{header}}}"),
                SectionFormat(2, "\\subsection{{{header}}}"),
            ],
        }

    def summary(self, limit: int = 3) -> str:
        """Generate a summary of the graph (a dump with the given limit)."""
        summary = self.dump(limit=limit)
        return summary

    def format_section_header(self, header: str, level: int = 1) -> str:
        """
        Format the section header using the format string for the current output format.

        Args:
            header: the header text
            level: the header level to look up

        Returns:
            str: the formatted header

        Raises:
            ValueError: if no section formats exist for ``self.fmt`` or none
                of them matches ``level``.
        """
        section_formats = self.section_formats.get(self.fmt, None)
        if section_formats is None:
            raise ValueError("No format string found for the requested format.")
        # Find the format string for the requested level
        for section_format in section_formats:
            if section_format.level == level:
                return section_format.format_section_header(header)
        raise ValueError(
            "No format string found for the requested level."
            + str(level)
            + " in "
            + str(section_formats)
        )

    def dump(self, node_types=None, limit: int = 10) -> str:
        """
        Dump the content of the graph for investigation.

        Args:
            node_types (list): List of node types to dump. If None, dump all types.
            limit (int): Maximum number of nodes to dump for each node type. Default is 10.

        Returns:
            str: Formatted string containing the graph summary, nodes, and edges.
        """
        output = []
        output.append(self.format_section_header("Graph Summary"))
        output.append(self._get_graph_summary())

        node_types = self._get_node_types_to_dump(node_types)
        output.append(self.format_section_header("Nodes"))
        output.append(self._get_nodes_summary(node_types, limit))

        output.append(self.format_section_header("Edges"))
        output.append(self._get_edges_summary(limit))

        # empty parts are dropped
        return "\n\n".join(filter(None, output))

    def _get_graph_summary(self) -> str:
        """Return a table with node count, edge count and the node types."""
        summary_data = [
            ["Total Nodes", len(self.graph.nodes)],
            ["Total Edges", len(self.graph.edges)],
            ["Node Types", ", ".join(self._get_all_node_types())],
        ]
        markup = tabulate(summary_data, headers=["Metric", "Value"], tablefmt=self.fmt)
        return markup

    def _get_all_node_types(self) -> Set[str]:
        """Return the set of "type" values of all nodes ("Unknown" if unset)."""
        return {
            data.get("type", "Unknown")
            for _, data in self.graph.nodes(data=True)
        }

    def _get_node_types_to_dump(self, requested_types) -> Set[str]:
        """Restrict the requested types to those present; None selects all."""
        all_node_types = self._get_all_node_types()
        if requested_types is None:
            return all_node_types
        return set(requested_types) & all_node_types

    def _get_nodes_summary(self, node_types, limit) -> str:
        """Return a level-2 section with a node table for each node type."""
        output = []
        for node_type in node_types:
            output.append(self.format_section_header(f"Node Type: {node_type}", level=2))
            output.append(self._get_nodes_of_type_summary(node_type, limit))
        return "\n".join(filter(None, output))

    def _get_nodes_of_type_summary(self, node_type, limit) -> str:
        """
        Return a table of at most ``limit`` nodes of the given type.

        Nodes without a "type" entry count as "Unknown", matching
        ``_get_all_node_types``; previously such nodes were never listed.
        """
        rows = []
        for node, data in self.graph.nodes(data=True):
            if data.get("type", "Unknown") == node_type:
                rows.append([node] + [f"{k}: {v}" for k, v in data.items() if k != "type"])
                if len(rows) >= limit:
                    break
        if rows:
            table = tabulate(rows, headers=["Node", "Details"], tablefmt=self.fmt)
            if len(rows) == limit:
                # the loop stopped at the limit: count what was left out
                remaining = sum(
                    1
                    for _, d in self.graph.nodes(data=True)
                    if d.get("type", "Unknown") == node_type
                ) - limit
                if remaining > 0:
                    table += f"\n... and {remaining} more"
            return table
        return "No nodes found for this type."

    def _get_edges_summary(self, limit) -> str:
        """Return a table of at most ``limit`` edges with their attributes."""
        rows = []
        for source, target, data in self.graph.edges(data=True):
            rows.append(
                [f"{source} -> {target}"]
                + [f"{key}: {value}" for key, value in data.items()]
            )
            if len(rows) >= limit:
                break
        if rows:
            table = tabulate(rows, headers=["Edge", "Details"], tablefmt=self.fmt)
            if len(rows) == limit:
                remaining = len(self.graph.edges) - limit
                if remaining > 0:
                    table += f"\n... and {remaining} more"
            return table
        return "No edges in the graph."
__init__(graph, fmt='github', section_formats=None)

constructor

Source code in mogwai/utils/graph_summary.py
22
23
24
25
26
27
28
29
30
31
32
33
34
35
def __init__(
    self,
    graph,
    fmt: str = "github",
    section_formats: Optional[Dict[str, List[SectionFormat]]] = None,
):
    """
    Create a summary generator for a graph.

    Args:
        graph: the graph to summarize
        fmt: output format; stored as ``self.fmt`` and used as the key
            into ``section_formats``.
        section_formats: section header formats per output format. If
            omitted (or empty), defaults for "github", "mediawiki" and
            "latex" with header levels 1 and 2 are used.
    """
    self.graph = graph
    self.fmt = fmt
    # Level 2 entries are needed for the per-node-type headers; with only
    # level 1 defined, requesting a level 2 header raised ValueError.
    self.section_formats = section_formats or {
        "github": [
            SectionFormat(1, "# {header}"),
            SectionFormat(2, "## {header}"),
        ],
        "mediawiki": [
            SectionFormat(1, "= {header} ="),
            SectionFormat(2, "== {header} =="),
        ],
        "latex": [
            SectionFormat(1, "\\section{{{header}}}"),
            SectionFormat(2, "\\subsection{{{header}}}"),
        ],
    }
dump(node_types=None, limit=10)

Dump the content of the graph for investigation.

Parameters:

Name Type Description Default
node_types list

List of node types to dump. If None, dump all types.

None
limit int

Maximum number of nodes to dump for each node type. Default is 10.

10

Returns:

Name Type Description
str str

Formatted string containing the graph summary, nodes, and edges.

Source code in mogwai/utils/graph_summary.py
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
def dump(self, node_types=None, limit: int = 10) -> str:
    """
    Render the graph as text: a summary section, a nodes section and an edges section.

    Args:
        node_types (list): List of node types to dump. If None, dump all types.
        limit (int): Maximum number of nodes to dump for each node type. Default is 10.

    Returns:
        str: The non-empty parts joined by blank lines.
    """
    parts = [
        self.format_section_header("Graph Summary"),
        self._get_graph_summary(),
    ]

    selected_types = self._get_node_types_to_dump(node_types)
    parts += [
        self.format_section_header("Nodes"),
        self._get_nodes_summary(selected_types, limit),
        self.format_section_header("Edges"),
        self._get_edges_summary(limit),
    ]

    # skip empty parts so no stray blank blocks appear
    return "\n\n".join(part for part in parts if part)
format_section_header(header, level=1)

Format the section header using the format string for the current output format.

Source code in mogwai/utils/graph_summary.py
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
def format_section_header(self, header: str, level: int = 1) -> str:
    """
    Format the section header using the format string for the current output format.

    Args:
        header: the header text
        level: the header level to look up

    Returns:
        str: the header formatted by the first section format whose level matches

    Raises:
        ValueError: if no section formats exist for ``self.fmt`` or none
            of them matches ``level``.
    """
    section_formats = self.section_formats.get(self.fmt, None)
    if section_formats is None:
        raise ValueError("No format string found for the requested format.")
    # Find the format string for the requested level
    # (the debug print() calls that used to write to stdout were removed)
    for section_format in section_formats:
        if section_format.level == level:
            return section_format.format_section_header(header)
    raise ValueError(
        "No format string found for the requested level."
        + str(level)
        + " in "
        + str(section_formats)
    )
summary(limit=3)

Generate a summary of the graph.

Source code in mogwai/utils/graph_summary.py
38
39
40
41
def summary(self, limit: int = 3) -> str:
    """Return the graph dump, limited to ``limit`` entries per listing."""
    return self.dump(limit=limit)

SectionFormat dataclass

Source code in mogwai/utils/graph_summary.py
 6
 7
 8
 9
10
11
12
13
14
15
16
@dataclass
class SectionFormat:
    level: int = 1  # header level this format applies to
    format_str: str = "{header}"  # template with a {header} placeholder

    def format_section_header(self, header: str) -> str:
        """
        Substitute the given header into my format string.
        """
        return self.format_str.format(header=header)
format_section_header(header)

format a given header with my format string

Source code in mogwai/utils/graph_summary.py
11
12
13
14
15
16
def format_section_header(self, header: str) -> str:
    """
    Substitute the given header into my format string.
    """
    return self.format_str.format(header=header)

type_utils

TypeUtils

utility functions to handle types

Source code in mogwai/utils/type_utils.py
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
class TypeUtils:
    """
    utility functions to handle types
    """
    @classmethod
    def get_dict_indexer(cls, keys: List[str]|str, default: Any=None):
        if isinstance(keys, (list,tuple)):
            def indexer(x):
                try:
                    for key in keys:
                        x = x[key]
                    return x
                except:
                    return default
            return indexer
        else:
            return lambda x: x.get(keys, default)

    @classmethod
    def get_set_type(cls, s: Set):
        if len(s)==0: return None
        sample = next(iter(s))
        return type(sample)

    @classmethod
    def get_set_type_all(cls, s: Set):
        if len(s)==0: return None
        sample = next(iter(s))
        t = type(sample)
        for element in s:
            if type(element) is not t:
                return False
        return t

    @classmethod
    def get_list_type(cls, l: List):
        if(len(l)==0): return None
        return type(l[0])

    @classmethod
    def get_list_type_all(cls, l: List):
        if len(l)==0: return None
        dtype = type(l[0])
        if all((type(x) is dtype for x in l)):
            return dtype
        return False

    @classmethod
    def ensure_is_set(cls, s: Set|Generator|List):
        return s if isinstance(s, Set) else set(s)

    @classmethod
    def ensure_is_list(cls, s: Set|Generator|List):
        return s if isinstance(s, List) else list(s)

version

Created on 2024-08-15

@author: wf

Version dataclass

Bases: object

Version handling for pyMogwai

Source code in mogwai/version.py
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
@dataclass
class Version(object):
    """
    Version handling for pyMogwai

    Holds the package metadata as plain class attributes. None of the
    attributes is annotated, so ``@dataclass`` does not turn them into
    dataclass fields.
    """

    # package name and release information
    name = "pymogwai"
    version = mogwai.__version__  # taken from the mogwai package itself
    date = "2024-08-15"
    updated = "2024-11-14"
    description = "python native gremlin implementation"

    authors = "Wolfgang Fahl"

    # project links
    chat_url = "https://github.com/juupje/pyMogwai/discussions"
    doc_url = "https://cr.bitplan.com/index.php/pyMogwai"
    cm_url = "https://github.com/juupje/pyMogwai"

    # license notice (the f-string has no placeholders)
    license = f"""Copyright 2024 contributors. All rights reserved.

  Licensed under the Apache License 2.0
  http://www.apache.org/licenses/LICENSE-2.0

  Distributed on an "AS IS" basis without warranties
  or conditions of any kind, either express or implied."""
    # multi-line description assembled from the attributes defined above
    longDescription = f"""{name} version {version}
{description}

  Created by {authors} on {date} last updated {updated}"""

web

i18n_config

Created on 21.10.2024

@author: wf

I18nConfig

Internationalization module configuration

Source code in mogwai/web/i18n_config.py
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
class I18nConfig:
    """
    Internationalization module configuration
    """

    @classmethod
    def config(cls, debug: bool = False):
        """
        Register the ``resources/i18n`` directory next to this module with
        the i18n module and apply the file naming, format and fallback settings.

        Args:
            debug: if True, print the translations path and its directory listing.
        """
        here = os.path.dirname(os.path.abspath(__file__))
        translations_path = os.path.join(here, "resources", "i18n")
        if debug:
            print(f"Loading translations from: {translations_path}")
            print(f"Files in directory: {os.listdir(translations_path)}")
        i18n.load_path.append(translations_path)
        settings = (
            ("filename_format", "{locale}.{format}"),
            ("file_format", "yaml"),
            ("fallback", "en"),
        )
        for setting_name, setting_value in settings:
            i18n.set(setting_name, setting_value)

node_view

Created on 2024-10-21

@author: wf

BaseNodeView

Base class for viewing and interacting with nodes in a graph.

Source code in mogwai/web/node_view.py
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
class BaseNodeView:
    """
    Common base for views that display and edit nodes of a graph.
    """

    def __init__(self, config: NodeViewConfig):
        """
        Copy the relevant parts of the view configuration onto the view.

        Args:
            config (NodeViewConfig): supplies solution, graph, schema, node type
                and the node type configuration (data class and key field).
        """
        type_config = config.node_type_config
        self.solution = config.solution
        self.graph = config.graph
        self.schema = config.schema
        self.node_type = config.node_type
        self.node_type_config = type_config
        self.node_data_class = type_config._dataclass
        self.key = type_config.key_field

    def editable_properties(self, props: dict[str, Any]) -> dict[str, Any]:
        """
        Select the properties that can be edited: keys not starting with '_'
        whose values are strings or not iterable at all.

        Args:
            props (dict[str, Any]): The dictionary of properties to filter.

        Returns:
            dict[str, Any]: The filtered properties dictionary (empty if ``props`` is empty or None).
        """
        if not props:
            return {}

        def is_editable(key, value) -> bool:
            if key.startswith("_"):
                return False  # hidden key
            return isinstance(value, str) or not isinstance(value, Iterable)

        return {key: value for key, value in props.items() if is_editable(key, value)}
__init__(config)

Base constructor for initializing the NodeView.

Args:

Source code in mogwai/web/node_view.py
44
45
46
47
48
49
50
51
52
53
54
55
56
def __init__(self, config: NodeViewConfig):
    """
    Copy the relevant parts of the view configuration onto the view.

    Args:
        config (NodeViewConfig): supplies solution, graph, schema, node type
            and the node type configuration (data class and key field).
    """
    type_config = config.node_type_config
    self.solution = config.solution
    self.graph = config.graph
    self.schema = config.schema
    self.node_type = config.node_type
    self.node_type_config = type_config
    self.node_data_class = type_config._dataclass
    self.key = type_config.key_field
editable_properties(props)

Filter the properties to exclude hidden keys (those starting with '_') and non-string iterables.

Parameters:

Name Type Description Default
props dict[str, Any]

The dictionary of properties to filter.

required

Returns:

Type Description
dict[str, Any]

dict[str, Any]: The filtered properties dictionary.

Source code in mogwai/web/node_view.py
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
def editable_properties(self, props: dict[str, Any]) -> dict[str, Any]:
    """
    Select the properties that can be edited: keys not starting with '_'
    whose values are strings or not iterable at all.

    Args:
        props (dict[str, Any]): The dictionary of properties to filter.

    Returns:
        dict[str, Any]: The filtered properties dictionary (empty if ``props`` is empty or None).
    """
    if not props:
        return {}

    def is_editable(key, value) -> bool:
        if key.startswith("_"):
            return False  # hidden key
        return isinstance(value, str) or not isinstance(value, Iterable)

    return {key: value for key, value in props.items() if is_editable(key, value)}

NodeTableView

Bases: BaseNodeView

A view for displaying and interacting with nodes of the same type in a MogwaiGraph.

Source code in mogwai/web/node_view.py
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
class NodeTableView(BaseNodeView):
    """
    A view for displaying and interacting with nodes of the same type in a MogwaiGraph.
    """

    def __init__(self, config: NodeViewConfig):
        """
        Initialize the NodeTableView.

        Args:
            config (NodeViewConfig): The configuration dataclass for the view.
        """
        super().__init__(config)
        self.lod_grid = None
        self.node_view = None
        self.log = config.solution.log

    def setup_ui(self):
        """
        Set up the user interface for the NodeTableView.

        Creates a full-width column holding the status label, the grid row
        and the node view row, puts a spinner into the label and schedules
        load_and_show_nodes as a background task.
        """
        with ui.column().classes("w-full"):
            msg = f"loading {self.node_type}s ..."
            self.status_label = ui.label(msg).classes("text-h5")
            # the containers are created exactly once; creating a second
            # pair after the column would orphan these rows as empty elements
            self.grid_container = ui.row().classes("w-full")
            self.node_view_container = ui.row().classes("w-full")

        with self.status_label:
            ui.spinner(size="sm")

        # Start load in background
        background_tasks.create(self.load_and_show_nodes())

    async def load_and_show_nodes(self):
        """
        Load nodes in background and update UI.

        Each node record is converted to the node data class; a failed
        conversion is logged and the raw record is shown instead. The key
        column of every row is replaced by a link to the node's page. Any
        other exception is passed to the solution's exception handler.
        """
        try:
            nodes_lod = self.get_lod_of_nodes(node_label=self.node_type)
            self.node_items = {}
            self.node_ids = {}
            view_lod = []
            for record in nodes_lod:
                key_value = record.get(self.key)
                item = None
                try:
                    item = from_dict(data_class=self.node_data_class, data=record)
                    self.node_items[key_value] = item
                except Exception as ex:
                    # best effort: the record is still listed without an item
                    self.log.log("❌", "from_dict", f"{key_value}:{str(ex)}")
                node_id = record.get("node_id")
                self.node_ids[key_value] = node_id
                if item and hasattr(item, "as_view_dict"):
                    view_dict = item.as_view_dict()
                else:
                    view_dict = record
                node_link = Link.create(
                    f"/node/{self.node_type}/{node_id}", str(key_value)
                )
                view_dict[self.key] = node_link
                view_lod.append(view_dict)
            self.grid_container.clear()
            with self.grid_container:
                self.lod_grid = ListOfDictsGrid()
                self.lod_grid.load_lod(view_lod)
                # self.lod_grid.ag_grid.options["rowSelection"] = "single"
                # self.lod_grid.ag_grid.on("rowSelected", self.on_row_selected)
            with self.status_label:
                # clearing the label removes the spinner added in setup_ui
                self.status_label.clear()
                msg = f"{len(view_lod)} {self.node_type}s"
                self.status_label.text = msg
        except Exception as ex:
            self.solution.handle_exception(ex)

    def get_lod_of_nodes(self, node_label: str):
        """
        Retrieve a list of dictionaries containing the properties of nodes with the given node_label from the graph.

        Args:
            node_label (str): The label of the nodes to retrieve.

        Returns:
            list: A list of dictionaries containing the editable properties of the matching nodes, with 'node_id' included.
        """
        lod = []
        for node_id, node in self.graph.nodes(data=True):
            # nodes without a "labels" attribute match no label
            labels = node.get("labels", set())
            if node_label in labels:
                props = self.editable_properties(node)
                props["node_id"] = node_id
                lod.append(props)
        return lod
__init__(config)

Initialize the NodeTableView.

Parameters:

Name Type Description Default
config NodeViewConfig

The configuration dataclass for the view.

required
Source code in mogwai/web/node_view.py
165
166
167
168
169
170
171
172
173
174
175
def __init__(self, config: NodeViewConfig):
    """
    Create the table view for the node type given by the configuration.

    Args:
        config (NodeViewConfig): The configuration dataclass for the view.
    """
    super().__init__(config)
    # neither the grid nor a node view is assigned here; both start as None
    self.lod_grid = self.node_view = None
    solution = config.solution
    self.log = solution.log
get_lod_of_nodes(node_label)

Retrieve a list of dictionaries containing the properties of nodes with the given node_label from the graph.

Parameters:

Name Type Description Default
node_label str

The label of the nodes to retrieve.

required

Returns:

Name Type Description
list

A list of dictionaries containing the editable properties of the matching nodes, with 'node_id' included.

Source code in mogwai/web/node_view.py
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
def get_lod_of_nodes(self, node_label: str):
    """
    Collect the editable properties of all graph nodes carrying the given label.

    Args:
        node_label (str): The label of the nodes to retrieve.

    Returns:
        list: One dictionary per matching node, holding its editable
            properties plus the node's id under the key 'node_id'.
    """
    matching = []
    for node_id, attrs in self.graph.nodes(data=True):
        # nodes without a "labels" attribute match no label
        if node_label not in attrs.get("labels", set()):
            continue
        record = self.editable_properties(attrs)
        record["node_id"] = node_id
        matching.append(record)
    return matching
load_and_show_nodes() async

Load nodes in background and update UI

Source code in mogwai/web/node_view.py
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
async def load_and_show_nodes(self):
    """
    Load the nodes of this view's node type and show them in the grid.

    Each node record is converted to the node data class; a failed
    conversion is logged and the raw record is displayed instead. The key
    column of every row becomes a link to the node's page. Any other
    exception is handed to the solution's exception handler.
    """
    try:
        records = self.get_lod_of_nodes(node_label=self.node_type)
        self.node_items, self.node_ids = {}, {}
        rows = []
        for record in records:
            key_value = record.get(self.key)
            instance = None
            try:
                instance = from_dict(data_class=self.node_data_class, data=record)
                self.node_items[key_value] = instance
            except Exception as ex:
                # best effort: the record is still listed without an instance
                self.log.log("❌", "from_dict", f"{key_value}:{str(ex)}")
            node_id = record.get("node_id")
            self.node_ids[key_value] = node_id
            # prefer the instance's own view dict when it offers one
            has_view_dict = instance and hasattr(instance, "as_view_dict")
            row = instance.as_view_dict() if has_view_dict else record
            row[self.key] = Link.create(
                f"/node/{self.node_type}/{node_id}", str(key_value)
            )
            rows.append(row)
        self.grid_container.clear()
        with self.grid_container:
            self.lod_grid = ListOfDictsGrid()
            self.lod_grid.load_lod(rows)
            # self.lod_grid.ag_grid.options["rowSelection"] = "single"
            # self.lod_grid.ag_grid.on("rowSelected", self.on_row_selected)
        with self.status_label:
            # clearing the label removes its child elements before the text is set
            self.status_label.clear()
            self.status_label.text = f"{len(rows)} {self.node_type}s"
    except Exception as ex:
        self.solution.handle_exception(ex)
setup_ui()

Set up the user interface for the NodeTableView

Source code in mogwai/web/node_view.py
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
def setup_ui(self):
    """
    Set up the user interface for the NodeTableView.

    Creates a full-width column holding the status label, the grid row and
    the node view row, puts a spinner into the label and schedules
    load_and_show_nodes as a background task.
    """
    with ui.column().classes("w-full"):
        msg = f"loading {self.node_type}s ..."
        self.status_label = ui.label(msg).classes("text-h5")
        # the containers are created exactly once; creating a second pair
        # after the column would orphan these rows as empty elements
        self.grid_container = ui.row().classes("w-full")
        self.node_view_container = ui.row().classes("w-full")

    with self.status_label:
        ui.spinner(size="sm")

    # Start load in background
    background_tasks.create(self.load_and_show_nodes())

NodeView

Bases: BaseNodeView

A view for displaying and editing a single node of a NetworkX graph.

Source code in mogwai/web/node_view.py
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
class NodeView(BaseNodeView):
    """
    A view for displaying and editing a single node of a NetworkX graph.
    """

    def __init__(self, config: NodeViewConfig, node_id: Any):
        """
        Construct the NodeView with the given configuration and node ID.

        Args:
            config (NodeViewConfig): The configuration dataclass for the view.
            node_id (Any): The identifier of the node to view/edit; it is
                converted to the schema's node_id_type when it has another type.
        """
        super().__init__(config)
        self.dict_edit = None
        self.node_id = node_id
        # type coercion to the id type declared by the schema
        id_type = self.schema.node_id_type
        if not isinstance(self.node_id, id_type):
            self.node_id = id_type(self.node_id)

        # None when the graph has no node with this id
        self.node_data = self.graph.nodes.get(self.node_id)

    def setup_ui(self):
        """
        Create the dict edit for the node; errors go to the solution's
        exception handler.
        """
        try:
            self.get_dict_edit()
        except Exception as ex:
            self.solution.handle_exception(ex)

    def open(self):
        """
        Show the details of the dict edit
        """
        editor = self.dict_edit
        if not editor:
            return
        editor.expansion.open()

    def close(self):
        """
        Hide the details of the dict edit
        """
        editor = self.dict_edit
        if not editor:
            return
        editor.expansion.close()

    def get_dict_edit(self) -> DictEdit:
        """
        Build, store and open a DictEdit for the node's editable attributes.

        Returns:
            DictEdit: The editor, also kept in self.dict_edit.
        """
        edit_props = self.editable_properties(self.node_data)
        # one UI field definition per editable property
        ui_fields = {
            key: FieldUiDef.from_key_value(key, value)
            for key, value in edit_props.items()
        }
        # Use get_node_config from GraphSchema
        node_config = self.schema.get_node_config(self.node_data)
        if not node_config:
            title = f"Node: {self.node_id}"
            icon = "account_tree"  # Default icon
        else:
            key_value = edit_props.get(node_config.key_field)
            suffix = f" {key_value}" if key_value else ""
            title = f"{node_config.label}{suffix}"
            icon = node_config.icon
        form_ui_def = FormUiDef(title=f"{title}", icon=icon, ui_fields=ui_fields)

        self.dict_edit = DictEdit(data_to_edit=edit_props, form_ui_def=form_ui_def)
        self.open()
        return self.dict_edit

    def update_node(self, updated_data: dict):
        """
        Update the node in the graph with the edited data

        Args:
            updated_data (dict): The updated node attributes
        """
        attributes_by_node = {self.node_id: updated_data}
        nx.set_node_attributes(self.graph, attributes_by_node)
__init__(config, node_id)

Construct the NodeView with the given configuration and node ID.

Parameters:

Name Type Description Default
config NodeViewConfig

The configuration dataclass for the view.

required
node_id Any

The identifier of the node to view/edit.

required
Source code in mogwai/web/node_view.py
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
def __init__(self, config: NodeViewConfig, node_id: Any):
    """
    Construct the NodeView with the given configuration and node ID.

    Args:
        config (NodeViewConfig): The configuration dataclass for the view.
        node_id (Any): The identifier of the node to view/edit; it is
            converted to the schema's node_id_type when it has another type.
    """
    super().__init__(config)
    self.dict_edit = None
    self.node_id = node_id
    # type coercion to the id type declared by the schema
    id_type = self.schema.node_id_type
    if not isinstance(self.node_id, id_type):
        self.node_id = id_type(self.node_id)

    # None when the graph has no node with this id
    self.node_data = self.graph.nodes.get(self.node_id)
close()

Hide the details of the dict edit

Source code in mogwai/web/node_view.py
116
117
118
119
120
121
def close(self):
    """
    Collapse the dict edit's expansion; does nothing while no dict edit is set.
    """
    editor = self.dict_edit
    if not editor:
        return
    editor.expansion.close()
get_dict_edit()

Return a DictEdit instance for editing node attributes.

Source code in mogwai/web/node_view.py
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
def get_dict_edit(self) -> DictEdit:
    """
    Build, store and open a DictEdit for the node's editable attributes.

    Returns:
        DictEdit: The editor, also kept in self.dict_edit.
    """
    edit_props = self.editable_properties(self.node_data)
    # one UI field definition per editable property
    ui_fields = {
        key: FieldUiDef.from_key_value(key, value)
        for key, value in edit_props.items()
    }
    # Use get_node_config from GraphSchema
    node_config = self.schema.get_node_config(self.node_data)
    if not node_config:
        title = f"Node: {self.node_id}"
        icon = "account_tree"  # Default icon
    else:
        key_value = edit_props.get(node_config.key_field)
        suffix = f" {key_value}" if key_value else ""
        title = f"{node_config.label}{suffix}"
        icon = node_config.icon
    form_ui_def = FormUiDef(title=f"{title}", icon=icon, ui_fields=ui_fields)

    self.dict_edit = DictEdit(data_to_edit=edit_props, form_ui_def=form_ui_def)
    self.open()
    return self.dict_edit
open()

Show the details of the dict edit

Source code in mogwai/web/node_view.py
109
110
111
112
113
114
def open(self):
    """
    Expand the dict edit's expansion; does nothing while no dict edit is set.
    """
    editor = self.dict_edit
    if not editor:
        return
    editor.expansion.open()
update_node(updated_data)

Update the node in the graph with the edited data

Parameters:

Name Type Description Default
updated_data dict

The updated node attributes

required
Source code in mogwai/web/node_view.py
150
151
152
153
154
155
156
157
def update_node(self, updated_data: dict):
    """
    Write the edited attributes to this view's node in the graph.

    Args:
        updated_data (dict): The updated node attributes
    """
    # set_node_attributes takes a mapping of node id -> attribute dict
    attributes_by_node = {self.node_id: updated_data}
    nx.set_node_attributes(self.graph, attributes_by_node)

NodeViewConfig dataclass

parameters for the node views

Source code in mogwai/web/node_view.py
23
24
25
26
27
28
29
30
31
32
33
34
35
36
@dataclass
class NodeViewConfig:
    """
    Parameters shared by the node views.

    The node type configuration is not passed in; it is looked up in the
    schema's node_type_configs by node_type after initialization.
    """

    solution: WebSolution
    graph: MogwaiGraph
    schema: GraphSchema
    node_type: str
    # derived in __post_init__, hence excluded from the generated __init__
    node_type_config: NodeTypeConfig = field(init=False)

    def __post_init__(self):
        """Resolve the configuration for node_type from the schema."""
        known_configs = self.schema.node_type_configs
        self.node_type_config = known_configs.get(self.node_type)

server

Created on 2024-08-15

@author: wf

MogwaiSolution

Bases: InputWebSolution

the Mogwai solution

Source code in mogwai/web/server.py
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
class MogwaiSolution(InputWebSolution):
    """
    the Mogwai solution
    """

    def __init__(self, webserver: MogwaiWebServer, client: Client):
        """
        Initialize the solution

        Args:
            webserver (MogwaiWebServer): The webserver instance associated with this context.
            client (Client): The client instance this context is associated with.
        """
        super().__init__(webserver, client)
        self.log = Log()
        self.examples = webserver.examples
        self.graph = webserver.graph
        self.schema = webserver.schema
        self.graph_label = None
        self.result_html = None
        # replaces self.graph with the "modern" example graph
        self.update_graph("modern")

    def authenticated(self) -> bool:
        """
        Check if the user is authenticated.
        Returns:
            True if the user is authenticated, False otherwise.
        """
        return self.webserver.login.authenticated()

    def configure_menu(self):
        """
        configure additional non-standard menu entries
        """
        with self.header:
            if self.authenticated():
                self.link_button("logout", "/logout", "logout", new_tab=False)
            else:
                self.link_button("login", "/login", "login", new_tab=False)
        # Sorting the node types by display_order
        sorted_node_types = sorted(
            self.schema.node_type_configs.items(),
            key=lambda item: item[1].display_order,
        )

        for node_type_name, node_type in sorted_node_types:  # label e.g. project_list
            label_i18nkey = f"{node_type.label.lower()}_list"
            label = i18n.t(label_i18nkey)
            path = f"/nodes/{node_type_name}"
            self.link_button(label, path, node_type.icon, new_tab=False)

    async def login_ui(self):
        """
        login ui
        """
        await self.webserver.login.login(self)

    async def home(self):
        """Provide the main content page"""
        await self.query_graph()

    async def show_nodes(self, node_type: str):
        """
        show nodes of the given type

        Args:
            node_type(str): the type of nodes to show
        """

        def show():
            try:
                config = NodeViewConfig(
                    solution=self,
                    graph=self.graph,
                    schema=self.schema,
                    node_type=node_type,
                )
                if not config.node_type_config:
                    ui.label(f"{i18n.t('invalid_node_type')}: {node_type}")
                    return
                node_table_view = NodeTableView(config=config)
                node_table_view.setup_ui()
            except Exception as ex:
                self.handle_exception(ex)

        await self.setup_content_div(show)

    async def show_node(self, node_type: str, node_id: str):
        """
        show the given node

        Args:
            node_type(str): the type of the node to show
            node_id(str): the id of the node to show
        """

        def show():
            # same error routing as show_nodes: constructing the view may
            # raise, e.g. when the node id cannot be coerced to the schema type
            try:
                config = NodeViewConfig(
                    solution=self,
                    graph=self.graph,
                    schema=self.schema,
                    node_type=node_type,
                )
                if not config.node_type_config:
                    ui.label(f"{i18n.t('invalid_node_type')}: {node_type}")
                    return
                # default view is the general NodeView
                view_class = NodeView
                # unless there is a specialization configured
                if config.node_type_config._viewclass:
                    view_class = config.node_type_config._viewclass
                node_view = view_class(config=config, node_id=node_id)
                node_view.setup_ui()
            except Exception as ex:
                self.handle_exception(ex)

        await self.setup_content_div(show)

    async def on_graph_select(self, vce_args):
        """
        React to a graph selection by loading the selected graph via run.io_bound.

        Args:
            vce_args: the change event arguments; its value is the graph name
        """
        await run.io_bound(self.update_graph, vce_args.value)

    def update_graph(self, graph_name: str):
        """
        Load the example graph with the given name and refresh the label.

        Exceptions are passed to handle_exception.

        Args:
            graph_name(str): the name of the example graph to load
        """
        try:
            self.graph_name = graph_name
            self.graph = self.load_graph(name=graph_name)
            self.get_graph_label()
            if self.graph_label:
                self.graph_label.update()
        except Exception as ex:
            self.handle_exception(ex)

    def get_graph_label(self) -> str:
        """
        Recompute graph_label_text from the current graph's name, node count
        and edge count.

        Returns:
            str: the new label text
        """
        self.graph_label_text = f"Query Graph {self.graph.name} {len(self.graph.nodes)} nodes {len(self.graph.edges)} edges"
        return self.graph_label_text

    async def query_graph(self):
        """Graph querying page"""

        def setup_query():
            emphasize = "text-h5"
            try:
                with ui.row() as self.header_row:
                    graph_selection = self.examples.get_names()
                    self.graph_selector = self.add_select(
                        "graph",
                        graph_selection,
                        value=self.graph_name,
                        on_change=self.on_graph_select,
                    )
                # uploads are offered to authenticated users only
                if self.authenticated():
                    with ui.row() as self.upload_row:
                        ui.label("import File").classes(emphasize)
                        file_upload = ui.upload(
                            label="Choose a file", multiple=False, auto_upload=True
                        )
                        file_upload.on("upload", self.handle_upload)

                if self.graph:
                    self.get_graph_label()
                    self.graph_label = ui.label().classes(emphasize)
                    self.graph_label.bind_text_from(self, "graph_label_text")
                    self.query_text_area = (
                        ui.textarea("Enter Gremlin Query")
                        .props("clearable")
                        .props("rows=5;cols=80")
                        .bind_value_to(self, "query")
                    )
                    ui.button("Run Query", on_click=lambda: self.on_run_query())
                else:
                    ui.label("No graph loaded. Please select a graph first.")
                with ui.row() as self.result_row:
                    self.result_html = ui.html()
            except Exception as ex:
                self.handle_exception(ex)

        await self.setup_content_div(setup_query)

    def load_graph(self, file=None, name: str = "modern"):
        """
        Load a graph from an uploaded file or from the examples.

        Args:
            file: a file object with a name attribute, or None to load the
                example graph called name
            name(str): the example graph name, only used when file is None

        Returns:
            the loaded graph with its name set to the example name or file name

        Raises:
            ValueError: if name is not a known example or the file name has
                an unsupported extension
        """
        if file is None:
            if name in self.examples.get_names():
                graph = self.examples.get(name)
            else:
                raise ValueError(f"invalid graph name {name}")
            graph.name = name
        else:
            if file.name.endswith(".graphml"):
                # the graphml converter reads from a path, so the content is
                # copied to a file in the temp directory first
                temp_path = os.path.join(tempfile.gettempdir(), file.name)
                with open(temp_path, "wb") as f:
                    f.write(file.read())
                graph = graphml_to_mogwaigraph(file=temp_path)
            elif file.name.endswith(".xlsx"):
                graph = EXCELGraph(file)
            elif file.name.endswith(".pdf"):
                graph = PDFGraph(file)
            elif file.name.endswith(".pptx"):
                graph = powerpoint_converter.PPGraph(file=file)
            else:
                raise ValueError(f"invalid file {file.name}")
            graph.name = file.name
        return graph

    def handle_upload(self, e):
        """
        Handle file upload

        Args:
            e: the upload event; its content is passed to load_graph
        """
        file = e.content
        try:
            self.graph = self.load_graph(file)
        except Exception as ex:
            ui.notify(f"Unsupported file: {file.name} {str(ex)}", type="negative")
            return

        # keep the label text bound to graph_label_text in sync with the new graph
        self.get_graph_label()
        if self.graph:
            ui.notify("File parsed successfully", type="positive")

    def on_run_query(self, query: str = None):
        """
        Run a Gremlin query on the graph

        Args:
            query(str): the query to run; when None, self.query is used
        """
        if not self.graph:
            ui.notify("No graph loaded. Please select a graph first.", type="warning")
            return
        try:
            if query is None:
                query = self.query
            query_result = self.run_query(query)
            self.display_result(query_result)
        except Exception as e:
            ui.notify(f"Error executing query: {str(e)}", type="negative")

    def run_query(self, query) -> QueryResult:
        """
        Evaluate the query text against the current graph.

        Args:
            query: a Python expression using the traversal source g

        Returns:
            QueryResult: the traversal and the result of running it
        """
        g = Trav.MogwaiGraphTraversalSource(self.graph)
        # SECURITY: eval executes arbitrary Python with the builtins available;
        # the query text comes from the web UI, so expose this to trusted users only
        traversal = eval(query, {"g": g})
        if not traversal.terminated:
            traversal = traversal.to_list()
        result = traversal.run()
        qr = QueryResult(traversal=traversal, result=result)
        return qr

    def display_result(self, query_result: QueryResult):
        """
        Show the query result as a numbered list in the result html element.

        Args:
            query_result(QueryResult): the result to display
        """
        if self.result_html:
            with self.result_row:
                count = len(query_result.result)
                # "0 elements", "1 element", "2 elements"
                plural_postfix = "" if count == 1 else "s"
                lines = [f"{count} element{plural_postfix}:"]
                lines.extend(
                    f"{index}:{str(traverser)}"
                    for index, traverser in enumerate(query_result.result, start=1)
                )
                # every line, including the last one, ends with a line break
                self.result_html.content = "<br>".join(lines) + "<br>"
__init__(webserver, client)

Initialize the solution

Parameters:

Name Type Description Default
webserver MogwaiWebServer

The webserver instance associated with this context.

required
client Client

The client instance this context is associated with.

required
Source code in mogwai/web/server.py
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
def __init__(self, webserver: MogwaiWebServer, client: Client):
    """
    Set up the per-client state of the solution.

    Args:
        webserver (MogwaiWebServer): The webserver instance associated with this context.
        client (Client): The client instance this context is associated with.
    """
    super().__init__(webserver, client)
    # UI elements start out as None
    self.graph_label = self.result_html = None
    self.log = Log()
    # shared state taken over from the webserver
    self.examples = webserver.examples
    self.graph = webserver.graph
    self.schema = webserver.schema
    self.update_graph("modern")
authenticated()

Check if the user is authenticated. Returns: True if the user is authenticated, False otherwise.

Source code in mogwai/web/server.py
126
127
128
129
130
131
132
def authenticated(self) -> bool:
    """
    Ask the webserver's login whether the user is authenticated.

    Returns:
        True if the user is authenticated, False otherwise.
    """
    login = self.webserver.login
    return login.authenticated()
configure_menu()

configure additional non-standard menu entries

Source code in mogwai/web/server.py
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
def configure_menu(self):
    """
    Add the login/logout button and one list button per node type to the menu.
    """
    with self.header:
        # exactly one of the two buttons is shown, depending on the login state
        action = "logout" if self.authenticated() else "login"
        self.link_button(action, f"/{action}", action, new_tab=False)

    def by_display_order(entry):
        _name, type_config = entry
        return type_config.display_order

    ordered = sorted(self.schema.node_type_configs.items(), key=by_display_order)
    for node_type_name, node_type in ordered:
        # i18n key e.g. project_list
        label = i18n.t(f"{node_type.label.lower()}_list")
        self.link_button(
            label, f"/nodes/{node_type_name}", node_type.icon, new_tab=False
        )
handle_upload(e)

Handle file upload

Source code in mogwai/web/server.py
294
295
296
297
298
299
300
301
302
303
304
def handle_upload(self, e):
    """
    Handle file upload

    Loads a graph from the uploaded content and makes it the current graph.
    A failure is reported as a notification and leaves the current graph
    untouched.

    Args:
        e: the upload event; its content is passed to load_graph
    """
    file = e.content
    try:
        self.graph = self.load_graph(file)
    except Exception as ex:
        ui.notify(f"Unsupported file: {file.name} {str(ex)}", type="negative")
        return

    # keep the label text bound to graph_label_text in sync with the new graph
    self.get_graph_label()
    if self.graph:
        ui.notify("File parsed successfully", type="positive")
home() async

Provide the main content page

Source code in mogwai/web/server.py
162
163
164
async def home(self):
    """Show the graph query page as the main content page"""
    page = self.query_graph()
    await page
login_ui() async

login ui

Source code in mogwai/web/server.py
156
157
158
159
160
async def login_ui(self):
    """
    Show the login ui by delegating to the webserver's login
    """
    login = self.webserver.login
    await login.login(self)
on_run_query(query=None)

Run a Gremlin query on the graph

Source code in mogwai/web/server.py
306
307
308
309
310
311
312
313
314
315
316
317
def on_run_query(self, query: str = None):
    """
    Run a Gremlin query on the graph and display its result.

    Args:
        query (str): the query to run; when None, self.query is used
    """
    if self.graph:
        try:
            gremlin = self.query if query is None else query
            self.display_result(self.run_query(gremlin))
        except Exception as e:
            ui.notify(f"Error executing query: {str(e)}", type="negative")
    else:
        ui.notify("No graph loaded. Please select a graph first.", type="warning")
query_graph() async

Graph querying page

Source code in mogwai/web/server.py
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
async def query_graph(self):
    """
    Graph querying page: graph selector, optional file upload, query area
    and result row.
    """

    def build_page():
        heading_css = "text-h5"
        try:
            with ui.row() as self.header_row:
                self.graph_selector = self.add_select(
                    "graph",
                    self.examples.get_names(),
                    value=self.graph_name,
                    on_change=self.on_graph_select,
                )
            # uploads are offered to authenticated users only
            if self.authenticated():
                with ui.row() as self.upload_row:
                    ui.label("import File").classes(heading_css)
                    uploader = ui.upload(
                        label="Choose a file", multiple=False, auto_upload=True
                    )
                    uploader.on("upload", self.handle_upload)

            if not self.graph:
                ui.label("No graph loaded. Please select a graph first.")
            else:
                self.get_graph_label()
                self.graph_label = ui.label().classes(heading_css)
                self.graph_label.bind_text_from(self, "graph_label_text")
                self.query_text_area = (
                    ui.textarea("Enter Gremlin Query")
                    .props("clearable")
                    .props("rows=5;cols=80")
                    .bind_value_to(self, "query")
                )
                ui.button("Run Query", on_click=lambda: self.on_run_query())
            with ui.row() as self.result_row:
                self.result_html = ui.html()
        except Exception as ex:
            self.handle_exception(ex)

    await self.setup_content_div(build_page)
show_node(node_type, node_id) async

show the given node

Source code in mogwai/web/server.py
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
async def show_node(self, node_type: str, node_id: str):
    """
    show the given node

    Args:
        node_type(str): the type of the node to show
        node_id(str): the id of the node to show
    """

    def show():
        # same error routing as show_nodes: constructing the view may raise,
        # e.g. when the node id cannot be coerced to the schema's id type
        try:
            config = NodeViewConfig(
                solution=self, graph=self.graph, schema=self.schema, node_type=node_type
            )
            if not config.node_type_config:
                ui.label(f"{i18n.t('invalid_node_type')}: {node_type}")
                return
            # default view is the general NodeView
            view_class = NodeView
            # unless there is a specialization configured
            if config.node_type_config._viewclass:
                view_class = config.node_type_config._viewclass
            node_view = view_class(config=config, node_id=node_id)
            node_view.setup_ui()
        except Exception as ex:
            self.handle_exception(ex)

    await self.setup_content_div(show)
show_nodes(node_type) async

show nodes of the given type

Parameters:

Name Type Description Default
node_type str

the type of nodes to show

required
Source code in mogwai/web/server.py
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
async def show_nodes(self, node_type: str):
    """
    show the nodes of the given type using a NodeTableView

    Args:
        node_type(str): the type of nodes to show
    """

    def show():
        try:
            view_config = NodeViewConfig(
                solution=self, graph=self.graph, schema=self.schema, node_type=node_type
            )
            if view_config.node_type_config:
                # known node type: build the view and set up its UI
                NodeTableView(config=view_config).setup_ui()
            else:
                # no configuration for this node type: show a message only
                ui.label(f"{i18n.t('invalid_node_type')}: {node_type}")
        except Exception as ex:
            self.handle_exception(ex)

    await self.setup_content_div(show)

MogwaiWebServer

Bases: InputWebserver

Mogwai WebServer

Source code in mogwai/web/server.py
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
class MogwaiWebServer(InputWebserver):
    """
    Mogwai WebServer

    Extends InputWebserver, uses MogwaiSolution as its solution class and
    registers the pages "/", "/query", "/login", "/logout",
    "/nodes/{node_type}" and "/node/{node_type}/{node_id}" via ui.page.
    """
    @classmethod
    def get_config(cls) -> WebserverConfig:
        """
        get the webserver configuration

        Returns:
            WebserverConfig: the configuration obtained from
                WebserverConfig.get for the mogwai defaults (copyright,
                version, default port 9850, short name "mogwai") with
                MogwaiSolution set as the solution class
        """
        copy_right = "(c)2024 Wolfgang Fahl"
        config = WebserverConfig(
            copy_right=copy_right,
            version=Version(),
            default_port=9850,
            short_name="mogwai",
        )
        # NOTE(review): WebserverConfig.get presumably merges the defaults
        # with a stored configuration - confirm against its implementation
        server_config = WebserverConfig.get(config)
        server_config.solution_class = MogwaiSolution
        return server_config

    def __init__(self) -> None:
        """Constructs all the necessary attributes for the WebServer object."""
        InputWebserver.__init__(self, config=MogwaiWebServer.get_config())
        # login handling based on the users kept under ~/.solutions/mogwai
        users = Users("~/.solutions/mogwai")
        self.login = Login(self, users)
        self.examples=Graphs()

        # the graph for displaying nodes
        self.graph = MogwaiGraph()
        # load the example schema from its yaml file and add it to the graph
        yaml_path=MogwaiExampleSchema.get_yaml_path()
        self.schema = GraphSchema.load(yaml_path=yaml_path)
        self.schema.add_to_graph(self.graph)

        # page handlers: each delegates to self.page with the MogwaiSolution
        # method that is responsible for the page
        @ui.page("/")
        async def home(client: Client):
            return await self.page(client, MogwaiSolution.home)

        @ui.page("/query")
        async def query_graph(client: Client):
            return await self.page(client, MogwaiSolution.query_graph)

        @ui.page("/login")
        async def login(client: Client):
            return await self.page(client, MogwaiSolution.login_ui)

        @ui.page("/logout")
        async def logout(client: Client) -> RedirectResponse:
            # log out only when authenticated; always redirect to the home page
            if self.login.authenticated():
                await self.login.logout()
            return RedirectResponse("/")

        @ui.page("/nodes/{node_type}")
        async def show_nodes(client: Client, node_type: str):
            """
            show the nodes of the given type
            """
            await self.page(client, MogwaiSolution.show_nodes, node_type)

        @ui.page("/node/{node_type}/{node_id}")
        async def node(client: Client, node_type: str, node_id: str):
            """
            show the node with the given node_id
            """
            await self.page(client, MogwaiSolution.show_node, node_type, node_id)


    def configure_run(self) -> None:
        """
        configure with args

        Calls I18nConfig.config() first and then delegates to
        InputWebserver.configure_run.
        """
        I18nConfig.config()

        InputWebserver.configure_run(self)
__init__()

Constructs all the necessary attributes for the WebServer object.

Source code in mogwai/web/server.py
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
def __init__(self) -> None:
    """
    Constructs all the necessary attributes for the WebServer object.

    Initializes the InputWebserver base with MogwaiWebServer.get_config(),
    sets up login, the example graphs, the node display graph with its
    schema, and registers the pages via ui.page.
    """
    InputWebserver.__init__(self, config=MogwaiWebServer.get_config())
    # login handling based on the users kept under ~/.solutions/mogwai
    users = Users("~/.solutions/mogwai")
    self.login = Login(self, users)
    self.examples=Graphs()

    # the graph for displaying nodes
    self.graph = MogwaiGraph()
    # load the example schema from its yaml file and add it to the graph
    yaml_path=MogwaiExampleSchema.get_yaml_path()
    self.schema = GraphSchema.load(yaml_path=yaml_path)
    self.schema.add_to_graph(self.graph)

    # page handlers: each delegates to self.page with the MogwaiSolution
    # method that is responsible for the page
    @ui.page("/")
    async def home(client: Client):
        return await self.page(client, MogwaiSolution.home)

    @ui.page("/query")
    async def query_graph(client: Client):
        return await self.page(client, MogwaiSolution.query_graph)

    @ui.page("/login")
    async def login(client: Client):
        return await self.page(client, MogwaiSolution.login_ui)

    @ui.page("/logout")
    async def logout(client: Client) -> RedirectResponse:
        # log out only when authenticated; always redirect to the home page
        if self.login.authenticated():
            await self.login.logout()
        return RedirectResponse("/")

    @ui.page("/nodes/{node_type}")
    async def show_nodes(client: Client, node_type: str):
        """
        show the nodes of the given type
        """
        await self.page(client, MogwaiSolution.show_nodes, node_type)

    @ui.page("/node/{node_type}/{node_id}")
    async def node(client: Client, node_type: str, node_id: str):
        """
        show the node with the given node_id
        """
        await self.page(client, MogwaiSolution.show_node, node_type, node_id)
configure_run()

configure with args

Source code in mogwai/web/server.py
 96
 97
 98
 99
100
101
102
def configure_run(self) -> None:
    """
    configure with args

    Calls I18nConfig.config() first and then delegates to
    InputWebserver.configure_run.
    """
    I18nConfig.config()

    InputWebserver.configure_run(self)