[go: up one dir, main page]

File: spaghetti.py

package info (click to toggle)
txtorcon 18.3.0-1
  • links: PTS, VCS
  • area: main
  • in suites: buster
  • size: 1,672 kB
  • sloc: python: 17,521; makefile: 227
file content (154 lines) | stat: -rw-r--r-- 4,406 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
# -*- coding: utf-8 -*-

from __future__ import absolute_import
from __future__ import print_function
from __future__ import with_statement

import warnings


class FSM(object):
    """
    Pass callables for matcher and handler to add_handler to create
    transitions between states. A "matcher" is a predicate, and if a
    transition handles something, it returns the next state.

    If you want something to track global state, put it in your data
    instance passed to process so that transitions and states can
    access it.

    Example:
    idle = State("IDLE")
    stuff = State("STUFF")

    def is_capital(letter):
        return letter in string.uppercase

    def is_small(letter):
        return letter in string.lowercase

    def do_something_with_capitals(letter):
        print("Got capital:", letter)
        return stuff

    idle.add_transition(
        Transition(stuff, is_capital, self.do_something_with_capitals)
    )
    stuff.add_transition(
        Transition(idle, is_lower, None)
    )
    """

    states = []
    state = None

    def __init__(self, states):
        """first state is the initial state"""
        if len(states) > 0:
            self.state = states[0]
        self.states = states

    def process(self, data):
        if self.state is None:
            raise RuntimeError("There is no initial state.")
        next_state = self.state.process(data)
        if next_state:
            self.state = next_state
        else:
            warnings.warn("No next state", RuntimeWarning)

    def add_state(self, state):
        # first added state is initial state
        if len(self.states) == 0:
            self.state = state
        self.states.append(state)

    def dotty(self):
        r = 'digraph fsm {\n\n'
        for s in self.states:
            r = r + s.dotty()
        r = r + '\n}\n'
        return r


class State(object):
    def __init__(self, name):
        self.name = name
        self.transitions = []

    def process(self, data):
        for t in self.transitions:
            r = t.process(data)
            if r is not None:
                return r
        return None

    def add_transition(self, t):
        self.transitions.append(t)
        t.start_state = self

    def add_transitions(self, transitions):
        for t in transitions:
            self.add_transition(t)

    def __str__(self):
        r = '<State %s [' % self.name
        for t in self.transitions:
            r = r + (' ->%s ' % t.next_state.name)
        r = r + ']>'
        return r

    def dotty(self):
        r = '%s;\n' % self.name
        r = r + 'edge [fontsize=8]\n'
        r = r + 'rankdir=TB;\nnodesep=2;\n'
        for t in self.transitions:
            r = r + '%s -> %s [label="%s\\n%s"]\n' % (self.name,
                                                      t.next_state.name,
                                                      t.matcher.__name__,
                                                      t.handler.__name__)
        return r


class Transition(object):
    def __init__(self, next_state, matcher, handler):
        self.matcher = matcher
        self.handler = handler
        self.start_state = None
        self.next_state = next_state
        if self.next_state is None:
            raise RuntimeError("next_state must be valid")

    def match(self, data):
        """
        used by process; calls handler if matcher returns true for
        data by default. may override instead of providing a matcher
        methdo to ctor.
        """
        if self.matcher is not None:
            return self.matcher(data)
        return True

    def handle(self, data):
        """
        return next state. May override in a subclass to change
        behavior or pass a handler method to ctor
        """
        if self.handler:
            state = self.handler(data)
            if state is None:
                return self.next_state
            return state
        return self.next_state

    def process(self, data):
        """return next state, or None if not handled."""
        if self.match(data):
            return self.handle(data)
        return None

    def __str__(self):
        if self.start_state:
            return "<Transition %s->%s>" % (self.start_state.name,
                                            self.next_state.name)
        return "<Transition ->%s>" % (self.next_state.name,)