Feb 19, 2020
Read in 26 minutes
As an execution engine for big data processing, MR3 is a distributed system consisting of a single master (called DAGAppMaster) and multiple workers (called ContainerWorkers) running across the network. The master orchestrates the execution of workers and implements all the features required of a distributed system. Workers receive commands from the master and communicate with each other in order to transfer intermediate data. In this way, MR3 tries to maximize the utilization of cluster resources.
As is the case for typical distributed systems, the development of MR3 is challenging and demanding. Unsurprisingly, it is not the amount of code that makes MR3 hard to develop. Rather, the key challenge lies in ensuring the correctness and maintaining the simplicity of the whole system, which involves many concurrent modules running inside the master and many parallel workers running in a non-deterministic way. As a result, we spend a lot of time just testing MR3, even with the full automation of every phase of testing. From our experience, we roughly adhere to the rule of thumb in the book The Mythical Man-Month: 1/3 for planning, 1/6 for coding, and 1/2 for testing.
This article presents the details of testing MR3. It is a long article, moderately technical, with a few digressions seemingly unrelated to MR3, but it carries the quintessential lesson learned from developing MR3 over the years. The goal of this article is to inspire more confidence in potential users of MR3 regarding its performance, correctness, and reliability by sharing our internal practice of testing MR3 (and to celebrate the release of MR3 1.0).
The remainder of this article is organized in eight sections:
In the entire code base of MR3, we have only two unit tests. A surprise?
We do not rely on unit tests, which are not particularly useful for distributed systems anyway. Besides, maintaining the code for unit tests is uninteresting at best and can exact a heavy toll once it reaches a critical mass in terms of the number of test cases. We do not want to keep thousands of unit tests that have accumulated over many years and just return “green” whenever executed.
We do, however, write unit tests in order to check individual methods or small classes. It is just that we discard the test code, without adding it to the repository, after checking the main code. The rationale is that writing unit tests should be straightforward and thus take much less time than writing the main code itself. If writing unit tests takes more time than necessary, we choose to make better use of our time by manually scrutinizing the main code. Occasionally we have to revise the main code, in which case we quickly (and gladly) write new unit tests from scratch.
With practically no unit tests in the code base, then, how do we test MR3 at all? Can we even claim that we test MR3? Of course, we test MR3 much more thoroughly than with unit tests. And we test in a much more elegant way.
We invite the reader to take a look at the following Scala code and guess if it contains a bug.
def foo(x: Int, y: Int): Int = {
  x / y
}
It is easy to see that the code is not protected against division by zero. Hence calling the method foo with zero for the argument y generates an ArithmeticException.
It is, however, a different question whether the possibility of division by zero is indeed a bug.
We can think of three scenarios: 1) division by zero is a genuine possibility that the method itself should handle; 2) the method must never be called with zero for y, so the responsibility lies with the caller; 3) the author of the method has verified that y can never be zero at any call site.
In the first case, we can easily fix the bug by either checking the value of y or implementing an exception handler, as in:
def foo(x: Int, y: Int): Int = {
  if (y == 0)
    0 // return a default value
  else
    x / y
}
In the second case, we require that the method never be called with wrong arguments. If the method is erroneously called with wrong arguments, an exception can be generated to indicate the presence of a bug on the caller side. Scala has a built-in construct require for such a purpose.
def foo(x: Int, y: Int): Int = {
  require { y != 0 }
  x / y
}
In the third case, no further action is necessary (at least in theory) because of the verification by the author of the method. Perhaps the author has analyzed all the call sites and checked the possible range of values for y. Thus the author may think: “I have manually checked all the calls to foo, and it must be safe to use without worrying about y being zero.”
We can use another built-in construct of Scala, assert, to express such a condition on y as an assertion. The Boolean condition on y expressed in the assertion is called an invariant, which may not be universally true but should be true relative to the rest of the code.
def foo(x: Int, y: Int): Int = {
  assert { y != 0 }
  x / y
}
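As a side note on runtime behavior (a minimal sketch for illustration, not taken from the MR3 code base): when its condition evaluates to false, require throws an IllegalArgumentException, which points to a bug on the caller side, whereas assert throws an AssertionError, which points to a mistaken claim by the author of the method.

object DivisionExample {
  def fooRequire(x: Int, y: Int): Int = {
    require { y != 0 }   // precondition: the caller is responsible for y
    x / y
  }

  def fooAssert(x: Int, y: Int): Int = {
    assert { y != 0 }    // invariant: the author claims y is never zero
    x / y
  }

  def main(args: Array[String]): Unit = {
    println(fooRequire(10, 2))  // prints 5
    // fooRequire(10, 0) throws java.lang.IllegalArgumentException (a bug in the caller)
    // fooAssert(10, 0) throws java.lang.AssertionError (a wrong claim by the author)
  }
}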
The problem, however, is that the author may have made a mistake while verifying the method. For example, she may have overlooked a few call sites or missed an execution path that happens to assign zero to y. Hence the invariant that y is not zero is just a personal claim, rather than a logical fact, which should be verified separately.
The question now is how to verify invariants expressed in assertions.
For the purpose of verifying invariants, industry and academia have developed three generations of technologies over the past five decades.
The first generation is based on testing. We systematically run many test cases against the invariant and check if it is violated. If all the test cases pass, we gain a higher level of confidence in the truth of the invariant. Thus testing only provides a large volume of anecdotal evidence and does not really verify the invariant, although the testing community “unverifiably” equates testing with verification. The strength of testing lies in its ability to find bugs when test cases fail, its approachability, and its constant evolution in power, as evidenced by recent advances in concolic testing.
The second generation is based on formal methods such as abstract interpretation and model checking. Abstract interpretation simulates the execution of a program using abstract states (instead of exact states) and checks if the invariant always stays true. Model checking systematically explores the entire space of program execution and tries to find counter-examples that invalidate the invariant. Unlike testing, both abstract interpretation and model checking formally verify that the invariant is true. For example, if the outcome says “no null pointer dereferencing,” the user can rest assured that the code never dereferences null pointers at runtime.
The third generation is based on program logic such as Hoare logic and separation logic. Given an invariant, we start with a pre-condition known to be true at an earlier point in the program, and calculate strongest post-conditions to reason forward whether it logically implies the invariant. Alternatively we start with a post-condition known to be true at a later point in the program, and calculate weakest pre-conditions to reason backward whether it is logically implied by the invariant. Individual steps of reasoning can be either automatic by using theorem provers such as Z3, or manual by using proof assistants such as Coq. The strength of program logic lies in its ability to verify functional properties. For example, we can use program logic to verify that a method implementing quicksort indeed sorts an input array correctly.
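To make the backward style of reasoning concrete with the running example (a minimal illustration, under the usual convention that a division is well-defined only when its divisor is nonzero, and with r standing for the result of the method): the weakest pre-condition of the assignment r := x / y with respect to the post-condition r = x / y is exactly y ≠ 0, which yields the Hoare triple

\{\, y \neq 0 \,\} \quad r := x / y \quad \{\, r = x / y \,\}

Hence, to prove that foo never divides by zero, it suffices to show that y ≠ 0 holds at every call site.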
Ideally we wish to use program logic for checking all the invariants in MR3, but in reality, testing is the only option because of the sheer complexity of MR3. For comparison, the initial verification of about 10,000 lines of C code in the seL4 kernel (using program logic) took 20 person-years (see Provably trustworthy systems, 2017). From our past research experience in program logic, however, we fully appreciate the importance of writing strong invariants. For reference, below are a couple of research papers on separation logic that we published before embarking on the MR3 project.
Thus, since day one of the MR3 project, we have made it a principle to strive to identify the strongest possible invariant in every part of the source code. Every time we think “according to the design of our system, this condition must be true at this point of execution,” we make sure to turn our finding into an invariant expressed with the assert construct. We never discard such an invariant because, along with the internal document (which consists mostly of diagrams of various kinds), it often conveys the most precious information about the design and implementation of MR3.
As a result of our constant efforts, the code base of MR3 now has about 900 high quality invariants. This large corpus of invariants constitutes the basis for testing MR3.
Before we explain how we put these invariants to use for testing, let us examine several examples from the source code of MR3.
In the first example, we show an invariant on a sorted map queueMap, which is a mapping from task priorities to task queues, and two auxiliary variables.
protected var queueMap: Map[RunPriority, VertexQueue] =
  SortedMap.empty(RunPriority.queueKeyOrdering)

private def checkQueueMap: Boolean = {
  val queueMapInvariantsOkay =
    queueMap forall { case (_, (vs2, q2)) =>
      q2.nonEmpty &&
      (q2.keys forall { vs2 contains _.taskAttemptId.vertexId }) }
  val resourceForQueuedTaskAttemptsOkay =
    resourceForQueuedTaskAttempts == queueMap.values
      .map { p => allTaskResource(p._2) }
      .fold(Utils.emptyMr3Resource)(_ + _)
  val numQueuedTaskAttemptsOkay =
    numQueuedTaskAttempts == queueMap.values.map { p => p._2.size }.sum
  queueMapInvariantsOkay && resourceForQueuedTaskAttemptsOkay && numQueuedTaskAttemptsOkay
}

assert { !LOG.isDebugEnabled || checkQueueMap }
The invariant checkQueueMap can be roughly interpreted as follows:
- Every task queue in queueMap is non-empty and contains only tasks whose vertexes are registered in the corresponding vertex set (in queueMapInvariantsOkay).
- resourceForQueuedTaskAttempts stores the sum of resources for all the tasks in task queues (in resourceForQueuedTaskAttemptsOkay).
- numQueuedTaskAttempts counts the number of all the tasks in task queues (in numQueuedTaskAttemptsOkay).
Now, whenever we update queueMap, we check its invariant by calling checkQueueMap (if the logging level is set to DEBUG).
We do not write comments on queueMap in the code because the invariant is more precise and easier to read (if you are familiar with Scala programming). The readability of the invariant can be attributed to the extensive use of higher-order functions (forall, map, fold), which attests to the power of functional programming.
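To illustrate the pattern in isolation, here is a toy sketch (not from the MR3 code base) of the same idea: a private check method ties a collection to its auxiliary variable, and an assertion runs the check after every update.

class ResourceQueue {
  private var queued: Vector[Int] = Vector.empty   // resources of queued tasks
  private var totalResource: Int = 0               // auxiliary variable kept in sync

  // invariant: the auxiliary variable equals the sum over the collection
  private def checkInvariant: Boolean =
    totalResource == queued.sum

  def enqueue(resource: Int): Unit = {
    queued = queued :+ resource
    totalResource += resource
    assert { checkInvariant }   // in MR3, expensive checks like this are guarded by the logging level
  }

  def dequeue(): Option[Int] = queued.headOption map { resource =>
    queued = queued.tail
    totalResource -= resource
    assert { checkInvariant }
    resource
  }
}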
The second example shows an invariant on the state of an object (called WorkerVertex) before and after the state transition in a certain method.
assert { transition(getState, newState)(
  Running -> Seq(Running, Committing, Succeeded),
  Terminating -> Seq(Terminating, Failed),
  noStateChange(New, Inited, Initializing, Ready, Succeeded, Failed, Stopping)) }
The invariant is self-explanatory. For example, the initial state of Terminating leads to the final state of either Terminating or Failed. The method noStateChange means that there is no change in the state.
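To give a rough idea of how such an invariant can be evaluated (a hypothetical sketch with invented names, not the actual MR3 implementation), a transition helper only needs a table mapping each state to its permitted successor states:

sealed trait State
case object New extends State
case object Running extends State
case object Committing extends State
case object Succeeded extends State
case object Terminating extends State
case object Failed extends State

object Transition {
  // true if the table permits the change from oldState to newState;
  // states absent from the table are left unconstrained in this sketch
  def allowed(oldState: State, newState: State)(table: Map[State, Seq[State]]): Boolean =
    table.get(oldState) forall { _ contains newState }
}

// e.g. assert { Transition.allowed(oldState, newState)(Map(
//        Running -> Seq(Running, Committing, Succeeded),
//        Terminating -> Seq(Terminating, Failed))) }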
Similarly to the first example, we do not write comments on the state transition in the code. As a typical object in MR3 goes through multiple states during its life cycle, we usually write an invariant to express the condition that should hold in each state. For example, the following invariant says that in the state of Failed, lastSuccessfulTaskAttemptId is empty, all members of taskAttemptViews are finished, but the number of successful attempts can be greater than zero.
As a side note, !A || B is an idiomatic way of expressing that A implies B (i.e., if A is true, then B is also true).
assert({
  !(newState == TaskState.Failed) || {
    lastSuccessfulTaskAttemptId.isEmpty &&
    taskAttemptViews.values.forall { _.isFinished } &&
    numSucceededAttempts >= 0 } }, s"$taskId")
Among all the invariants in MR3, the most important ones are those expressing the dependence between two modules. Such invariants are usually simple, but are the most sought after because of their ability to keep two separate modules consistent with each other. Our experience also shows that a subtle bug in a particular module may fail to invalidate local invariants, but often invalidates those invariants involving a second module.
As an example, the following invariant states that the resource for a task returned from task queues should be smaller than the resource available in a worker. While deceptively simple, this invariant played a crucial role in detecting bugs in the implementation of task queues that would have gone unnoticed without it.
val taskAttemptHintStatus = taskAttemptQueue dequeueWithContext dequeueRequest
assert { taskAttemptHintStatus forall { _._1.taskResource <= dequeueRequest.resource } }
As another example, the following invariant states that if task queues are not empty, a scale-in operation during autoscaling should not terminate all active hosts. It connects two independent modules running in separate threads: one managing task queues and the other responsible for autoscaling.
assert({ !(!isTaskAttemptQueueEmpty) || hostContainerMap.size - numHostsToRemoveFinal > 0 },
s"$hostContainerMap $numHostsToRemoveFinal")
Half the battle in implementing a non-trivial loop is figuring out its loop invariant, which is a condition that holds at the beginning of every iteration. (The same rule applies to recursive functions as well, in particular to tail-recursive functions.) As figuring out a strong loop invariant can take a good few hours, we keep all loop invariants in the code, as shown in the following example.
while (currentSrcOutputIndex < srcOutputIndexLimit) {
  assert { !(currentSrcOutputIndex != srcOutputIndexStart) ||
           (currentSrcOutputIndex % numInitialTasksPerDestTask == 0) }
As we will see later, the inclusion of loop invariants completely eliminates the need for unit tests for loops. Besides, loop invariants are the best documentation for loops and thus particularly useful when revisiting the code after several months or years of hiatus.
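For a self-contained illustration (a toy example, not taken from MR3), consider a loop that sums an array and asserts its loop invariant at the beginning of every iteration:

def sum(xs: Array[Int]): Int = {
  var acc = 0
  var i = 0
  while (i < xs.length) {
    // loop invariant: acc equals the sum of the first i elements
    assert { acc == xs.take(i).sum }
    acc += xs(i)
    i += 1
  }
  acc
}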
If writing concurrent programs is hard, proving their correctness is an order of magnitude harder. Since even very simple properties can save us a lot of time when reasoning about concurrent programs, we always try to write invariants on threads. For example, the following invariants state two simple properties of threads: 1) the owner of the current thread matches the owner of this DAG; 2) the current thread holds the lock containerLock.
assert { !LOG.isDebugEnabled || Utils.getCurrentUser() == this.dagUgi }
assert { !LOG.isDebugEnabled || Thread.holdsLock(containerLock) }
Despite their simplicity, such invariants are very useful when we try to convince ourselves that threads are behaving properly. As an additional benefit, they help us to maintain our peace of mind when reasoning about concurrent programs.
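The second property is easy to reproduce in a toy setting (a sketch for illustration, not MR3 code): a private helper asserts that it is only ever called while the enclosing lock is held.

class Counter {
  private val lock = new Object
  private var count = 0

  def increment(): Unit = lock.synchronized { incrementLocked() }

  // must be called only while holding lock
  private def incrementLocked(): Unit = {
    assert { Thread.holdsLock(lock) }
    count += 1
  }

  def get: Int = lock.synchronized { count }
}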
For testing MR3, we exploit invariants simply by evaluating them while executing MR3. The failure of an invariant, which occurs when it evaluates to false, generates an AssertionError and immediately indicates the presence of a bug, either in the design or in the implementation. More importantly, it helps us to quickly locate the source of the bug because the values of all the relevant variables are logged if necessary. The story goes as follows:
To visualize the process of checking invariants while executing MR3, let us assume that MR3 is implemented with 5 x 5 = 25 modules represented as tiles in a rectangle. We place green circles for those invariants that are always valid, and red circles for those invariants that fail sometimes (but not always). To represent the progress of executing MR3 and checking invariants, we draw a line starting at the top leftmost corner. If the execution completes successfully, the line ends at the bottom rightmost corner, as illustrated in the following diagram:
If an invariant fails, the execution stops prematurely:
In this case, we fix the bug either in the code or in the invariant itself. In either case, we have a good chance of replacing a red circle with a green circle.
The execution can also stop without violating any invariant:
In this case, we first analyze the log to find the cause and then try to materialize our findings in new invariants.
In the worst scenario, the execution reaches an invariant that is actually invalid, but happens to evaluate to true. In the end, the execution completes successfully without indicating the presence of a bug:
This case illustrates why it always pays to try to find strong invariants. For example, the same execution could have indicated the presence of a bug if the invariant was much stronger:
As a final example, the following diagram explains, albeit not in a way that is technically sound, why an invariant connecting two modules, which covers a much wider range of execution paths than an ordinary invariant, is so powerful in detecting bugs:
We do not know for certain whether the source code of MR3 contains only green circles or some red circles as well. Absurd as it may sound, chances are that it contains only green circles, implying that MR3 has no known bugs! Note that MR3 may actually hide subtle bugs that have not surfaced yet. We, however, are not aware of such bugs because we never, literally never, encounter red circles when testing MR3 (its latest release 1.0). Or it may be that we have not found invariants strong enough to uncover such nasty bugs.
Our claim on the absence of red circles stems not just from the large corpus of invariants but also from the scope and depth of the integration and system tests that we have developed over the years. Intuitively, our level of confidence in the correctness of MR3 should be commensurate with 1) the number of invariants and their combined strength and 2) the number of different execution paths covered by the entire suite of integration and system tests.
For 1), we have empirical evidence on the adequacy of the current set of invariants. For example, by virtue of so many invariants scattered throughout the code base of MR3, it is now extremely difficult for a naive bug not to manifest itself. (This is partially the reason why we do not rely on unit tests.) As more invariants accumulate and their combined strength grows, we find it increasingly easier to aggressively optimize any module when we deem it necessary, and to extend an existing module or add a new module without losing our peace of mind. More often than not, a wrong change to the code results in the failure of a completely unexpected invariant connecting two modules, which, in turn, allows us to further strengthen the invariant (while murmuring to ourselves “could we even think about this bug, let alone find it, if it were not for this invariant?”).
For 2), we now describe the details of integration and system tests in MR3.
An integration test in MR3 can be thought of as designating one or more waypoints in the execution path so that the system enters certain states before completion. In the following example, the execution path first reaches the waypoint marked “1” and then visits a nearby invalid invariant. In this way, an integration test allows us to inspect a specific module more closely.
An integration test with multiple waypoints is particularly useful. A typical example includes two waypoints such that the first waypoint specifies a starting state and the second waypoint specifies a desired state. Usually such an integration test fails either because the execution completes without entering the desired state, or because the execution violates an invariant on the way to the second waypoint as shown in the following diagram. Clearly the farther the waypoints are from each other, the better chance we have of finding a bug.
Unlike unit tests, integration tests involve all the components of MR3. We first create a client, a master, and one or more workers, all inside the same process. Then we submit a concrete DAG (which can be thought of as input to MR3) and wait until its execution completes. During the course of an integration test, we check all the invariants crossing the execution path. Hence, for every single run of an integration test, all the invariants in the source code of MR3 are put to test.
The synergy between invariants and integration tests explains why a single invariant can be worth hundreds of unit tests in the context of developing MR3. In addition to being an excellent means of documentation, an invariant continues to contribute to the search for bugs as new integration tests join the code base. Put differently, the power of an invariant automatically increases with more integration tests. In contrast, a unit test is useful only for its subject code and only when it is written. Afterwards it contributes only to the maintenance burden.
Most of the integration tests in MR3 are designed to check if the master recovers from bad states. Here are several examples.
- The master recovers from exceptions raised during I/O operations (such as IOException) or from the underlying system (such as InterruptedException).
- The master recovers from fatal errors (such as OutOfMemoryError).
Depending on its design, an integration test may yield a different execution path on each run because, in general, the behavior of the master and its workers is non-deterministic. For such an integration test, more runs directly translate into a higher chance of finding bugs, so we produce a sufficiently large number of runs. Currently we use a cluster of 40 nodes where each node executes 20 processes to produce runs in parallel. As an individual run usually takes less than 10 seconds (i.e., at least 360 runs per hour per process), we can produce at least 40 * 20 * 360 * 24 = 6,912,000 runs per day. From our experience, a single day is more than enough because the outcome of the testing is usually one of the following: 1) we find a simple bug immediately; 2) we find a subtle bug before producing 100,000 runs; 3) we never find a bug however long the testing lasts.
A system test in MR3 is designed to simulate a production environment and evaluate the performance, correctness, and reliability of MR3 using realistic data. It specifies a particular configuration for all the components of MR3 and watches the execution of concrete DAGs in a Hadoop or Kubernetes cluster. In addition to DAGs developed specifically for system tests, we make heavy use of the TPC-DS benchmark whose queries generate highly complex DAGs (with up to 61 vertexes). Similarly to integration tests, a system test checks all the invariants crossing the execution path, but unlike integration tests, it does not designate waypoints in the execution path.
For system tests, we mainly use five clusters of varying capacity and stability. For convenience, we assign a unique color to each cluster. In addition, we occasionally use a few ephemeral Hadoop clusters running on virtual machines in order to check dependency problems.
| Cluster | # of nodes | Node specification | System | Stability |
|---|---|---|---|---|
| Red | 11 | 192GB memory, 6x500GB HDDs | Hadoop with Kerberos | Unstable |
| Indigo | 20 | 96GB memory, 6x500GB HDDs | Kubernetes and HDFS | Unstable |
| Gold | 42 | 96GB memory, 6x500GB HDDs | Hadoop | Stable |
| Blue | 13 | 256GB memory, 6x1TB SSDs | Hadoop with Kerberos | Highly stable |
| Orange | 5 | 64GB memory, 1x1TB SSD | Kubernetes | Highly stable |
Red and Indigo are unstable: under heavy load, some of their nodes crash frequently, on the order of once or twice a day. To our satisfaction (and pain), the instability of Red and Indigo is actually useful for checking the reliability of MR3. Gold is usually stable, but its nodes crash occasionally. Blue and Orange are highly stable in that their nodes never crash in practice. Orange is connected to a MinIO server simulating Amazon S3. All the clusters use a 10 Gigabit network.
The system tests in MR3 can be divided into six categories, ranging from basic tests that complete in less than two hours to stress tests that take a whole week. Below we describe the details of each category. MR3 (its latest release 1.0) passes all the system tests.
Basic tests are designed to check if all the basic functions of MR3 are working normally. We execute DAGs derived from real-world jobs such as WordCount, OrderedWordCount, HashJoin, and SortMergeJoin, all of which read a small dataset and examine the result for correctness. In addition, we execute synthetic DAGs of varying complexity. A synthetic DAG can be a trivial DAG with a single vertex (which reads and writes a small dataset), or a complex DAG with an arbitrary number of vertexes. We execute synthetic DAGs under all possible combinations of the following configurations:
As basic tests are not concerned with performance, a Hadoop cluster running on a local machine suffices.
Feature tests are designed to check if those features of MR3 and Hive on MR3 required in production environments are working normally. We use the Red cluster for its resemblance to a production environment. For example, it runs a Kerberos-enabled Hadoop cluster and sets the max lifetime of delegation tokens to 15 minutes. As such, any bug due to mismanaging Kerberos tickets or delegation tokens is likely to be detected during feature tests.
Here are several examples of feature tests.
Performance tests are designed to ensure that an update to MR3 does not accidentally impair its performance. A better outcome is, of course, to confirm an improvement in performance. We run performance tests regularly so as not to lose our sense of whether the performance of MR3 is within an acceptable range.
For performance tests, we use 1) the TeraSort benchmark on the Red cluster and 2) the TPC-DS benchmark on the Red, Gold, and Blue clusters.
TeraSort executes a DAG with two vertexes (map and reduce) for sorting records of 100 bytes (10 bytes for a key and 90 bytes for a value). As it is independent of the computational complexity of vertexes, TeraSort is a suitable benchmark for testing the overall speed of MR3. For running TeraSort, we use input data of 400GB on the Red cluster (40GB per node).
TPC-DS is the de facto standard for measuring the performance of SQL-based big data systems. It consists of 99 queries with diverse characteristics. We use a scale factor of 1TB on the Red cluster and 10TB on the Gold and Blue clusters. The reader can find the result of testing Hive on MR3 on the TPC-DS benchmark in our previous articles.
The correctness test checks if the result of running TPC-DS queries remains identical to the standard result (modulo rounding errors). The standard result is obtained by crosschecking Hive on MR3 against four other SQL-on-Hadoop systems (Hive-LLAP, Presto, Impala, and SparkSQL). (Our previous article Correctness of Hive on MR3, Presto, and Impala gives details on how to obtain the standard result.) Hence the correctness test enables us to quickly detect wrong updates to Hive on MR3, especially incompatible patches from Apache Hive.
Every system test using the TPC-DS benchmark now uses the queries modified for the correctness test. Currently Hive 3 on MR3 returns a correct result for every query, but Hive 4 on MR3 returns a wrong result for query 70.
Restart tests are designed to check if MR3 survives catastrophic events. The idea is that we kill some workers, kill the master, or even reboot a node in the middle of running the TPC-DS benchmark, and check if all queries complete successfully. We run restart tests on a regular basis on the Orange cluster where Hive on MR3 is always running and upgrading itself automatically.
The last category of system tests consists of stress tests which put Hive on MR3 under heavy load for a long time and check its reliability. We repeatedly run the TPC-DS benchmark either sequentially (with a single Beeline client) or concurrently (with multiple Beeline clients), and observe the state of the whole system and the success rate of queries. Unlike restart tests, we do not kill the master/workers or reboot nodes deliberately. We use the Red, Gold, and Indigo clusters for stress tests.
As an example, a stress test on the Red or Gold cluster proceeds as follows. In the beginning, we run the TPC-DS benchmark sequentially and record the total running time. Then we run the TPC-DS benchmark concurrently using 8, 16, or 32 Beeline clients. Because of the instability of the cluster placed under heavy load, workers often fail to fetch intermediate data and nodes sometimes crash, thus triggering the mechanism of fault tolerance in MR3. We also manually delete intermediate data to produce fetch-failures. This experiment continues for several days. In its last stage, a stress test culminates in the final run of the TPC-DS benchmark. We compare the final run with the initial run to make sure that the master and its workers are still working normally with no deterioration in performance. We also check for resource leaks in memory, threads, and classes loaded.
Here is the result of a stress test using Hive 3 on MR3 1.0 on the Gold cluster.
In this article, we have reviewed the principle and practice of testing MR3. We have highlighted the role that invariants play in the development of MR3, and shown how invariants synergize with integration and system tests. For the reader interested in trying Hive on MR3 in a production environment, hopefully this article serves as partial evidence of its reliability.
Thank you very much for reading this article.