Peer States¶
Introduction¶
Peer state is internal to the peer, and external classes should not make decisions based on its value, because it can change at any time. Only the Peer itself should look into its own PeerState.
Implementation¶
Each peer has a private property _state and a private dictionary _VALID_STATES_TRANSITIONS_.
_state indicates the current internal state of the peer.
_VALID_STATES_TRANSITIONS_ is a dict[tuple], where each tuple is a container of the states to which a transition is valid from the given key state.
If you try to change the state to one that is not a valid transition from the current state, an AssertionError will be raised. Currently there are six peer states, but each peer can implement its own as necessary. Those states are:
initializing - the peer state during the call of the __init__() method.
connected - indicates that the connection to the broker has been established. Peer can now send, subscribe and receive messages.
ready - indicates that initialization is complete. For ConfiguredPeer that means that all external params and config from ConfigServer have been updated.
running - indicates that the peer is ready and is currently working on its main task.
shutting_down - indicates that the peer is shutting down; create_task() and send_message() are not available.
finished - indicates that the peer has already stopped working; it cannot do anything more and all resources are freed.
To find out the state of the peer, you can use the following properties:
is_connected
is_ready
is_running
is_shutting_down
is_finished
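The example test further down asserts `peer.is_ready and peer.is_running` while the peer is running, and `peer.is_finished and peer.is_shutting_down` after shutdown, which suggests these checks are not plain equality tests against a single state. The following is a minimal sketch of one way that could work. The `PeerState` members mirror the list above, but the `PeerSketch` class and the exact set of states accepted by each property are assumptions made for illustration, not the actual implementation.

```python
from enum import Enum


class PeerState(Enum):
    initializing = "initializing"
    connected = "connected"
    ready = "ready"
    running = "running"
    shutting_down = "shutting_down"
    finished = "finished"


class PeerSketch:
    """Hypothetical stand-in for a peer; only the state checks are modelled."""

    def __init__(self):
        self._state = PeerState.initializing

    @property
    def is_connected(self):
        # Assumption: a ready or running peer still counts as connected.
        return self._state in (PeerState.connected, PeerState.ready, PeerState.running)

    @property
    def is_ready(self):
        # Assumption: a running peer is also ready.
        return self._state in (PeerState.ready, PeerState.running)

    @property
    def is_running(self):
        return self._state is PeerState.running

    @property
    def is_shutting_down(self):
        # Assumption: a finished peer still reports that it is shutting down.
        return self._state in (PeerState.shutting_down, PeerState.finished)

    @property
    def is_finished(self):
        return self._state is PeerState.finished
```

Because the checks are exposed as properties in this sketch, callers read them without parentheses, for example `if peer.is_ready: ...`.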
Possible changes in states¶
Valid state transitions declared in _VALID_STATES_TRANSITIONS_ are represented in the diagram below.
initializing -------------+
         |                |
*-----------------------* |
|    connected          | |
|        |              | |
|*--> ready  ----------*| |
||       |             || |
|*--- running <--------*| |
|        |              | |
*--------|--------------* |
 shutting down <----------+
         |
      finished
In addition, identity transitions (from a state to itself) are valid.
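One possible reading of the diagram, written out as a transition table, is sketched below. The `PeerState` enum values, the `PeerSketch` class and its `_set_state` helper are hypothetical names used for illustration. The dictionary contents are inferred from the diagram, including the assumption that every state inside the box may move to shutting down, and may differ from the real `_VALID_STATES_TRANSITIONS_`.

```python
from enum import Enum


class PeerState(Enum):
    initializing = 1
    connected = 2
    ready = 3
    running = 4
    shutting_down = 5
    finished = 6


S = PeerState  # short alias to keep the table readable


class PeerSketch:
    # Key: current state; value: tuple of states reachable from it.
    # Contents are an interpretation of the diagram, not the real table.
    _VALID_STATES_TRANSITIONS_ = {
        S.initializing: (S.connected, S.shutting_down),
        S.connected: (S.ready, S.shutting_down),
        S.ready: (S.running, S.shutting_down),
        S.running: (S.ready, S.shutting_down),
        S.shutting_down: (S.finished,),
        S.finished: (),
    }

    def __init__(self):
        self._state = S.initializing

    def _set_state(self, new_state):
        # Identity transitions are always accepted in addition to the table.
        allowed = self._VALID_STATES_TRANSITIONS_[self._state] + (self._state,)
        assert new_state in allowed, (
            f"invalid transition {self._state.name} -> {new_state.name}"
        )
        self._state = new_state
```

With this table, calling `_set_state(PeerState.connected)` twice in a row succeeds, while going from `connected` straight to `running` raises an AssertionError. Note that plain `assert` statements are skipped when Python runs with the `-O` flag.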
Example¶
This test serves as an example of how to use peer states. It cycles through the PeerState values and shows which function calls are valid in each state:
# Imports assumed for this excerpt. PeerState, OkMsg, ShuttingDownException,
# check_peer_states, wait_for_condition, dummy_task and dummy_handler come from
# the project and its test helpers.
from unittest import mock

import pytest


def test_peer_state_flow(peer):
    """Peer state flow should be kept.

    see: :ref:`Peer States` for reference.
    """
    with mock.patch.multiple(
        peer,
        # Called from _initialize:
        _establish_connections=check_peer_states(
            peer._establish_connections,
            before_states=(PeerState.initializing,),
        ),
        # Initialization method that can be used to get information from other
        # peers. It is called from the connected state; when it finishes, the
        # peer changes its state to ready.
        _connections_established=check_peer_states(
            peer._connections_established,
            before_states=(PeerState.connected,),
        ),
    ):
        try:
            # Just after the peer starts, it is not connected.
            assert not peer.is_connected
        except AssertionError:
            # The peer might already be connected if the computer is fast and
            # another thread manages to connect it before the test starts.
            pass
        else:
            assert not peer.is_ready, "When peer is not connected, it should not be ready"

        peer.create_task(dummy_task())  # we can start asyncio tasks for the peer

        wait_for_condition(lambda: peer.is_connected)
        assert peer.is_connected
        peer.send_message(OkMsg())  # we can send messages now
        peer.subscribe_for_all_msg_subtype(OkMsg, dummy_handler)  # we can subscribe for messages

        wait_for_condition(lambda: peer.is_ready)
        assert peer.is_ready and not peer.is_running, "Peer should not be running without start"

        peer.start()
        assert peer.is_ready and peer.is_running
        peer.start()  # multiple starts are possible

        peer.stop()
        assert not peer.is_running and peer.is_ready
        peer.stop()  # multiple stops are also possible

        peer.start()
        assert peer.is_running

        peer.create_task(peer.async_shutdown())  # start the shutdown process
        wait_for_condition(lambda: peer.is_shutting_down)
        assert peer.is_shutting_down

        # During shutdown most of the peer's methods are disabled.
        with pytest.raises(ShuttingDownException):
            peer.start()
        with pytest.raises(ShuttingDownException):
            peer.send_message(OkMsg())
        with pytest.raises(ShuttingDownException):
            peer.create_task(dummy_task())

        peer.shutdown()
        assert peer.is_finished and peer.is_shutting_down
Messages¶
There is one message which changes peer state: PeerControlMessage.
In addition to peer_id (the sender of the message), it takes an action parameter, which can be an arbitrary string value.
Currently it is only checked against:
start: the peer's _start() method is awaited, effectively changing the peer's state to running
stop: the peer's _stop() method is awaited, effectively changing the peer's state to ready
close: the peer's async_shutdown() method is awaited, effectively closing the peer
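The dispatch described above could look roughly like the following sketch. `FakePeer`, `handle_control_action` and the way the action string reaches the handler are hypothetical; only the action names and the method names `_start()`, `_stop()` and `async_shutdown()` are read from the list above. Unknown actions are silently ignored here, which is an assumption about what "only checked against" implies.

```python
import asyncio


class FakePeer:
    """Hypothetical peer that only records its state as a string."""

    def __init__(self):
        self.state = "ready"

    async def _start(self):
        self.state = "running"

    async def _stop(self):
        self.state = "ready"

    async def async_shutdown(self):
        self.state = "finished"


async def handle_control_action(peer, action):
    """Await the peer method matching the action; ignore anything else."""
    handlers = {
        "start": peer._start,
        "stop": peer._stop,
        "close": peer.async_shutdown,
    }
    handler = handlers.get(action)
    if handler is not None:
        await handler()


async def demo():
    """Apply a sequence of actions and collect the state after each one."""
    peer = FakePeer()
    seen = []
    for action in ("start", "stop", "bogus", "close"):
        await handle_control_action(peer, action)
        seen.append(peer.state)
    return seen


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

A dictionary lookup keeps the mapping from action strings to coroutines in one place, so adding a new action in this sketch would only require one more entry.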
There are also other messages, sent by a peer to inform others about a state change:
PeerReady - sent by ConfiguredPeer when its state changes from connected to ready, to inform Broker and ConfigServer that initialization has finished.
PanickMsg - sent by Peer when it is shutting down with an error.
BrokerGoodbyeMsg - sent by Peer on shutdown.