## Monday, September 25, 2017

### Paper Summary. Proteus: agile ML elasticity through tiered reliability in dynamic resource markets

This paper proposes an elastic ML system, Proteus, that can add/remove transient workers on the fly for exploiting the transient availability of cheap but revocable resources in order to reduce costs and latency of computation. The paper appeared in Eurosys'17 and is authored by Aaron Harlap, Alexey Tumanov, Andrew Chung, Gregory R. Ganger, and Phillip B. Gibbons.

Proteus has two components: AgileML and BidBrain. AgileML extends the parameter-server ML architecture to run on a dynamic mix of stable and transient machines, taking advantage of opportunistic availability of cheap but preemptible AWS Spot instances. BidBrain is the resource allocation component that decides when to acquire and drop transient resources by monitoring current market prices and bidding on new resources when their addition would increase work-per-dollar.

Before delving into AgileML and BidBrain, let's first review the AWS Spot model.

## See Spot run

AWS provides always available compute instances, called "on-demand" instances: you get them when you like and keep them as much as you like provided that you pay their fixed hourly rate.

AWS also offers transient compute-instances via the AWS Spot market. You specify a bid price, and if the current market price is under your bid, you get the instance. You only pay the market price, and not your bid-price. In other words, your bid-price is an upperbound on how much you are comfortable for paying hourly. And if the AWS spot market price for the instance goes above your upperbound rate, AWS pulls the instance from you with only a 2-minute advance warning. Even in this case, the silverlining is that the last incomplete hour of computing is not charged for you, so you get some free computing.

As seen in Figure 3, you can save a lot of money if your computing job can exploit AWS Spot instances. (It is peculiar how the peak prices are sometimes up to 10 times higher than the fixed on-demand instances. This is speculated to prevent a high bid that secures long running instances at AWS Spot at a rate lower than EC2.)

Jobs that are especially suitable for AWS Spot's transient/preemptible computing style are embarrassingly parallel data processing tasks, where pieces are not related and where there is no need to maintain long-lived state. For example, for "shallow computing", such as thumbnail generation, there is no harm done with an instance eviction, as there is no need for continuity across the computation. The question the paper investigates is how to make the AWS Spot model work for "deeper computing", such as ML jobs.

While the paper considers this question for the AWS Spot market, the motivation also applies to enterprise computing in the datacenter. As disclosed in the Google Borg paper, Google distinguishes and prioritizes its production services over  analytic/batch services. If the production services need more resources, they will be given resources to the extent of preempting them from analytic/batch jobs if need be. On the other hand, when there is an excess of resources, analytic/batch jobs can enjoy them opportunistically.

## Stages of AgileML

AgileML has 3 modes/stages as shown in Figure 4. To provide a shorter and more cost-effective computation, AgileML dynamically changes modes based on the availability of cheap transient instances. As the transient to on-demand ratio increases from 1:1 to beyond 15:1, AgileML shifts up from mode 1 up to mode 3. As the ratio decreases, AgileML shifts down from mode 3 down to mode 1.

• Stage 1: Parameter Servers Only on Reliable Machines. Stage 1 spreads the parameter-server across reliable machines only, using transient nodes only for stateless workers. This works for most ML applications including K-means, DNN, Logistic Regression, Sparse Coding, as the workers are stateless while the parameter-servers contain the current solution state.
• Stage 2: ActivePSs on Transient Machines and BackupPSs on Reliable Machines. For transient to reliable node ratio greater than 1:1, AgileML switches to stage 2. Stage 2 uses a primary-backup model for parameter servers, using transient nodes for an active server (ActivePS) and reliable nodes for the hot standby (BackupPS). This relieves the heavy network load at the few reliable resources by spreading it across the many transient resources. The model parameters are sharded across the set of ActivePS instances. Workers send all updates and reads to the ActivePSs, which push updates in bulk to the BackupPSs. The solution state affected by transient node failures or evictions is recovered from BackupPSs. (For backing up ActivePS to BackupPS, it may be possible to explore a threshold-based update mechanism as outlined in the Gaia paper.)
• Stage 3: No Workers on Reliable Machines. Workers colocated with BackupPSs on reliable machines were found to cause straggler effects at transient-to-reliable ratios beyond 15:1. Stage 3 removes these workers, and acts like a sub-case of Stage 2.

## Handling elasticity

The elasticity controller component is responsible for changing modes based on the transient-to-reliable ratio and the network bandwidth. It tracks which workers are participating in the computation, assigns a subset of input data to each worker, and starts new ActivePSs.

For stage 2 and stage 3, half of the transient instances are recruited as ActivePSs, as that performed best in the evaluations. This one-half ratio is likely to be specific to using transient instances, as with reliable instances the more PSs the merrier it is.

During start-up, AgileML divides the parameter state into N partitions, where N is the maximum number of ActivePSs that can exist at any one point.  By using partitions in this way, AgileML avoids the need to re-shard the parameter state when adding or removing servers, instead re-assigning partitions as needed.

As the ActivePS instances increase and decrease, the elasticity controller re-assigns the parameter-server shards across the ActivePS instances appropriately. If all the ActivePSs are evicted, AgileML transfers to Stage 1. It seems like using a level of indirection was sufficient to get this working.

## BidBrain

BidBrain keeps track of historical market prices for transient instances and makes allocation decisions to minimize cost-per-work. An allocation is defined as a set of instances of the same type acquired at the same time and price. Before the end of an allocation's billing hour, BidBrain compares the cost-per-work ratios to decide whether the allocation is renewed or terminated.

## Evaluation

The experiments were performed with 3 ML applications.

• Matrix Factorization (MF) is a technique (a.k.a. collaborative filtering) commonly used in recommendation systems, such as recommending movies to users on Netflix. The goal is to discover latent interactions between the two entities (e.g., users and movies). Given a partially filled matrix X (e.g., a matrix where entry (i, j) is user i’s rating of movie j), MF factorizes X into factor matrices L and R such that their product approximates X.
• Multinomial Logistic Regression (MLR) is a popular model for multi-way classification, often used in the last layer of deep learning models for image classification or text classification. The MLR experiments use the ImageNet dataset with LLC features, containing 64k observations with a feature dimension of 21,504 and 1000 classes.
• Latent Dirichlet Allocation (LDA) is an unsupervised method for discovering hidden semantic structures (topics) in an unstructured collection of documents, each consisting of a bag (multi-set) of words. The evaluated LDA solver implements collapsed Gibbs sampling.

The baseline runs all instances on Spot market machines and uses checkpointing to recover progress if evicted. The experiments show about 17% overhead for MF due to checkpointing. Figure 1 illustrates the cost and time benefits of Proteus over the MLR application. Compared to all on-demand, the baseline improves on cost significantly as expected but increases the runtime by 25%. Proteus improves on cost and also manages to achieve reduced runtime.

On average 32% of Proteus's computing is free computing. But aggressively chasing free computing by bidding very close to market price results in high overhead: 4x increase in runtime and higher costs due to frequent evictions.

## Friday, September 22, 2017

### Paper summary. Distributed Deep Neural Networks over the Cloud, the Edge, and End Devices

This paper is by Surat Teerapittayanon, Bradley McDanel, and H.T. Kung at Harvard University and appeared in ICDCS'17.

The paper is about partitioning the DNN for inference between the edge and the cloud. There has been other work on edge-partitioning of DNNs, most recently the Neurosurgeon paper. The goal there was to figure out the most energy-efficient/fastest-response-time partitioning of a DNN model for inference between the edge and cloud device.

