System design interviews carry a lot of weight in hiring decisions for senior-level software developer positions. In this blog series, I am going to cover the most common design patterns that can be applied to system design problems. Knowing these patterns will be helpful in interviews, as they come in handy for all kinds of system design questions.
In this blog, I will be covering six common design patterns:
In a distributed system, a quorum is the minimum number of servers on which an operation must be performed successfully before the operation's overall success can be declared.
In a distributed system, data is replicated across multiple servers for fault tolerance. Once the developers decide to maintain multiple copies of the data, another problem emerges: how do we make sure that all replicas are consistent and that all clients view the same data?
Suppose a database is replicated on 3 machines. In this case, the quorum is the minimum number of machines that must perform the same operation successfully for a transaction and decide its final outcome. So, in the case of 3 machines, 2 machines form the majority quorum, and if they agree, the transaction is committed.
Quorum enforces the consistency requirement on a majority of servers in the distributed system.
Quorum: the value is just more than half the number of nodes in the cluster, i.e. (n/2 + 1) using integer division, where n is the total number of nodes in the cluster.
For a 4-node cluster, the quorum is 3 (4/2 + 1), i.e. 3 nodes must agree on a transaction to form a majority, and the cluster can only afford 1 node failure.
For a 5-node cluster, the quorum is also 3 (5/2 + 1, with integer division), i.e. 3 nodes must agree on a transaction to form a majority, and the cluster can afford 2 node failures.
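The majority calculation above can be sketched in a few lines of Python (the function names here are just for illustration):

```python
def majority_quorum(n: int) -> int:
    """Minimum number of nodes that forms a majority in an n-node cluster."""
    return n // 2 + 1

def tolerable_failures(n: int) -> int:
    """How many node failures the cluster can survive while keeping a quorum."""
    return n - majority_quorum(n)

for n in (3, 4, 5):
    print(f"{n}-node cluster: quorum = {majority_quorum(n)}, "
          f"tolerates {tolerable_failures(n)} failure(s)")
```

Note how a 4-node cluster pays for the extra node without gaining any extra fault tolerance over a 3-node cluster, which is why odd cluster sizes are usually preferred.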
Quorum is achieved when R + W > N (where N: number of nodes in the quorum group, W: minimum number of nodes that must acknowledge a write, R: minimum number of nodes that must be read).
If the distributed system follows R + W > N, then every read will see at least one copy of the latest data, because the read set and write set must overlap in at least one node.
For N=3, R=2, W=2 -> strong consistency
N=3, R=3, W=1 -> slow reads, fast writes, not durable
N=3, R=1, W=3 -> fast reads, slow writes, durable
Keep in mind that R = 1, W = N means read one, write all -> full replication. This is not very desirable, as writes won't succeed if even a single server fails.
The application achieves the best performance when 1 < R < W < N, since reads are typically more frequent than writes.
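The R + W > N overlap check itself is a one-liner; here is a small Python sketch (the function name is illustrative):

```python
def quorums_overlap(n: int, r: int, w: int) -> bool:
    """True when every read quorum intersects every write quorum
    (R + W > N), so a read always sees the latest committed write."""
    return r + w > n

print(quorums_overlap(3, 2, 2))  # strong consistency
print(quorums_overlap(3, 1, 1))  # a read may miss the latest write
print(quorums_overlap(3, 1, 3))  # read one, write all
```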
Cassandra: each write request can be configured to be successful only if it is written to at least a majority of the replica nodes.
One server is elected as the leader, which is responsible for data replication and acts as the pivot to coordinate work.
In order to make a distributed system fault tolerant and available, multiple copies of the data are stored as backups. To ensure data consistency across all the replicas, quorum can be used, i.e. reads and writes aren't considered successful until a majority of nodes complete the replication. However, this brings up another issue, namely lower availability, because the system needs to ensure that at least a majority of nodes are up and running at any given time.
Quorum alone isn't sufficient, as in failure situations a client might still get inconsistent reads.
At any time, one server is elected as the leader. The leader is responsible for read/write requests and data replication, and acts as the central point for all coordination. The followers are responsible for replicating the writes from the leader. In some situations, a follower can even serve read requests to relieve heavy load.
If the leader fails, a follower gets promoted to become the new leader.
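To make the division of responsibilities concrete, here is a minimal in-memory Python sketch of a leader-follower cluster (the class and method names are hypothetical, and replication is simplified to a synchronous loop):

```python
class Node:
    def __init__(self, name: str):
        self.name = name
        self.data = {}  # this node's local copy of the replicated data

class Cluster:
    def __init__(self, leader: Node, followers: list):
        self.leader = leader
        self.followers = followers

    def write(self, key, value):
        # All writes go through the leader, which replicates to followers.
        self.leader.data[key] = value
        for follower in self.followers:
            follower.data[key] = value  # synchronous replication, simplified

    def read(self, key, allow_follower_reads=False):
        # Under heavy load, reads can optionally be served by a follower.
        if allow_follower_reads and self.followers:
            return self.followers[0].data.get(key)
        return self.leader.data.get(key)

    def on_leader_failure(self):
        # A follower gets promoted to become the new leader.
        self.leader = self.followers.pop(0)
```

A real implementation would replicate asynchronously over the network and run a proper election on failure; this sketch only shows who is responsible for what.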
Kafka: each partition has a leader which is responsible for all reads/writes for that partition, while the followers replicate the leader's data and store it as backup replicas.
Every server sends heartbeats at regular intervals to the central server (or a random server chosen by everyone in the system) to indicate that it is alive.
In a distributed system, each server sends a heartbeat to the central server (or a random server) or load balancer to let it know that the server is alive and working. In a decentralized system, every server is responsible for some portion of the data, based on partitioning and replication. For this reason, timely updates become necessary: they enable the system to take corrective measures, i.e. stop sending requests to an unhealthy server and either redirect them to other healthy servers or replace the failed server with another one. But how do we get to know when a server fails?
Heartbeating is one of the techniques to detect server failures in a distributed environment. All servers periodically (every request interval) send a heartbeat to the central server (or a random server chosen by all the servers, in case there is no central server). If no heartbeat is received for a configured timeout interval, the system concludes that the server has crashed, stops sending requests to it, and starts working on a replacement.
The request interval should be larger than the network round-trip time between the servers. So, we can conclude that:
timeout interval (e.g. 1s) > request interval (e.g. 100ms) > network round-trip time between servers (e.g. 20ms)
Both servers, the sender and the receiver, have schedulers defined on their end. The scheduler is given a method that sends a heartbeat and is executed at regular intervals.
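Both ends can be sketched in a few lines of Python (the names and intervals are illustrative; a real system would send heartbeats over the network rather than call the monitor directly):

```python
import threading
import time

REQUEST_INTERVAL = 0.1   # how often a heartbeat is sent (100 ms here)
TIMEOUT_INTERVAL = 1.0   # silence longer than this marks a server as down

class HeartbeatMonitor:
    """Receiver side: records the arrival time of every heartbeat."""
    def __init__(self):
        self.last_seen = {}

    def on_heartbeat(self, server_id: str):
        self.last_seen[server_id] = time.monotonic()

    def failed_servers(self) -> list:
        now = time.monotonic()
        return [server for server, seen in self.last_seen.items()
                if now - seen > TIMEOUT_INTERVAL]

def heartbeat_sender(server_id: str, monitor: HeartbeatMonitor,
                     stop: threading.Event):
    """Sender side: a scheduler loop that emits a heartbeat every interval."""
    while not stop.is_set():
        monitor.on_heartbeat(server_id)  # stands in for a network call
        stop.wait(REQUEST_INTERVAL)
```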
The smaller the timeout interval, the faster the system can detect a failure. But it shouldn't be too small, as that increases the probability of false failure detections (due to temporary network glitches).
In consensus-based systems (e.g. ZooKeeper), however, the leader sends a heartbeat to all the follower servers. Every time a heartbeat is received, the timestamp of its arrival is recorded. If no heartbeat is received within the configured time period, the followers consider the leader to have crashed and appoint one of themselves as the new leader. This can create an issue in case of a false alarm, when a new leader is appointed while the current leader is still alive. This is resolved by using a technique known as the Generation Clock (discussed below).
Put a “fence” around the previous leader to prevent it from doing any damage or causing any disruption.
In a leader-follower setup, when the followers stop receiving heartbeats, it is impossible to be sure whether the leader has stopped working or there is a temporary network disruption. If no heartbeats are received for the configured timeout period, a new leader is elected.
But if the previous leader was only temporarily down, the system now has two leaders that may be sending conflicting commands. The problem is how to stop the old leader from running and potentially issuing conflicting commands.
Fencing is the idea of putting a fence around a previously active leader so that it cannot access shared cluster resources, thereby stopping it from serving any requests and ensuring data integrity. Two approaches to fencing are:
- Resource fencing: the system stops the previous leader from accessing the shared resources it needs to do its essential tasks, for example by revoking its access to a shared storage directory or disabling its network port.
- Disable node: the system stops the previous leader from accessing all resources. A common way of doing this is to power off or restart the previous leader. This technique is also known as STONITH (Shoot The Other Node In The Head).
HDFS uses fencing to stop the previous leader NameNode from accessing shared cluster resources, thus maintaining data integrity.
A monotonically increasing number indicating the generation of the server
In a distributed system with a leader server, if the leader dies, the system must quickly find a substitute, otherwise it will quickly deteriorate.
There is a possibility of the leader temporarily disconnecting from its followers. This could happen due to an intermittent failure like a garbage-collection pause (when memory fills up and the program needs space to continue, the garbage collector runs, and during the GC pause all operations are suspended) or a temporary network disruption that cuts the leader off from its followers.
In this case, the leader is still running, and after the pause or network disruption is over, it will try to send replication requests to its followers. This is dangerous, because the cluster will have elected a new leader and started accepting requests from clients. The result is a system with two active leaders that could be issuing conflicting commands.
How can the system detect such a scenario, so that all nodes ignore requests from the old leader, and the old leader itself takes corrective measures and steps down from leadership?
Each time a new leader is elected, the generation number is incremented. So if the previous leader had generation number 5, the new leader will have generation number 6. This number is included in every request sent from the leader to the followers. Followers can then easily distinguish the real leader from a stale one: the real leader is the one with the highest generation number.
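A follower's check against a stale leader can be sketched in a few lines of Python (the class and method names are hypothetical):

```python
class Follower:
    def __init__(self):
        self.current_generation = 0

    def on_replication_request(self, generation: int, entries: list) -> bool:
        # A request carrying a lower generation number must come from a
        # stale leader, so it is rejected.
        if generation < self.current_generation:
            return False
        self.current_generation = generation
        # ... apply the replicated entries here ...
        return True

follower = Follower()
follower.on_replication_request(5, [])  # accepted, generation is now 5
follower.on_replication_request(6, [])  # new leader elected, accepted
follower.on_replication_request(5, [])  # old leader returns: rejected
```

The rejection also tells the old leader that a higher generation exists, which is its cue to step down.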
The generation number should be persisted on disk so that it survives restarts and always remains available.
Every node keeps state information about the other nodes in the cluster and gossips (i.e. shares) this information with one other random node every second. Eventually, each node gets to know the state of every other node in the cluster.
In a large distributed system where we don't have a central node that keeps track of whether every node is alive, we need another way to learn every node's state. The simplest approach is for every node to maintain a heartbeat with every other node: if a node goes down, it stops sending heartbeats and everyone learns about the crash. But this means O(N²) messages are sent out periodically, which consumes a lot of network bandwidth and is not feasible for large clusters. So we need a more optimized protocol through which every node can still keep track of every other node.
The gossip protocol is a peer-to-peer communication protocol in which nodes exchange information about themselves and the other nodes they know about at regular intervals. Each node initiates a gossip round every second to exchange information with one other random node.
Each node stores its metadata as a list of key-value pairs <nodeId, nodeState>. At startup, each node adds information about itself to the metadata before propagating it to other nodes. Examples of metadata are the node's IP address, the port it listens on, the partitions it is responsible for, etc.
Any node can act as an introducer. Each cluster node schedules a job to transmit the metadata it has to other nodes at regular intervals (usually every second). When the scheduled task runs, it picks a small set of random nodes from the list of servers in the metadata map.
The node receiving the gossip message inspects the metadata it contains to find out:
- the values that the node has but the gossip message doesn’t have
- the values that the gossip message has but the node doesn't have
In case of a mismatch, the entry with the higher version is chosen (each node maintains a version number that is incremented every time a value in its metadata is updated). The node then adds the missing metadata to its own state map, and the values missing from the gossip message are returned in the response. The cluster node that sent out the gossip message receives the response and merges it into its own state map.
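This version-based merge can be sketched in Python as follows (the map layout {node_id: (version, state)} is an assumption for illustration):

```python
def merge_gossip(local: dict, incoming: dict) -> dict:
    """Merge an incoming gossip message into the local state map.
    Both maps look like {node_id: (version, state)}; on conflict the
    entry with the higher version wins. Returns the entries the sender
    is missing, which go back in the gossip response."""
    for node_id, (version, state) in incoming.items():
        if node_id not in local or local[node_id][0] < version:
            local[node_id] = (version, state)
    return {node_id: entry for node_id, entry in local.items()
            if node_id not in incoming or incoming[node_id][0] < entry[0]}

local = {"node-a": (2, "up")}
incoming = {"node-a": (1, "up"), "node-b": (1, "up")}
response = merge_gossip(local, incoming)
# local now knows about node-b; the response carries the newer
# node-a entry back to the sender.
```

Because each exchange moves the newest version of every entry in both directions, repeated random rounds converge the whole cluster onto the same state.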
This process happens once every second on each cluster node, and every time a different random node is selected.
The state changes eventually spread through the entire system, and all nodes quickly learn about all the other nodes in the cluster.
Dynamo and Cassandra use the gossip protocol, where every second each node sends some of its information to another randomly chosen node in the cluster.
Thank you for checking out my blog!
In the upcoming blog, I will be covering 5 more system design patterns. So, follow along if you want to get notified about my upcoming posts!