Peer States

Introduction

Peer state is internal to the peer, and external classes should not make decisions based on its value. It can change at any time. Only the Peer itself should look into its own PeerState.

Implementation

Each peer has a private property _state and a private dictionary _VALID_STATES_TRANSITIONS_.

_state indicates the current internal state of the peer. _VALID_STATES_TRANSITIONS_ is a dict[tuple]: each key is a state, and its value is a tuple of the states to which a transition from that key state is valid.

If you try to change the state to one that is not a valid target from the current state, an AssertionError is raised. Currently there are 6 peer states, but each peer can implement its own as needed. Those states are:

  • initializing - the peer state during the call of the __init__() method.
  • connected - indicates that the connection to the broker has been established. The peer can now send, subscribe to, and receive messages.
  • ready - indicates that initialization is complete. For ConfiguredPeer this means that all external params and config from ConfigServer have been updated.
  • running - indicates that the peer is ready and is currently working on its main task.
  • shutting_down - indicates that the peer is shutting down; create_task() and send_message() are not available.
  • finished - indicates that the peer has already stopped working; it cannot do anything more and all resources are freed.

To find out the state of the peer, you can use the following properties:

  • is_connected
  • is_ready
  • is_running
  • is_shutting_down
  • is_finished
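
The example test in this document reads these flags as plain attributes (for instance `assert peer.is_connected`). It also asserts `is_ready` while the peer is running and `is_shutting_down` once the peer is finished. A minimal sketch consistent with that behaviour is shown here. The `SketchPeer` class is hypothetical, and the exact grouping of states behind each flag (especially `is_connected`) is an assumption rather than the actual implementation:

```python
from enum import Enum, auto


class PeerState(Enum):
    initializing = auto()
    connected = auto()
    ready = auto()
    running = auto()
    shutting_down = auto()
    finished = auto()


class SketchPeer:
    """Hypothetical stand-in for Peer; only the state queries are modelled."""

    def __init__(self):
        self._state = PeerState.initializing

    @property
    def is_connected(self):
        # Assumption: true from connected until shutdown begins.
        return self._state in (PeerState.connected, PeerState.ready, PeerState.running)

    @property
    def is_ready(self):
        # The example test asserts is_ready while running, so running counts as ready.
        return self._state in (PeerState.ready, PeerState.running)

    @property
    def is_running(self):
        return self._state is PeerState.running

    @property
    def is_shutting_down(self):
        # The example test asserts is_shutting_down after shutdown(), so finished counts too.
        return self._state in (PeerState.shutting_down, PeerState.finished)

    @property
    def is_finished(self):
        return self._state is PeerState.finished
```

Deriving every flag from the single `_state` value keeps the flags consistent with each other, which is one reason callers can use them without reading PeerState directly.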

Possible changes in states

Valid state transitions declared in _VALID_STATES_TRANSITIONS_ are shown in the diagram below.

   initializing -------------+
        |                    |
*-----------------------*    |
|     connected         |    |
|        |              |    |
|*-->  ready ----------*|    |
||       |             ||    |
|*--- running <--------*|    |
|        |              |    |
*--------|--------------*    |
    shutting down <----------+
         |
      finished

In addition, identity transitions (from a state to itself) are valid.
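
The diagram and the AssertionError rule from the Implementation section can be sketched together as a transition table plus a guarded setter. This is only an illustration: the table entries are read off the diagram, and the `SketchPeer` class and the `_set_state` name are hypothetical, not the real Peer API.

```python
from enum import Enum, auto


class PeerState(Enum):
    initializing = auto()
    connected = auto()
    ready = auto()
    running = auto()
    shutting_down = auto()
    finished = auto()


class SketchPeer:
    """Hypothetical stand-in for Peer; only state changes are modelled."""

    # Key: current state. Value: tuple of states reachable from it.
    # Every state inside the box (connected, ready, running) may go to shutting_down.
    _VALID_STATES_TRANSITIONS_ = {
        PeerState.initializing: (PeerState.connected, PeerState.shutting_down),
        PeerState.connected: (PeerState.ready, PeerState.shutting_down),
        PeerState.ready: (PeerState.running, PeerState.shutting_down),
        PeerState.running: (PeerState.ready, PeerState.shutting_down),
        PeerState.shutting_down: (PeerState.finished,),
        PeerState.finished: (),
    }

    def __init__(self):
        self._state = PeerState.initializing

    def _set_state(self, new_state):
        allowed = self._VALID_STATES_TRANSITIONS_[self._state]
        # Identity transitions are always accepted; anything else must be in the table.
        assert new_state is self._state or new_state in allowed, (
            f"invalid transition {self._state.name} -> {new_state.name}"
        )
        self._state = new_state
```

With this table, the path initializing, connected, ready, running, ready, shutting_down, finished passes the check, while a direct jump from initializing to running raises AssertionError.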

Example

This test serves as an example of how to use peer states. It cycles through the PeerState values and shows which function calls are valid in each state:

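from unittest import mock  # Assumption: mock refers to the standard-library unittest.mock

import pytest

# PeerState, OkMsg, ShuttingDownException, check_peer_states, wait_for_condition,
# dummy_task and dummy_handler come from the project and its test helpers.


def test_peer_state_flow(peer):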
    """Peer state flow should be kept.
    see: :ref:`Peer States` for reference.
    """
    with mock.patch.multiple(
        peer,
        # Called from _initialize:
        _establish_connections=check_peer_states(
            peer._establish_connections,
            before_states=(PeerState.initializing,),
        ),
        _connections_established=check_peer_states(
            peer._connections_established,  # initialization method, that can be used to
                                            # get information from other peers
            before_states=(PeerState.connected,),  # it is called from connected state, and when it finishes, peer
                                                   # changes its state to ready
        ),
    ):
        try:
            # just after peer start it is not connected
            assert not peer.is_connected
        except AssertionError:
            # the peer might already be connected if the computer is fast and another thread manages to connect it
            # before the test starts
            pass
        else:
            assert not peer.is_ready, "When peer is not connected, it should not be ready"
        peer.create_task(dummy_task())  # we can start asyncio tasks for peer
        wait_for_condition(lambda: peer.is_connected)
        assert peer.is_connected
        peer.send_message(OkMsg())  # We can send messages now
        peer.subscribe_for_all_msg_subtype(OkMsg, dummy_handler)  # We can subscribe for messages
        wait_for_condition(lambda: peer.is_ready)
        assert peer.is_ready and not peer.is_running, "Peer should not be running without start"
        peer.start()
        assert peer.is_ready and peer.is_running
        peer.start()  # multiple starts are possible

        peer.stop()
        assert not peer.is_running and peer.is_ready
        peer.stop()  # multiple stops are also possible

        peer.start()
        assert peer.is_running

        peer.create_task(peer.async_shutdown())  # start shutdown process
        wait_for_condition(lambda: peer.is_shutting_down)
        assert peer.is_shutting_down

        # during shutdown most of the peer's methods are disabled
        with pytest.raises(ShuttingDownException):
            peer.start()
        with pytest.raises(ShuttingDownException):
            peer.send_message(OkMsg())
        with pytest.raises(ShuttingDownException):
            peer.create_task(dummy_task())

        peer.shutdown()
        assert peer.is_finished and peer.is_shutting_down
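
The test relies on wait_for_condition to block until another thread has moved the peer into the expected state. Its real implementation is not shown in this document. The sketch below is one plausible polling helper, under the assumption that the condition is a zero-argument callable; the name `poll_until` and the `timeout` and `interval` parameters are hypothetical.

```python
import time


def poll_until(condition, timeout=5.0, interval=0.01):
    """Call condition() repeatedly until it returns a truthy value or timeout expires."""
    deadline = time.monotonic() + timeout
    while True:
        if condition():
            return
        if time.monotonic() >= deadline:
            raise TimeoutError(f"condition not met within {timeout} s")
        # Sleep briefly so the thread that changes the peer state can make progress.
        time.sleep(interval)
```

A helper of this kind would be used the same way as in the test, for example `poll_until(lambda: peer.is_ready)`.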

Messages

There is one message that changes the peer state: PeerControlMessage. In addition to peer_id (the sender of the message), it takes an action parameter, which can hold an arbitrary string value. Currently it is only checked against:

  • start : the peer's _start() method is awaited, effectively changing the peer's state to running
  • stop : the peer's _stop() method is awaited, effectively changing the peer's state to ready
  • close : the peer's async_shutdown() method is awaited, effectively closing the peer
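
The mapping from action strings to awaited methods can be sketched as a small dispatch table. This is an illustration only: `SketchPeer`, `handle_control` and the `calls` list are hypothetical, and ignoring an unrecognised action is an assumption, since the text only says which values are checked.

```python
import asyncio


class SketchPeer:
    """Hypothetical stand-in for Peer that records which coroutine was awaited."""

    def __init__(self):
        self.calls = []

    async def _start(self):
        self.calls.append("start")

    async def _stop(self):
        self.calls.append("stop")

    async def async_shutdown(self):
        self.calls.append("close")

    async def handle_control(self, action):
        # Map each recognised action string to the coroutine method it awaits.
        handlers = {
            "start": self._start,
            "stop": self._stop,
            "close": self.async_shutdown,
        }
        handler = handlers.get(action)
        if handler is None:
            # Assumption: unrecognised actions are ignored.
            return False
        await handler()
        return True
```

For example, `asyncio.run(SketchPeer().handle_control("start"))` awaits `_start()` and returns True, while an unknown action returns False without calling anything.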

There are also other messages, sent by a peer to inform others about a state change:

  • PeerReady - sent by ConfiguredPeer when its state changes from connected to ready, to inform Broker and ConfigServer that initialization has finished.
  • PanickMsg - sent by Peer when it is shutting down with an error.
  • BrokerGoodbyeMsg - sent by Peer on shutdown.