This paper adds a very nice twist to the problem. It adds an exit/output layer at the edge, so that if there is high-confidence in classification output the DNN replies early with the result, without going all the way to the cloud and processing the entire DNN for getting a result. In other words, samples can be classified and exited locally at the edge when the system is confident and offloaded to the cloud when additional processing is required.

This early exit at the edge is achieved by jointly training a single DNN with an edge exit point inserted. During the training, the loss is calculated by taking into account both the edge exit point and the ultimate exit/output point at the cloud end. The joint training does not need to be done via device/cloud collaboration. It can be done centrally or at a datacenter.

## DDNN architecture

The paper classifies the serving/inference hierarchy as local, edge, cloud, but I think local versus edge distinction is somewhat superfluous for now. So in my summary, I am only using two layers: edge versus cloud.

Another thing this paper does differently than existing edge/cloud partitioning work is its support for horizontal model partitioning across devices. Horizontal model partitioning for inference is useful when each device might be a sensor capturing only one sensing modality of the same phenomena.

DDNN uses binary neural networks to reduce the memory cost of NN layers to run on resource constrained end devices. A shortcoming in the paper is that the NN layers that end up staying in the cloud is also binary neural network. That is unnecessary, and it may harm precision.

## DDNN training

At training time, the loss from each exit is combined during backpropagation so that the entire network can be jointly trained, and each exit point achieves good accuracy relative to its depth. The idea is to optimize the lower parts of the DNN to create a sufficiently good feature representations to support both samples exited locally and those processed further in the cloud.

The training is done by forming a joint optimization problem to minimize a weighted sum of the loss functions of each exit. Equal weights for exits are used for the experimental results of this paper.

They use a normalized entropy threshold as the confidence criteria that determines whether to classify (exit) a sample at a particular exit point. The normalized entropy close to 0 means that the DDNN is confident about the prediction of the sample. At each exit point, the normalized entropy is computed and compared against a threshold T in order to determine if the sample should exit at that point.

The exits are determined/inserted manually before training. Future research should look into automating and optimizing the insertion of exit points.

## DDNN evaluation

The evaluation is done using a dataset of 6 cameras capturing the same object (person, car, bus) from different perspectives. The task is to classify to these three categories: person, car, and bus. This task may be overly simple. Moreover, the evaluation of the paper is weak. The multicamera dataset has only 680 training samples and 171 testing samples.

## Tuesday, September 19, 2017

### Paper summary. Gaia: Geo-Distributed Machine Learning Approaching LAN Speeds

This paper appeared in NSDI'17 and is authored by Kevin Hsieh, Aaron Harlap, Nandita Vijaykumar, Dimitris Konomis, Gregory R. Ganger,  Phillip B. Gibbons, and Onur Mutlu.

## Motivation

This paper proposes a framework to distribute an ML system across multiple datacenters, and train models at the same datacenter where the data is generated. This is useful because it avoids the need to move big data over wide-area networks (WANs), which can be slow (WAN bandwidth is about 15x less than LAN bandwidth), costly (AWS does not charge for inside datacenter communication but charges for WAN communication), and also prone to privacy or ownership concerns.

Google's Federated Learning also considered similar motivations, and set out to reduce WAN communication. It worked as follows: 1) smartphones are sent the model by the master/datacenter parameter-server,  2) smartphones compute an updated model based on their local data over some number of iterations, 3) the updated models are sent from the smartphones to the datacenter parameter-server, 4) the datacenter parameter-server aggregates these models (by averaging) to construct the new global model, and 5) Repeat.

The Gaia paper does not cite Federated Learning paper, because they are likely submitted around the same time. There are many parallels between Gaia's approach and that of Federated Learning. Both are based on the parameter-server model, and both prescribe updating the model parameters in a relaxed/stale/approximate synchronous parallel fashion: several iterations are run in-situ before updating the "master" parameter-server. The difference is for Federated Learning there is a "master" parameter-server in the datacenter, whereas Gaia takes a peer-to-peer approach where each datacenter has a parameter-server, and updating the "master" datacenter means synchronizing the parameter-servers across the datacenters.

## Approximate Synchronous Parallelism (ASP) idea

Gaia's Approximate Synchronous Parallelism (ASP) idea tries to eliminate insignificant communication between data centers while still guaranteeing the correctness of ML algorithms. ASP is motivated by the observation that the vast majority of updates to the global ML model parameters from each worker are insignificant, e.g., more than 95% of the updates may produce less than a 1% change to the parameter value. With ASP, these insignificant updates to the same parameter within a data center are aggregated (and thus not communicated to other data centers) until the aggregated updates are significant enough.

ASP builds heavily on the Stale Synchronous Parallelism (SSP) idea for parameter-server ML systems. While SSP bounds how stale (i.e., old) a parameter can be, ASP bounds how inaccurate a parameter can be, in comparison to the most up-to-date value.

ASP allows the ML programmer to specify the function and the threshold to determine the significance of updates for each ML algorithm. The significance threshold has 2 parts: a hard and a soft threshold. The purpose of the hard threshold is to guarantee ML algorithm convergence, while that of soft threshold is to use underutilized WAN bandwidth to speed up convergence. In other words, the soft threshold provides an opportunistic synchronization threshold.

## Architecture

The Gaia architecture is simple: It prescribes adding a layer of indirection to parameter-server model to account for multiple datacenter deployments.

Figure 4 shows the overview of Gaia. In Gaia, each data center has some worker machines and parameter servers. Each worker machine works on a shard of the input data stored in its data center to achieve data parallelism. The parameter servers in each data center collectively maintain a version of the global model copy, and each parameter server handles a shard of this global model copy. A worker machine only READs and UPDATEs the global model copy in its data center. To reduce the communication overhead over WANs, ASP is used between parameter-servers across different data centers. Below are the 3 components of ASP.

The significance filter. ASP takes 2 inputs from user: (1) a significance function and (2) an initial significance threshold. A parameter server aggregates updates from the local worker machines and shares the aggregated updates with other datacenters when the aggregate becomes significant. To facilitate convergence to the optimal point, ASP automatically reduces the significance threshold over time: if the original threshold is v, then the threshold at iteration t of the ML algorithm is $\frac{v}{\sqrt{t}}$.

ASP selective barrier. When a parameter-server receives the significant updates at a rate that is higher than the WAN bandwidth can support, instead of sending updates (which will take a long time), it first sends a short control message to other datacenters. The receiver of this ASP selective barrier message blocks its local workers from reading the specified parameters until it receives the significant updates from the sender of the barrier.

Mirror clock. This provides a final safety net implementing SSP across datacenters. When each parameter server receives all the updates from its local worker machines at the end of a clock (e.g., an iteration), it reports its clock to the servers that are in charge of the same parameters in the other data centers. When a server detects its clock is ahead of the slowest server, it blocks until the slowest mirror server catches up.

## Evaluation

The paper evaluates Gaia with 3 popular ML applications. Matrix Factorization (MF) is a technique commonly used in recommender systems. Topic Modeling (TM) is an unsupervised method for discovering hidden semantic structures (topics) in an unstructured collection of documents, each consisting of a bag (multi-set) of words. Image Classification (IC) is a task to classify images into categories, and uses deep learning and convolutional neural networks (CNNs). All applications use SGD-based optimization.

The experiments, running across 11 Amazon EC2 global regions and on a cluster that emulates EC2 WAN bandwidth, compare Gaia against the Baseline that uses BSP (Bulk Synchronous Parallelism) across all datacenters and inside a LAN.

## Questions

Is this general enough? The introduction says this should apply for SGD based ML algorithms. But are there hidden/implicit assumptions?

