lprof: A Non-intrusive Request-Flow Profiler for Distributed Systems

lprof: A Non-intrusive Request-Flow Profiler for Distributed Systems – Zhao et al. 2014

The Mystery Machine needs a request id in log records that can be used to correlate entries in a trace. What if you don’t have that? lprof makes the absolute most of whatever logging your system already has.

lprof is novel in that (i) it does not require instrumentation or modifications to source code, but instead extracts information from the logs output during the course of normal system operation, and (ii) it is capable of automatically identifying, from the logs, each request and profiling its performance behavior. Specifically, lprof is capable of reconstructing how each service request is processed as it invokes methods, uses helper threads, and invokes remote services on other nodes.

As part of the evaluation, lprof is used with the existing logging in HDFS, Hadoop YARN, Cassandra, and HBase at the default logging level. On average, it is able to accurately associate log records with the processing of an individual request 88% of the time.

To evaluate whether lprof would be effective in debugging realistic anomalies, we randomly selected 23 user-reported real-world performance anomalies from the bugzilla databases associated with the systems we tested. This allows us to understand, via a small number of samples, what percentage of real-world performance bugs could benefit from lprof. For each bug, we carefully read the bug report, the discussions, and the related code and patch to understand it. We then reproduced each one to obtain the logs, and applied lprof to analyze its effectiveness. This is an extremely time-consuming process…

lprof turns out to be useful in detecting and diagnosing 2/3 of these problems, which is a very impressive result for a tool that requires you to make zero modifications to your source code.

The basic problem is familiar:

… understanding the performance behavior of these systems is hard because the service is (i) distributed across many nodes, (ii) composed of multiple sub-systems (e.g., front-end, application, caching, and database services), and (iii) implemented with many threads/processes running with a high degree of concurrency.

As with The Mystery Machine, the authors observe that distributed systems tend to output a lot of log statements. “In this paper we show that the information in the logs is sufficiently rich to allow the recovering of the inherent structure of the dispersed and intermingled log output messages, thus enabling useful performance profilers like lprof.”

When you have to make do with whatever is available though, figuring out per-request information from the logs is a non-trivial problem: the log messages are unstructured text, they’re spread across multiple nodes in the system, and the messages from multiple requests and threads are intertwined.

The lprof implementation works with Java, and uses static bytecode analysis to infer as much as it possibly can about the log messages the system can produce. For the HDFS, YARN, Cassandra, and HBase codebases the analysis takes about 2 minutes. The static analysis proceeds in four steps:

  • First, each log printing statement found in the code is analysed. It will consist of string constants and variable inserts. A regular expression is generated which will match log messages produced by the statement (for example: Receiving block BP-(.*):blk_(.*)_.* ). Producing the regular expression requires tracing back to the toString() implementations of the inserts. If the type of an insert can be subclassed and the subtypes have different toString() implementations, multiple regexes are created. The regexes are used to map log messages back to the log points in the code that could have output them (see the sketch following this list).
  • Secondly, for the variables that are included in a log statement, dataflow analysis is used to determine which ones are modified and which ones are not. Those that are not modified are candidate request identifiers.

Request identifiers are used to separate messages from different requests; that is, two log messages with different request identifiers are guaranteed to belong to different requests. However, the converse is not true: two messages with the same identifier value may still belong to different requests…. To infer which log points belong to the processing of the same request, top-level methods are also identified by analyzing when identifiers are modified. We use the term top-level method to refer to the first method of any thread dedicated to the processing of a single type of request.

  • Third, a temporal order analysis is used to help in the case where there is not a unique id for each request. For example, if log message A must precede log message B from an analysis of a code block, then if the log contains the sequence B,A, we know that these must be records for different requests. This goes deeper than just code within an individual method, and considers a DAG for an entire call-graph:

…lprof generates a Directed Acyclic Graph (DAG) for each top-level method (identified in the previous step) from the method’s call graph and control-flow graph (CFG). This DAG contains each log point reachable from the top-level method and is used to help attribute log messages to top-level methods.

  • Finally, communication pair analysis is used to identify threads that communicate with each other. This covers threads within the same process that communicate, for example through helper threads or shared memory. It also looks at communication across the network:

Specifically, whenever lprof finds a pair of invoke instructions whose target methods are the serialization and deserialization methods from the same class, respectively, the top-level methods containing these two instructions are paired. Developers often use third-party data-serialization libraries, such as Google Protocol Buffers. This further eases lprof’s analysis since they provide standardized serialization/deserialization APIs. Among the systems we evaluated, Cassandra is the only one that does not use Google Protocol Buffers, but implements its own serialization library. For Cassandra, a simple annotation to pair C.serialize() with C.deserialize() for any class C is sufficient to correctly pair all of the communicating top-level methods. lprof also parses the Google Protocol Buffer’s protocol annotation file to identify the RPC pairs, where each RPC is explicitly declared.
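
To make the first step concrete, here is a small hypothetical Java sketch (not lprof’s actual code): a log printing statement whose string constants and variable inserts yield a regular expression, which can then be used to map a runtime log message back to its log point and to pull out candidate request identifier values. The class and variable names are invented for illustration.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical illustration of step one: a log printing statement such as
//   LOG.info("Receiving block " + block);   // block.toString() -> "BP-<poolId>:blk_<blockId>_<gen>"
// is turned into a regular expression whose wildcards correspond to the
// variable inserts (here, the block pool id and block id).
public class LogPointExample {
    // Regex derived from the constant parts of the statement plus the
    // toString() implementations of the inserted variables.
    static final Pattern RECEIVING_BLOCK =
        Pattern.compile("Receiving block BP-(.*):blk_(.*)_.*");

    public static void main(String[] args) {
        String message = "Receiving block BP-193242:blk_9384_1";
        Matcher m = RECEIVING_BLOCK.matcher(message);
        if (m.matches()) {
            // The wildcard values are candidate request identifiers
            // (variables that are not modified after being logged).
            System.out.println("log point     = ReceivingBlock");
            System.out.println("block pool id = " + m.group(1));
            System.out.println("block id      = " + m.group(2));
        }
    }
}
```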

At the end of the static analysis phase, lprof outputs a file representing the log printing behaviour of the system. It consists of the following four segments:

  1. Top-level methods: a list of tuples with (i) the name of the top-level method, (ii) an index into the DAG representation of the log points, and (iii) a list of request identifiers;
  2. DAGs: the DAG for each top-level method;
  3. Log point regex: the regular expressions for each log point and the identifier for each wildcard;
  4. Communication pairs: a list of tuples that identify the communication points along with the identifiers for the data being communicated.

This file also contains a number of indices to speed processing. It is sent to every machine in the cluster where logs are to be processed.
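
Purely as an illustration of what these four segments contain, they could be modelled in Java along the following lines; the record names and field shapes are assumptions, not lprof’s actual file format.

```java
import java.util.List;
import java.util.Map;

// A hypothetical in-memory view of the four segments of lprof's
// static-analysis output file (names and shapes are illustrative only).
record TopLevelMethod(String name, int dagIndex, List<String> requestIdentifiers) {}

record LogPointDag(List<Integer> logPoints,               // nodes: log points reachable from the top-level method
                   Map<Integer, List<Integer>> edges) {}  // temporal "must precede" edges

record LogPointRegex(int logPointId, String regex, List<String> wildcardIdentifiers) {}

record CommunicationPair(String senderTopLevelMethod,
                         String receiverTopLevelMethod,
                         List<String> sharedIdentifiers) {}

record StaticAnalysisOutput(List<TopLevelMethod> topLevelMethods,
                            List<LogPointDag> dags,
                            List<LogPointRegex> logPointRegexes,
                            List<CommunicationPair> communicationPairs) {}
```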

In contrast to other systems that sample records and then ship them off to a central location for processing, lprof first processes log records in situ. The overall log processing has the form of a map-reduce computation:

The Map function in lprof’s MapReduce log processing job first stitches together the printed log messages from the same request on the same node where the logs are stored, which requires only one linear scan of each log file. Only summary information from the log file and only from requests that traverse multiple nodes is sent over the network in the shuffling phase to the reduce function. This avoids sending the logs over the network to a centralized location to perform the analysis, which is unrealistic in real-world clusters.

The map and reduce functions use a common data structure called a Request Accumulator (RA) that gathers information relating to the same request:

Each RA contains: (i) a vector of top-level methods that are grouped into this RA; (ii) the value of each request identifier; (iii) a vector of log point sequences, where each sequence comes from one top-level method; (iv) a list of nodes traversed, with the earliest and latest timestamp. The map and reduce functions will iteratively accumulate the information of log messages from the same request into the RAs. In the end, there will be one RA per request that contains the information summarized from all its log messages.

During the initial scan, a parsed log entry is added to an existing RA if the top-level methods match, the identifier values do not conflict, and the log point matches the temporal sequence in the DAG. Otherwise, a new RA is created. Results are then locally combined before the reduce phase.

It combines two RAs into one if there exists a communication pair between the two top-level methods in these two RAs, and the request identifier values do not conflict. Moreover, as a heuristic, we do not merge RAs if the difference between their timestamps is larger than a user-configurable threshold.
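
A minimal sketch of the Request Accumulator and its merge rules, paraphrased from the two descriptions above; the field names and method signatures are assumptions rather than lprof’s actual code.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the Request Accumulator (RA) described above.
class RequestAccumulator {
    List<String> topLevelMethods = new ArrayList<>();           // (i) top-level methods grouped into this RA
    Map<String, String> identifierValues = new HashMap<>();     // (ii) request identifier -> value
    List<List<Integer>> logPointSequences = new ArrayList<>();  // (iii) one log-point sequence per top-level method
    List<String> nodesTraversed = new ArrayList<>();            // (iv) nodes traversed by the request
    long earliestTimestamp = Long.MAX_VALUE;
    long latestTimestamp = Long.MIN_VALUE;

    // Two RAs may only be combined if none of their request identifier values conflict.
    boolean identifiersCompatible(RequestAccumulator other) {
        for (Map.Entry<String, String> e : other.identifierValues.entrySet()) {
            String mine = identifierValues.get(e.getKey());
            if (mine != null && !mine.equals(e.getValue())) return false;
        }
        return true;
    }

    // Combine step used by the local combiner / reducer. The caller first checks that
    // a communication pair links the two top-level methods, that identifiersCompatible()
    // holds, and that the timestamps differ by less than a configurable threshold.
    void merge(RequestAccumulator other) {
        topLevelMethods.addAll(other.topLevelMethods);
        identifierValues.putAll(other.identifierValues);
        logPointSequences.addAll(other.logPointSequences);
        nodesTraversed.addAll(other.nodesTraversed);
        earliestTimestamp = Math.min(earliestTimestamp, other.earliestTimestamp);
        latestTimestamp = Math.max(latestTimestamp, other.latestTimestamp);
    }
}
```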

Shuffle keys are then assigned to each RA in preparation for the reduce phase:

We do this by considering communication pairs. At the end of the static analysis, if there is a communication pair connecting two top-level methods A and B, A and B are joined together into a connected component (CC). We iteratively merge more top-level methods into this CC as long as they communicate with any of the top-level methods in this CC. In the end, all of the top-level methods in a CC could communicate, and their RAs are assigned the same shuffle key.

If the communicating top-level methods have common request identifiers, these are used to further differentiate shuffle keys. And if an RA cannot possibly communicate with another RA then no further shuffling occurs and it is output directly to the database. At the end of the reduce phase, information from each RA is stored into a database table. The final output size is about 5% of the raw log size.
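
The connected-component grouping amounts to a union of top-level methods linked by communication pairs. The sketch below is a hypothetical illustration of how shuffle keys could be derived from such components, not lprof’s implementation.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical union-find over top-level methods: methods that communicate
// (directly or transitively) end up in the same connected component, and
// RAs for methods in the same component receive the same shuffle key.
class ShuffleKeys {
    private final Map<String, String> parent = new HashMap<>();

    private String find(String method) {
        parent.putIfAbsent(method, method);
        String p = parent.get(method);
        if (!p.equals(method)) {
            p = find(p);
            parent.put(method, p); // path compression
        }
        return p;
    }

    // Called once per communication pair found by the static analysis.
    void union(String topLevelMethodA, String topLevelMethodB) {
        parent.put(find(topLevelMethodA), find(topLevelMethodB));
    }

    // The component representative forms the base of the shuffle key; a common
    // request identifier value (when one exists) refines it further.
    String shuffleKey(String topLevelMethod, String commonIdentifierValue) {
        return find(topLevelMethod) + "#" + commonIdentifierValue;
    }
}
```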

Each row in the database represents an individual request, and is associated with a log-sequence id (LID). The log sequence id is a hash of the log-point sequence of a request (e.g. LP1, LP3, LP4, LP9, …).

Note the LID captures the unique type and number of log messages, their order within a thread, as well as the number of threads. However, it does not preserve the timing order between threads. Therefore, in practice, there are not many unique log sequences; for example, in HDFS there are only 220 unique log sequences on 200 EC2 nodes running a variety of jobs for 24 hours. We also generate a separate table that maps each log sequence ID to the sequence of log points to enable source-level debugging.
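
A minimal sketch of how such an LID could be computed, assuming that order matters within a thread but not across threads; the particular hash used here is an illustrative choice, as the paper only says the LID is a hash of the log-point sequence.

```java
import java.util.List;

// Hypothetical log-sequence id: hash the per-thread sequences of log points.
// Order is preserved within each thread, but the per-thread sequences are
// sorted first so that the timing order between threads does not matter.
class LogSequenceId {
    static int lid(List<List<Integer>> perThreadLogPointSequences) {
        return perThreadLogPointSequences.stream()
                .map(Object::toString)   // e.g. "[1, 3, 4]"
                .sorted()                // make the id independent of thread interleaving
                .reduce("", String::concat)
                .hashCode();
    }
}
```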

A web application enables visualisation of lprof’s analysis results. And finally at this point sampling is introduced:

One challenge we encountered is that the number of requests is too large when visualizing their latencies. Therefore, when the number of requests in the query result is greater than a threshold, we perform down-sampling and return a smaller number of requests. We used the largest triangle sampling algorithm, which first divides the entire time-series data into small slices, and in each slice it samples the three points that cover the largest area. To further hide the sampling latency, we pre-sample all the requests into different resolutions. Whenever the server receives a user query, it examines each pre-sampled resolution in parallel, and returns the highest resolution whose number of data points is below the threshold.
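
Reading the description above literally, the per-slice sampling might look like the following hypothetical sketch; the pre-sampling to multiple resolutions mentioned in the quote is not shown.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical down-sampler following the description above: split the series
// into fixed-size slices and, in each slice, keep the three points whose
// triangle covers the largest area.
class LargestTriangleSampler {
    record Point(double time, double latency) {}

    static List<Point> downSample(List<Point> series, int sliceSize) {
        List<Point> sampled = new ArrayList<>();
        for (int start = 0; start < series.size(); start += sliceSize) {
            List<Point> slice = series.subList(start, Math.min(start + sliceSize, series.size()));
            if (slice.size() <= 3) { sampled.addAll(slice); continue; }
            double best = -1;
            Point a = null, b = null, c = null;
            for (int i = 0; i < slice.size(); i++)
                for (int j = i + 1; j < slice.size(); j++)
                    for (int k = j + 1; k < slice.size(); k++) {
                        double area = triangleArea(slice.get(i), slice.get(j), slice.get(k));
                        if (area > best) { best = area; a = slice.get(i); b = slice.get(j); c = slice.get(k); }
                    }
            sampled.add(a); sampled.add(b); sampled.add(c);
        }
        return sampled;
    }

    static double triangleArea(Point p, Point q, Point r) {
        return Math.abs((q.time - p.time) * (r.latency - p.latency)
                      - (r.time - p.time) * (q.latency - p.latency)) / 2.0;
    }
}
```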

The authors conclude:

lprof is able to stitch together the dispersed and intertwined log messages and associate them to specific requests based on the information from off-line static analysis on the system’s code. Our evaluation shows that lprof can accurately attribute 88% of the log messages from widely-used, production-quality distributed systems, and is helpful in debugging 65% of the sampled real-world performance anomalies.