What are some examples of advanced significance functions? ML users can define advanced significance functions to be used with Gaia, but this is not explored/explained much in the paper. This may be a hard thing to do even for advanced users.

Even though it is easier to improve bandwidth than latency, the paper focuses on the challenge imposed by the limited WAN bandwidth rather than the WAN latency.  While the end metric for evaluation is completion time of training, the paper does not investigate the effect of network latency. How would the evaluations look if the improvements are investigated in correlation to latency rather than throughput limitations? (I guess we can have a rough idea on this, if we knew how much the barrier control message was used.)

## Saturday, September 16, 2017

### Paper summary. OpenCL Caffe: Accelerating and enabling a cross platform machine learning framework

This 2016 paper presents an OpenCL branch/port of the deep learning framework Caffe. More specifically, this branch replaces the CUDA-based backend of Caffe to an open standard OpenCL backend. The software was first located at https://github.com/amd/OpenCL-caffe, then  graduated to https://github.com/BVLC/caffe/tree/opencl.

Once we develop a DNN model, we ideally like to be able to deploy it for different applications across multiple platforms (servers, NVDIA GPUs, AMD GPUs, ARM GPUs, or even over smartphones and tablets) with minimum developing efforts. Unfortunately, most of the deep learning frameworks (including Caffe) are integrated with CUDA libraries for running on NVIDIA GPUs, and that limits portability across multiple platforms.

OpenCL helps for portability of heterogenous computing across platforms since it is supported by a variety of commercial chip manufacturers: Altera, AMD, Apple, ARM Holdings, Creative Technology, IBM, Imagination Technologies, Intel, Nvidia, Qualcomm, Samsung, Vivante, Xilinx, ZiiLABS and etc. In order to enable compatibility between different platforms, OpenCL program detects the specific devices and compiles at runtime.

OpenCL was originally developed by Apple, Inc and later submitted to Khronos Group. It is also supported by many operating systems including Android, FreeBSD, Linux, macOS, Windows.

## OpenCL Backend Porting and Optimization

Caffe framework is originally written in C++ and CUDA. The CUDA layer of Caffe handles optimization of hardware resource allocation and utilization, e.g. CPU-GPU task assignment, memory management, and data transfer. Since CUDA and OpenCL are different in hardware device abstraction, memory buffer management, synchronization, data transfers, the OpenCL backbend porting is not a straightforward process.

The paper breaks the OpenCL porting process into two phases. Phase 1 achieves a layerwise porting of 3 layers, namely C++ machine learning interfaces, OpenCL wrappers, and GPU kernels. Layerwise porting means the layers are ported one by one and unit tested by using the originals of the other layers to guarantee correctness and convergence of the DNN algorithm.

After all layers are ported to OpenCL in Phase 1, the Phase 2 focuses on performance optimization. Profiling the OpenCL port in Phase 1 (via the AMD profiling tool, CodeXL, assisted with OpenCL event and printf) demonstrates some big bottlenecks. OpenCL's online compilation frequently calls clBuildProgram to create each GPU kernel: for 100 iterations of Cifar training, there were 63 clBuildProgram calls that took about 68% of the time. Another bottleneck was that the convolutional layers take up most of the computation time. BLAS's performance suffered from irregular tall and skinny matrix sizes from different layers.

To handle these, the paper proposes three key optimization techniques including kernel caching to avoid OpenCL online compilation overheads, a batched manner data layout scheme to boost data parallelism, and multiple command queues to boost task parallelism. The optimization techniques effectively map the DNN problem size into existing OpenCL math libraries, and improve hardware resources utilization and boost performance by 4.5x.

## Evaluation

The evaluation uses the AlexNet DNN model for ImageNet. The evaluation compares the performance of Caffe with CUDA (including both cuBLAS and cuDNN v2) vs OpenCL (including both original clBLAS and batched manner optimization) on NVIDIA TitanX and AMD R9 Fury, with Alexnet model with mini-batch size 100. As shown in Figure 3, OpenCL Caffe with optimizations over clBLAS matches performance of cuBLAS Caffe.

Compared to the highly optimized machine learning cuDNN library, OpenCL Caffe still has a performance gap of 2x as it lacks those optimizations. The authors argue given the current performance, the OpenCL caffe is still competitive in terms of performance per dollar, considering the market price difference of AMD R9 Fury (about 560 dollars) and the NVIDIA TitanX (about 1000 dollars).

## Cross Platform Capability Analysis

A natural question that occurs is, would their OpenCL port of Caffe that is tested on AMD GPUs would automatically work with ARM MALI GPUs as well? That would be a good test of portability of OpenCL port of Caffe. This has not been answered in the paper.

However, the authors caution about minor problems in compatibility. "There are some differences in specific manufacturers' extension and keywords. For example, caffe uses a lot of template in GPU kernels to support different floating point precision. But it turns out the template keywords for different manufactures are different, which adds more difficulty for the same code to run on different platform without modifications."

OpenCL support of deep learning frameworks is still not great, but hopefully it is getting better everyday as this paper shows.

The slides presentation for the paper is also available here.

## Tuesday, September 12, 2017

### Retroscope: Retrospective Monitoring of Distributed Systems (part 2)

This post, part 2, focuses on monitoring distributed systems using Retroscope. This is a joint post with Aleksey Charapko. If you are unfamiliar with hybrid logical clocks and distributed snapshots, give part 1 a read first.

Monitoring and debugging distributed applications is a grueling task. When you need to debug a distributed application, you will often be required to carefully examine the logs from different components of the application and try to figure out how these components interact with each other. Our monitoring solution, Retroscope, can help you with aligning/sorting these logs and searching/focusing on the interesting parts. In particular, Retroscope captures a progression of globally consistent distributed states of a system and allows you to examine these states and search for global predicates.

Let’s say you are working on debugging ZooKeeper, a popular coordination service. Using Retroscope you can easily add instrumentation to the ZooKeeper nodes to log and stream events to the Retroscope Storage and Compute module. After that, you can use RQL, Retroscope's simple SQL-like query language, to process your queries on those logs to detect the predicates/conditions that hold on the globally consistent cuts sorted in Retroscope's compute module.

Let's say you want to examine the staleness of the data in the ZooKeeper cluster (since ZooKeeper allows local reads from a single node, a client can read a stale value that has already been updated). By adding instrumentation to track the versions of znodes (the objects that keep data) at each ZooKeeper node, you can monitor how a particular znode changes across the entire cluster in a progression of globally consistent cuts with a very short query: "SELECT vr1 FROM setd;", where setd is the name of the log keeping track of changes and vr1 is version of a znode r1.

The above example still requires you to sift through all the changes in znode r1 across a progression of cuts to see whether there are stale values of r1 in some of the cuts. Instead, you can modify the query to have it return only globally consistent cuts that contain stale versions of the znode: "SELECT vr1 FROM setd WHEN vr1 - vr1 >= 2". The expression "vr1 - vr1 >= 2" is acceptable in Retroscope, since multiple different copies of variable vr1 exist on different nodes (one at each node). With this expression, Retroscope computes the difference between znode r1’s versions across any two nodes for all possible combinations of nodes, and if the difference is 2 or more, Retroscope emits the cut. This query will give you a precise knowledge about global states in which znode differs by at least 2 versions between nodes in the system.

Let's say you got curious about the cause for the staleness in some of the ZooKeeper nodes. By including more information to be returned in each cut, you can test various hypotheses about the causes of staleness.  For instance, if you suspect that some ZooKeeper nodes to be unproportionally loaded by clients, you can test it with a query that returns a number of active client connections: "SELECT vr1, client_connections FROM setd WHEN vr1 - vr1 >= 2".  And you can further refine your query to filter staleness cases to when imbalance in client connections is high: "SELECT vr1, client_connections FROM setd WHEN vr1 - vr1 >= 2 AND client_connections - client_connections > 50".

## Architecture

OK, let's look at how Retroscope works under the hood.

To keep the overhead as low as possible for the system being monitored, we augment the target system nodes with only a lightweight RetroLog server module to extract/forward logs to a separate Retroscope storage/compute cluster for monitoring. RetroLogs employ Apache Ignite’s streaming capabilities to deliver the logs to the Retroscope storage/compute cluster. RetroLogs also provide Hybrid Logical Clock (HLC) capabilities to the target system being monitored. HLC is an essential component of Retroscope, as it allows the system to identify consistent cuts. The logging part is similar to how regular logging tools work, except the values being logged are not simply messages, but key-value pairs with the key being the variable being monitored and the value being, well, the current value of the variable.

The architecture figure above shows the insides of the Retroscope storage/compute cluster implemented using Apache Ignite. As the RetroLog server stream logs to Ignite cluster for aggregation and storage, the Ignite streaming breaks messages into blocks based on their HLC time. For instance, all messages between timestamps 10 and 20 will be in the same block. The Ignite Data Grid then stores the blocks until they are needed for computing the past consistent cuts.

The RQLServer acts as the query parser and task scheduler for our monitoring tool. It breaks down the duration of state-changes over which a query needs to be evaluated into non-overlapping time-shards and assigns these time-shards to worker nodes for evaluation. As workers process through their assignments, they emit the results back to the RQLServer which then presents the complete output to you.

Let's now zoom in to the Retroscope workers. These workers are designed to search for predicates through a sequence of global distributed states. The search always starts at some known state and progresses backwards one state-change at a time. (Each time-shard has its ending state computed before being assigned to the workers. This process is done only once, as time-shards are reused between different queries. When workers perform the search, they always start at the time-shard's ending state and progress backwards one state-change at a time.) As the worker moves backwards and undo the changes, it arrives to new consistent cuts and evaluates the query predicate to decide if the cut must be emitted to the user or not. The figure illustrates running "SELECT vr1 FROM setd WHEN vr1 - vr1 >= 2" query on a ZooKeeper cluster of 3 nodes. Each time a variable's value is changed, it is being recorded into a Retroscope log. As the systems starts to search for the global states satisfying the query condition, it first looks at state c6. At this cut the difference between versions on Node 3 and Node 1 is 2, meaning that the worker emits the cut back. The system then moves to a previous cut c5, however at this point the staleness is only 1 version, so the cut is skipped. The search ends when all consistent cuts are evaluated this way. In the example, you will see cuts c6, c3, c2, c1, and c0 being emitted, while other cuts have small staleness and do not satisfy the condition of the query.

## Preliminary Evaluation

We performed a preliminary evaluation of Retroscope monitoring on ZooKeeper to study the znode staleness from the server perspective. To the best of our knowledge, no prior studies have been conducted to see how stale ZooKeeper values can be from the server stance.

We started by looking at a normal running ZooKeeper cluster on AWS and ran a test workload of 10,000 updates to either a single znode or 10 znodes. All the updates were done by a single synchronous client to make sure that the ZooKeeper cluster is not saturated under a heavy load. After the updates are over, we ran a simple retrospective RQL query to retrieve consistent states at which values have been changing in znode r1: "SELECT vr1 FROM setd;", where setd is the name of the log keeping track of changes and vr1 is version of znode r1. This query produces lots of cuts that must be examined manually, however a quick glance over it shows that a staleness of 1 version is common. This is a normal behavior, as Retroscope was capturing staleness of 1 version right after the value has been committed, and the commit messages may still be in flight to the follower nodes.

To search whether there exists any cases where staleness is 2 versions or greater, we then evaluated the following query: "SELECT vr1 FROM setd WHEN vr1 - vr1 >= 2". With this query we learned that by targeting a single znode, we were able to observe a staleness of 2 or more versions on about 0.40% of consistent cuts examined. Surprisingly, we also observed a spike of 9 versions in staleness at one cut, however the stale node was then able to quickly catch up. (Probably that spike was due to garbage collection.) We observed that spreading the load across 10 znodes reduced the staleness observed for a single znode.

We performed the same staleness experiment on the cluster with one straggler ZooKeeper node that was artificially made to introduce a slight delay of 2ms in the processing of incoming messages. With the straggler, ZooKeeper staleness hit extremes: the slow node was able to keep up with the rest of the cluster only for the first 7 seconds of the test run but then fell further and further behind.

We measured the query processing performance on a single AWS EC2 t2.micro worker node with  1 vCPU and 1 GB of RAM. With such limited resources, a Retroscope worker was still able to process up to 14,000 consistent cuts per second in our ZooKeeper experiments.

## Future Plans

We are working on enhancing the capabilities of Retroscope to provide richer and more universal querying language for variety of monitoring and debugging situations. Our plans include adding more complex searching capabilities with conditions that can span across multiple consistent cuts. We are also expanding the logging and processing capabilities to allow keeping track of arrays and sets instead of just single-value variables.

Here are my other posts on "monitoring" systems:
http://muratbuffalo.blogspot.com/search?q=monitoring

## Monday, September 11, 2017

### Web Data Management in RDF Age

This was the keynote on ICDCS'17 Day 2, by Tamer Ozsu. Below are my notes from his talk. The slides for his presentation are available here.

Querying web data presents challenges due to lack of a schema, its volatility, and its sheer scale. There have been several recent approaches to querying web data, including XML, JSON, and fusion tables. This talk is about another approach to maintaining and querying web data: RDF and SPARQL. This last one is the recommended way by W3C (World Wide Web Consortium) and is a building block for semantic web and Linked Open Data (LOD). Here is a diagram denoting the LOD datasets and links between them as of 2014.

## Resource Description Framework (RDF)

In RDF, everything is a uniquely named resource (URI). The ID for Jack Nicholson's resource is JN29704. Resources have defined attributes: y:JN29704 hasName = "Jack Nicholson", y:JN29704 BornOnDate = "1937-04-22". The relationships with other resources can be defined via predicates.(This is "predicates in grammar context", and not in logic context.) Predicates for triples of the form "Subject Predicate Object". The subjects always URIs, and the objects are "literals" or URIs.

It turns out biologists are heavy users of RDF with the UniProt dataset.

## RDF query model: SPARQL

SPARQL provides a flavor of SQL. It operates on the RDF triple, and now the _variables_ can be used to substitute in place of subject, predicate, object as well. Thus it is also possible to represent SPARQL queries as a graph. Once you represent the SPARQL query as a graph, answering a SPARQL query reduces to a subgraph matching matching problem between the RDF data-graph and the query-graph.

As in querying with SQL, too many joins are bad for performance, and a lot of work is extended to optimize this.

## Distributed SPARQL processing

If it is possible to gather all RDF needed in one place, then querying can be done easily with using common cloud computing tools. You can partition and store the RDF on HDFS, and then run SPARQL queries on this as mapreduce jobs (say using Spark).

If it is not possible to gather all RDF data, you need to a distributed querying, and for this methods similar to distributed RDBMS can be used.

To complicate this further not all of the RDF data sites can process SPARQL queries. So several approaches are formulated to deal with the problem that poses for distributed SPARQL processing, such as writing wrappers to execute SPARQL queries on these sites.

Finally live querying of the web of RDF linked data is also possible by sending bots to traverse/browse this graph at runtime.

## Thursday, September 7, 2017

### Gratitude

The other day I had taken my little daughter to the library. There was a lady tutoring his son. The boy was slightly over-active and argumentative. He was wearing a wool Buffalo hat. It was warm in the library, so that seemed out of place. It was also strange that the boy, who is around 7-8 years old, was not in school at this time of the day. But I can't put 2 and 2 together.

I assumed that the lady is home-schooling the boy. Since I was interested about how home-schooling works (what works and what not), I asked her about it. (Although I am a shy introvert, I am not shy from asking people questions when I am curious about something. That is a little oddity I have.) She said she was not homeschooling, but her son missed a lot of classes, so she was trying to help him catch up. At that moment, I notice the thick book she was reading: "CANCER"!

My heart sank. I tried to seem upbeat though. I asked the boy his name, Nicholas, and wished him best of luck with his studies.

---

Life is that way. We get selfish and self-centered when left to our devices. We think we have it bad, that we have big challenges. We become drama queens.  We are stressed about work, we complain about other small stuff. And occasionally we are hit with an experience like that, where a real challenge presents itself for us or for others, that reminds us how grateful we should have been.

We should not give into entropy and get too self-centered. Everybody has their own problems, challenges, and burdens. And sometimes those are huge challenges. The world would be a better place if we are more grateful for ourselves and for the moment and try to sympathize and help others with their challenges, rather than being self-absorbed with our petty problems.

Here is a related video I came across recently on gratitude.

This one is for kids, but hey it is simple.

## Wednesday, September 6, 2017

### Paper summary. Untangling Blockchain: A Data Processing View of Blockchain Systems

This is a recent paper submitted to arxiv (Aug 17, 2017). The paper presents a hype-free, very accessible, yet technical  survey on blockchain. It takes a data-processing centric view of blockchain technology, treating  concurrency issues and efficiency of consensus as first class citizens in blockchain design. Approaching from that perspective, the paper  tries to forge a connection with blockchain technologies and distributed transaction processing systems. I think this perspective is promising for grounding the blockchain research better. When we connect a new area to an area with a large literature, we have opportunities to compare/contrast and perform efficient OODA loops.

Thanks to this paper, I am finally getting excited about blockchains. I had a huge prejudice about blockchains based on the Proof-of-Work approach being used in BitCoin. Nick Szabo tried to defend this huge inefficiency, citing that it is necessary for attestation/decentralized/etc, but that is a false dichotomy. There are many efficient ways to design blockchains and still provide decentralization and attestation guarantees. And the paper provides an exhaustive survey of blockchain design approaches.

In my summary below, I use many sentences (sometimes paragraphs) directly lifted off from the paper. The paper is easily accessible and well written, and worth reading if you like to learn about blockchains.

## Blockchain basics

Blockchain is a log of ordered blocks, each containing multiple transactions. In the database context, blockchain can be viewed as a solution to distributed transaction management: nodes keep replicas of the data and agree on an execution order of transactions. Distributed transaction processing systems often deal with concurrency control via 2-phase commit. However, in order to tolerate Byzantine behavior, the overhead of concurrency control becomes higher in blockchains.

Blockchains have many applications in business, such as security trading and settlement, asset and finance management banking, and insurance. Currently these applications are handled by systems built on MySQL and Oracle and by trusted companies/parties with whole bunch of lawyers/clerks/etc. Blockchain can disrupt this status quo because it reduces infrastructure and human costs. Blockchain's immutability, transparency, and inherent security help reduce human errors/malice and the need for manual intervention due to conflicting data. The paper says: "Goldman Sachs estimated 6 billion saving in current capital market, and J.P. Morgan forecast that blockchains will start to replace currently redundant infrastructure by 2020."

Oops, I forgot to mention Bitcoin. Bitcoin is also one of the applications of blockchains, namely a digital/crypto-currency application. Cryptocurrency is currently the most famous application of blockchains, but as the paper covers smartcontracts, you can see that those applications will dwarf anything we have seen so far.

## Private vs Public blockchains

In public blockchains, any node can join/leave the system, thus the blockchain is fully decentralized, resembling a peer-to-peer system. In private blockchains, the blockchain enforces strict membership via access-control and authentication.

Public blockchain: Bitcoin uses proof-of-work (PoW) for consensus: only a miner which has successfully solved a computationally hard puzzle (finding the right nonce for the block header) can append to the blockchain. It is still possible that two blocks are appended at the same time, creating a fork in the blockchain. Bitcoin resolves this by only considering a block as confirmed after it is followed by a number of blocks (typically six blocks).  PoW works well in the public settings because it guards against Sybil attacks, however, since it is non-deterministic and computationally expensive, it is unsuitable for applications such as banking and finance which must handle large volumes of transactions in a deterministic manner. (Bitcoin supports 7 transactions per second!)

Private blockchain: Since node identities are known in the private settings, most blockchains adopt one of the protocols from the vast literature on distributed consensus. Hyperledger v0.6.0 uses PBFT, a byzantine tolerant Paxos, while some other private chains even use plain Paxos. (Hyperledger  v1.0.0-rc1 adopts a no-Byzantine consensus protocol based on Kafka.)

However, even with PBFT (or XFT) scalability is a problem. Those Paxos protocols will not be able to scale to tens of nodes, let alone hundreds. An open research question is how can you have big-Paxos? (The federated consensus approach mentioned below is a promising approach to solve this problem.)

## What makes Blockchains tick?

The paper studies blockchain technology under four dimensions: distributed ledger, cryptography, consensus protocol and smartcontracts.

Distributed ledger: In blockchains, the ledger is replicated over all the nodes as an append-only data structure. A blockchain starts with some initial states, and the ledger records entire history of update operations made to the states. (This reminds me of Tango and particularly the followup vCorfu paper. The ideas explored there can be used for efficiently maintaining multiple streams in the same log so that reading the whole chain is not needed for materializing the updates at the nodes.)

Cryptography: This is used for ensuring the integrity of the ledgers, for making sure there is no tampering of the blockchain data. The global states are protected by a hash (Merkle) tree whose root hash is stored in a block.  The block history is also protected by linking the blocks through a chain of cryptographic hash pointers: the content of block number n+1 contains the hash of block number n. This way, any modification in block n immediately invalidates all the subsequent blocks.

Smart Contracts: A smart contract refers to the computation executed when a transaction is performed. At the trivial end of the spectrum, Bitcoin nodes implement a simple replicated state machine model which moves coins from one address to another. At the other extreme, Ethereum smart contracts can specify arbitrary computations and comes with its own virtual machine for executing Ethereum bytecodes. Similarly, Hyperledger employs Docker containers to execute its contracts written in any language. Smart contracts have many emerging business applications, but this makes software bugs very critical. In the DAO attack, the attacker stole \$50M worth of asset exploiting a bug.

## Consensus:

Since I am partial to distributed consensus, I am reserving a separate section to the use of consensus in blockchains.

Proof of work: All PoW protocols require miners to find solutions to cryptographic puzzles based on cryptographic hashes. How fast a block is created is dependent on how hard the puzzle is. The puzzle cannot be arbitrary east because it leads to unnecessary forks in the blockchain. Forks not only lead to wastage of resources but have security implication since they make it possible to double spend. Ethereum, another PoW-based blockchain, adopts GHOST protocol which helps bring down block generation time to tens of seconds without compromising much security. In GHOST, the blockchain is allowed to have branches as long as the branches do not contain conflicting transactions.

Proof of Stake: To alleviate huge cost of PoW, PoS maintains a single branch but changes the puzzle's difficulty to be inversely proportional to the miner's stake in the network. A stake is essentially a locked account with a certain balance representing the miner's commitment to keep the network healthy.

Paxos protocols: PoW suffers from non-finality, that is a block appended to a blockchain is not confirmed until it is extended by many other blocks.  PBFT protocol is deterministic. Implemented in the earlier version of Hyperledger (v0.6), the protocol ensures that once a block is appended, it is final and cannot be replaced or modified. It incurs O(N^2) network messages for each round of agreement where N is the number of nodes in the network. (This is because in contrast to classical Paxos, where every participant talks only with the leader, in PBFT,  every participant talks to every other participant as well.) The paper reports that PBFT scales poorly and collapses even before reaching the network limit. Zyzzyva optimizes for normal cases (when there are no failures) via speculative execution. XFT, assumes a network less hostile than purely Byzantine, and demonstrates better performance by reducing the number of network messages.

Federated: To overcome the scalability woes with PBFT, Ripple adopts an approach that partitions the network into smaller groups called federates. Each federate runs a local consensus protocol among its members, and the local agreements are then propagated to the entire network via nodes lying in the intersections of the federates. Ripple’s safety conditions are that there is a large majority of honest nodes in every federate, and that the intersection of any two federates contain at least one honest node. Ripple assumes federates are pre-defined and their safety conditions can be enforced by a network administrator. In a decentralized environment where node identities are unknown, such assumptions do not hold. Byzcoin and Elastico propose novel, two-phase protocols that combine PoW and PBFT. In the first phase, PoW is used to form a consensus group. Byzcoin implements this by having a sliding window over the blockchain and selecting the miners of the blocks within the window. Elastico groups nodes by their identities that change every epoch. In particular, a node identity is its solution to a cryptographic puzzle. In the second phase, the selected nodes perform PBFT to determine the next block. The end result is faster block confirmation time at a scale much greater than traditional PBFT (over 1000 nodes).

Nonbyzantine: The latest release of Hyperledger (v1.0) outsources the consensus component to Kafka. More specifically, transactions are sent to a centralized Kafka service which orders them into a stream of events. Every node subscribes to the same Kafka stream and therefore is notified of new transactions in the same order as they are published.

I love that Blockchains brings new constraints and requirements to the consensus problem. In Blockchains, the participants can now be Byzantine, motivated by financial gains. And it is not sufficient to limit the consensus participants to be 3 nodes or 5 nodes, which was enough for tolerating crashes and ensuring persistency of the data. In Blockchains, for reasons of attestability and tolerating colluding groups of Byzantine participants, it is preferred to keep the participants at 100s. Thus the problem becomes: How do you design byzantine tolerant consensus algorithm that scales to 100s or 1000s of participants?

I love when applications push us to invent new distributed algorithms.

## Blockbench and evaluation

The paper introduces a benchmarking framework, BLOCKBENCH, that is designed for understanding performance of private blockchains against data processing workloads. BLOCKBENCH is open source and contains YCSB and SmallBank data processing workloads as well as some popular Ethereum contracts.

The paper then presents a comprehensive evaluation of Ethereum, Parity and Hyperledger. The comparison against H-Store presented demonstrates that blockchains perform poorly at data processing tasks currently being handled by database systems. Although databases are designed without security and tolerance to Byzantine failures, the gap remains too big for blockchains to be disruptive to incumbent database systems.

The main findings are:

• Hyperledger performs consistently better than Ethereum and Parity across the benchmarks. But it fails to scale up to more than 16 nodes.
• Ethereum and Parity are more resilient to node failures, but they are vulnerable to security attacks that forks the blockchain.
• The main bottlenecks in Hyperledger and Ethereum are the consensus protocols, but for Parity the bottleneck is caused by transaction signing.
• Ethereum and Parity incur large overheads in terms of memory and disk usage. Their execution engine is also less efficient than that of Hyperledger.
• Hyperledger’s data model is low level, but its flexibility enables customized optimization for analytical queries.

## Wednesday, August 30, 2017

### Retrospective Lightweight Distributed Snapshots Using Loosely Synchronized Clocks

This is a summary of our recent work that appeared at ICDCS 2017. The tool we developed, Retroscope (available on GitHub), enables unplanned retrospective consistent global snapshots for distributed systems.

Many distributed systems would benefit from the ability to take unplanned snapshots of the past. Let's say Alice notices alarms going off for her distributed system deployment at 4:05pm. If she could roll-back to the state of the distributed system at 4:00pm, and roll forward step by step to figure out what caused the problems, she may be able to remedy the problem.

The ability to take retrospective snapshots requires each node to maintain a log of state changes and then to collate/align these logs to construct a consistent cut at a given time. However, clock uncertainty/skew among nodes is dangerous and can lead to taking an inconsistent snapshot. For example, the cut at 4:00pm in this figure using NTP is inconsistent, because event F is included in the cut, but causally preceding event E is not included.

To avoid this problem with physical clock synchronization, our snapshotting service Retroscope employs Hybrid Logical Clocks (HLC), that combine the benefits of physical time along with causality tracking of logical clocks (LC). By leveraging HLC, our system can take non-blocking unplanned/retrospective consistent snapshots of distributed systems with affinity to physical time.

## Hybrid Logical Clocks

Each HLC timestamp is a two-part tuple: the first part is a shadow-copy of NTP time, and the second part is an overflow buffer used when multiple events have identical first parts in their HLC timestamp. The first part of HLC at a node is maintained as the highest NTP time that the node is aware of. (This comes from the node's own physical clock, or comes from a remote node from which a message was received recently.) The second part acts as the logical clock for events with identical first parts, and it is being reset every time the first part is updated.

The figure shows HLC in operation, with HLC timestamps in green, and the machine’s physical time in red. Note how message exchange between P1 and P2 bumps P2’s first HLC component to be higher than its own physical clock.

HLC not only keeps track of physical time, but also provides the same guarantees as the LC. Namely, HLC satisfies the LC condition: if e hb f then HLC.e < HLC.f. These HLC properties allows us to identify consistent cuts the same way we identify them with LC: a cut is consistent when all events have the same timestamp. In situations where we have no event with desired timestamp, we can create a phantom, non-mutating event at the desired timestamp and use it to identify the consistent cut.

## Taking a Snapshot

An initiating agent can start the snapshot procedure by sending a snapshot request to every node. Each node, upon receiving the request performs a local snapshot of its current state, and uses the window-logs of state changes to undo the changes until the state reaches the requested HLC time. Each node performs snapshot independently, and there is no need for inter-node coordination on taking the snapshot. Once all machines have arrived to local snapshots at the same HLC time, we have obtained a global distributed snapshot (by virtue of the HLC feature stated above).

Our tool Retroscope also supports incremental snapshots to take an inexpensive snapshot in the vicinity of another snapshot by undoing (or redoing) events to reach the new one. These incremental snapshots are very useful for monitoring tasks, in which we need to explore many past system states in a step-through manner while searching for some invariant violation or fault causes.

We have implemented Retroscope as a set of libraries to be added to existing Java projects to enable retrospective snapshots capabilities. Our Retroscope library provides tools to log and keep track of state changes and HLC time of such changes independently at each node. The current implementation uses in-memory sliding-window log for state history, and have configurable capacity. The Retroscope server library provides the tools to aggregate multiple such event logs and identify consistent cuts across those logs with affinity to physical time. The API also allows querying for consistent cuts that satisfy certain predicates using a SQL-like querying language.

We will talk about Retroscope's querying and monitoring features in a later blog post.

## Evaluation

We have added Retroscope capabilities to several Java applications, such as Voldemort key-value database, Hazelcast in memory data-grid, and ZooKeeper.

Retroscoping Voldemort took less than 1000 lines of code for adding HLC to the network protocol, recording changes in the Retroscope window-log, and performing snapshot on the Voldemort's storage. To quantify the overhead caused by Retroscope, we compared the performance of our modified Voldemort with unmodified version on a 10-node cluster. The figure below illustrates the throughput degradation resulted from adding Retroscope to the system. We observed that in the worst case, the degradation was around 10%, however in some cases we also observed no degradation at all. Latency overheads were similar to the throughput.

Taking a snapshot brings more stress to Voldemort, a disk-based system, as each node now needs to get a copy of it local state and undo changes from that copy. Retroscope allows for the entire snapshot routine to run in non-blocking manner, but with some performance degradation. The figure below illustrate the throughput and latency of the client requests on the 10-node cluster while performing a snapshot on a database of 1,000,000 items. Average throughput degradation of the system while taking a snapshot was 18%, although the performance can be improved by using a separate disk to make a database copy.

We also ran a similar experiment on Hazelcast in-memory datagrid, and observed very little performance degradation associated with the snapshot, since Hazelcast is in-memory system.

(This was a joint post with Aleksey Charapko.)

## Sunday, August 27, 2017

### Paper summary: Performance clarity as a first-class design principle (HOTOS'17)

This paper appeared in HOTOS'17 and is by Kay Ousterhout, Christopher Canel, Max Wolffe, Sylvia Ratnasamy, and Scott Shenker. (The team is at UC Berkeley Christopher is now at CMU.)

The paper argues that performance clarity is as important a design goal as performance or scalability. Performance clarity means ease of understanding where bottlenecks lie in a deployment and the performance implications of various system changes.

The motivation for giving priority to performance clarity arises from the performance opaqaness of distributed systems we have, and how hard it is to configure/tune them to optimize performance. In current distributed data processing systems, a small change in hardware or software configurations may cause disproportional impact on performance. An earlier paper, Ernest, showed that selecting an appropriate cloud instance type could improve performance by 1.9× without increasing cost. And when things run slowly in a deployment, it is hard to determine where the bottlenecks are.

This discussion on performance clarity reminds me of a Tony Hoare quote on software design:
There are two ways of constructing a software design: One way is to make it so simple that there are obviously no deficiencies, and the Other way is to make it so complicated that there are no obvious deficiencies. The first method is far more difficult.
This paper advocates to design the system so clear that if there are deficiencies in the deployment/configuration, they become obvious and can be accounted for.

Concretely, the paper proposes an architecture for data analytics frameworks (Spark in particular) in which jobs are decomposed into schedulable small units called "monotasks". Each monotask use exactly one of CPU, disk, and network.

This is an unconventional take. Today's frameworks break jobs into tasks that time-share the use of CPU, disk, and network. Concurrent tasks may contend for resources, even when their aggregate resource use does not exceed the capacity of the machine. In contrast, a single monotask has exclusive access to a resource. The benefit this provides is guaranteed resource isolation without any cross contention from another tasks.

Using monotasks to explicitly separate resources makes the bottleneck visible: the bottleneck is simply the resource (disk, CPU, or network) with the longest queue.

## Implementation

The hypothesis tested in the evaluation is that "using monotasks improves performance clarity without sacrificing high performance".

They give a prototype implementation of monotasks for Apache Spark, called MonoSpark, that  achieves performance parity with Spark for benchmark workloads. Their monotask implementation also yields a simple performance model that can predict the effects of future hardware and software changes.

So does monotasks hurt performance? I am quoting from the paper:

## Conclusions

The monotasks idea is very simple. But simple ideas have a chance to win. Performance predictability becomes a snap with this, because it is so easy to see/understand what will happen in a slightly different configuration. Monotasks provide very nice observability and resource isolation features.

I suspect this is somewhat Spark specific. It looks like this works best with predictable and CPU/disk/network balanced tasks/applications. These conditions are satisfied for Spark and Spark workloads.

Finally, the monotasks ideas will not work in current virtual machine, container, or cloud environments. Monotasks require direct access to machines to implement per disk, per CPU, and network schedulers.

### Dhalion: self-regulating stream processing in Heron (VLDB'17)

This paper appeared in VLDB'17, and is by Avrilia Floratou (Microsoft), Ashvin Agrawal (Microsoft), Bill Graham (Twitter), Sriram Rao (Microsoft), and Karthik Ramasamy (Streamlio).

Dhalion aims to further reduce the complexity of configuring and managing Heron streaming applications. It provides a self-tuning/self-correcting extension to Heron to monitor, diagnose, and correct problems with the deployment. (Dhalion is named after a mythical bird with interesting healing capabilities.)

It turns out there are only 4-5 category of things that can (is likely to) go wrong in a Heron deployment. (I had summarized Heron earlier here.) The detectors and diagnosers in Dhalion check for them. The resolver as a result tries one of these corresponding 4-5 correction actions. And the effectiveness of the correctors are then rated.

Yes, you heard it right, instead of guaranteed-to-work correctors, Dhalion rates the correctors' effectiveness and decides what to try next. "After every action is performed, Dhalion evaluates whether the action was able to resolve the problem or brought the system to a healthier state. If an action does not produce the expected outcome then it is blacklisted and it is not repeated again."

This is not a fool-proof method: what if the reason corrector is ineffective because another fault/condition started developing? Another problem I can see is interference among correctors causing problems. But I think Dhalion tries/evaluates correctors one by one, otherwise the interference issues obfuscates the evaluation of correctors.

So Dhalion takes a probabilistic approach to self-tuning and recovery. But maybe that should not be a cause for alarm.  The system is already faulty, why should the correctors be guaranteed to work? If the corrector framework can try/evaluate them quickly, and converge the system back, that is all that matters. (I am not sure if Dhalion can manage to do this fast enough for most scenarios.)

## Saturday, August 19, 2017

### Cloud fault-tolerance

I had submitted this paper to SSS'17. But it got rejected. So I made it a technical report and am sharing it here. While the paper is titled "Does the cloud need stabilizing?", it is relevant to the more general cloud fault-tolerance topic.

I think we wrote the paper clearly. Maybe too clearly.

Here is the link to our paperhttps://www.cse.buffalo.edu//tech-reports/2017-02.pdf
(Ideally I would have liked to expand on Section 4. I think that is what we will work on now.)

Below is an excerpt from the introduction of our paper, if you want to skim that before downloading the pdf.
-------------------------------------------------------------------------
The last decade has witnessed rapid proliferation of cloud computing. Internet-scale webservices have been developed providing search services over billions of webpages (such as Google and Bing), and providing social network applications to billions of users (such as Facebook and Twitter). While even the smallest distributed programs (with 3-5 actions) can produce many unanticipated error cases due to concurrency involved, it seems short of a miracle that these web-services are able to operate at those vast scales. These services have their share of occasional mishap and downtimes, but overall they hold up really well.

In this paper, we try to answer what factors contribute most to the high-availability of cloud computing services, what type of fault-tolerance and recovery mechanisms are employed by the cloud computing systems, and whether self-stabilization fits anywhere in that picture.
(Stabilization is a type of fault tolerance that advocates dealing with faults in a principled unified manner instead of on a case by case basis: Instead of trying to figure out how much faults can disrupt the system's operation, stabilization assumes arbitrary state corruption, which covers all possible worst-case collusions of faults and program actions. Stabilization then advocates designing recovery actions that takes the program back to invariant states starting from any arbitrary state.)

Self-stabilization had shown a lot of promise early on for being applicable in the cloud computing domain. The CAP theorem seemed to motivate the need for designing eventually-consistent systems for the cloud and self-stabilization has been pointed out by experts as a promising direction towards developing a cloud computing research agenda. On the other hand, there has not been many examples of stabilization in the clouds. For the last 6 years, the first author has been thinking about writing a "Stabilization in the Clouds" position paper, or even a survey when he thought there would surely be plenty of stabilizing design examples in the cloud. However, this proved to be a tricky undertaking. Discounting the design of eventually-consistent key-value stores and application of Conflict-free Replicated Data Types (CRDTs) for replication within key-value stores, the examples of self-stabilization in the cloud computing domain have been overwhelmingly trivial.

We ascribe the reason self-stabilization has not been prominent in the cloud  to our observation that cloud computing systems use infrastructure support to keep things simple and reduce the need for sophisticated design of fault-tolerance mechanisms. In particular, we identify the following cloud design principles to be the most important factors contributing to the high-availability of cloud services.

• Keep the services "stateless" to avoid state corruption. By leveraging on distributed stores for maintaining application data and on ZooKeeper for distributed coordination, the cloud computing systems keep the computing nodes almost stateless. Due to abundance of storage nodes, the key-value stores and databases replicate the data multiple times and achieves high-availability and fault-tolerance.
• Design loosely coupled distributed services where nodes are dispensable/substitutable. The service-oriented architecture, and the RESTful APIs for composing microservices are very prevalent design patterns for cloud computing systems, and they help facilitate the design of loosely-coupled distributed services. This minimizes the footprint and complexity of the global invariants maintained across nodes in the cloud computing systems. Finally, the virtual computing abstractions, such as virtual machines, containers, and lambda computing servers help make computing nodes easily restartable and substitutable for each other.
• Leverage on low level infrastructure and sharding when building applications. The low-level cloud computing infrastructure often contain more interesting/critical invariants and thus they are  designed by experienced engineers, tested rigorously, and sometimes even formally verified. Higher-level applications leverage on the low-level infrastructure, and avoid complicated invariants as they resort to sharding at the object-level and user-level. Sharding reduces the atomicity of updates, but this level of atomicity has been adequate for most webservices, such as social networks.

A common theme among these principles is that they keep the services simple, and trivially "stabilizing", in the informal sense of the term. Does this mean that self-stabilization research is unwarranted for cloud computing systems? To answer this, we point to some silver lining in the clouds for stabilization research. We notice a trend that even at the application-level, the distributed systems software starts to get more complicated/convoluted as services with more ambitious coordination needs are being build.

In particular, we explore the opportunity of applying self-stabilization to tame the complications that arise when composing multiple microservices to provide higher-level services. This is getting more common with the increased consumer demand for higher-level and more sophisticated web services. The higher-level services are in effect implementing distributed transactions over the federated microservices from multiple geodistributed vendors/parties, and that makes them prone to state unsynchronization and corruption due to  incomplete/failed requests at some microservices. At the data processing systems level, we also highlight a need for self-regulating and self-stabilizing design for realtime stream processing systems, as these systems get more ambitious and complicated as well.

Finally, we point out to a rift in the cloud computing fault model and recovery techniques, which motivates the need for more sophisticated recovery techniques. Traditionally the cloud computing model adopted the crash failure model, and managed to confine the faults within this model. In the cloud, it was feasible to use multiple nodes to redundantly store state, and easily substitute a stateless worker with another one as nodes are abundant and dispensable. However, recent surveys on the topic remark that more complex faults are starting to prevail in the clouds, and recovery techniques of restart, checkpoint-reset, and devops involved rollback and recovery are becoming inadequate.

## Friday, August 18, 2017

This summary combines material from "Twitter Heron: Stream Processing at Scale" which appeared at Sigmod 15 and "Twitter Heron: Towards extensible streaming engines" paper which appeared in ICDE'17.

Heron is Twitter's stream processing engine. It replaced Apache Storm use at Twitter, and all production topologies inside Twitter now run on Heron. Heron is API-compatible with Storm, which made it easy for Storm users to migrate to Heron. Reading the two papers, I got the sense that the reason Heron was developed is to improve on the debugability, scalability, and manageability of Storm. While a lot of importance is attributed to performance when comparing systems, these features (debugability, scalability, and manageability) are often more important in real-world use.

## The gripes with Storm

Hard to debug. In Storm, each worker can run disparate tasks. Since logs from multiple tasks are written into a single file, it is hard to identify any errors or exceptions that are associated with a particular task. Moreover, a single host may run multiple worker processes, but each of them could belong to different topologies. Thus, when a topology misbehaves (due to load or faulty code or hardware) it is hard to determine the root-causes for the performance degradation.

Wasteful for scheduling resources at the datacenter. Storm assumes that every worker is homogenous. This results in inefficient utilization of allocated resources, and often leads to overprovisioning to match the memory needs of the worker with the highest requirements at all the other workers.

Nimbus-related problems. The Storm Nimbus scheduler does not support resource reservation and isolation at a granular level for Storm workers. Moreover, the Nimbus component is a single point of failure.

ZooKeeper-related gripes. Storm uses Zookeeper extensively to manage heartbeats from the workers and the supervisors. This use of Zookeeper limits the number of workers per topology, and the total number of topologies in a cluster, since ZooKeeper becomes the bottleneck at higher numbers.

Lack of backpressure. In Storm, the sender does not adopt to the speed/capacity of the receiver, instead the messages are dropped if the receiver can't handle them.

## Heron's modular architecture

As in Storm, a Heron topology is a directed graph of spouts and bolts. (The spouts are sources of streaming input data, whereas the bolts perform computations on the streams they receive from spouts or other bolts.)

When a topology is submitted to Heron, the Resource Manager first determines how many containers should be allocated for the topology. The first container runs the Topology Master which is the process responsible for managing the topology throughout its existence. The remaining containers each run a Stream Manager, a Metrics Manager and a set of Heron Instances which are essentially spouts or bolts that run on their own JVM. The Stream Manager is the process responsible for routing tuples among Heron Instances. The Metrics Manager collects several metrics about the status of the processes in a container.

The Resource Manager then passes this allocation information to the Scheduler which is responsible for actually allocating the required resources from the underlying scheduling framework such as YARN or Aurora. The Scheduler is also responsible for starting all the Heron processes assigned to the container.

Heron is designed to be compositional, so it is possible to plug extensions into it and customize it. Heron allows the developer to create a new implementation for a specific Heron module (such as the scheduler, resource manager, etc) and plug it in the system without disrupting the remaining modules or the communication mechanisms between them. This modularity/compositionality is made a big deal, and the ICDE17 paper is dedicated entirely to this aspect of Heron design.

Note also that Heron's modular architecture also plays nice with the shared infrastructure at the datacenter. The provisioning of resources (e.g. for containers and even the Topology Master) is cleanly abstracted from the duties of the cluster manager.

## Heron Topology Architecture

Having a Topology Master per topology allows each topology to be managed independently of each other (and other systems in the underlying cluster). Also, failure of one topology (which can happen as user-defined code often gets run in the bolts) does not impact the other topologies.

The Stream Manager is another critical component of the system as it manages the routing of tuples among Heron Instances. Each Heron Instance connects to its local Stream Manager to send and receive tuples. All the Stream Managers in a topology connect between themselves to form n*n connections, where n is the number of containers of the topology. (I wonder if this quadratically growing number of connections would cause problems for very large n.)

As you can see in Figure 5, each Heron Instance is executing only a single task (e.g. running a spout or bolt), so it is easy to debug that instance by simply using tools like jstack and heap dump with that process. Since the metrics collection is granular, this makes it transparent as to which component of the topology is failing or slowing down. This small granularity design also provides resource isolation. Incidentally, this reminds me of the ideas argued in "Performance-clarity as a first-class design principle" paper.