yu y


1 DryadLINQ: A System for General-Purpose Distributed Data-Parallel Computing Using a High-Level Language Yuan Yu Michael Isard Dennis Fetterly Mihai Budiu 1 Pradeep Kumar Gunda Jon Currey Úlfar Erlingsson 1 Microsoft Research Silicon Valley joint affiliation, Reykjavík University, Iceland expressive data model of strongly typed .NET objects. Abstract The main contribution of this paper is a set of language DryadLINQ is a system and a set of language extensions extensions and a corresponding system that can auto- that enable a new programming model for large scale dis- matically and transparently compile imperative programs tributed computing. It generalizes previous execution en- in a general-purpose language into distributed computa- vironments such as SQL, MapReduce, and Dryad in two tions that execute efficiently on large computing clusters. ways: by adopting an expressive data model of strongly Our goal is to give the programmer the illusion of typed .NET objects; and by supporting general-purpose writing for a single computer and to have the sys- imperative and declarative operations on datasets within tem deal with the complexities that arise from schedul- a traditional high-level programming language. ing, distribution, and fault-tolerance. Achieving this A DryadLINQ program is a sequential program com- goal requires a wide variety of components to inter- posed of LINQ expressions performing arbitrary side- act, including cluster-management software, distributed- effect-free transformations on datasets, and can be writ- execution middleware, language constructs, and devel- ten and debugged using standard .NET development opment tools. Traditional parallel databases (which tools. The DryadLINQ system automatically and trans- we survey in Section 6.1) as well as more recent parently translates the data-parallel portions of the pro- data-processing systems such as MapReduce [15] and gram into a distributed execution plan which is passed Dryad [26] demonstrate that it is possible to implement to the Dryad execution platform. Dryad, which has been high-performance large-scale execution engines at mod- in continuous operation for several years on production est financial cost, and clusters running such platforms clusters made up of thousands of computers, ensures ef- are proliferating. Even so, their programming interfaces ficient, reliable execution of this plan. all leave room for improvement. We therefore believe We describe the implementation of the DryadLINQ that the language issues addressed in this paper are cur- compiler and runtime. We evaluate DryadLINQ on a rently among the most pressing research areas for data- varied set of programs drawn from domains such as intensive computing, and our work on the DryadLINQ web-graph analysis, large-scale log mining, and machine system stems from this belief. learning. We show that excellent absolute performance DryadLINQ exploits LINQ (Language INtegrated 12 can be attained—a general-purpose sort of 10 Bytes of Query [2], a set of .NET constructs for programming data executes in 319 seconds on a 240-computer, 960- with datasets) to provide a powerful hybrid of declarative disk cluster—as well as demonstrating near-linear scal- and imperative programming. The system is designed to ing of execution time on representative applications as provide flexible and efficient distributed computation in we vary the number of computers used for a job. any LINQ-enabled programming language including C#, VB, and F#. Objects in DryadLINQ datasets can be of any .NET type, making it easy to compute with data such 1 Introduction as image patches, vectors, and matrices. DryadLINQ The DryadLINQ system is designed to make it easy for programs can use traditional structuring constructs such a wide variety of developers to compute effectively on as functions, modules, and libraries, and express iteration large amounts of data. DryadLINQ programs are written using standard loops. Crucially, the distributed execu- as imperative or declarative operations on datasets within tion layer employs a fully functional, declarative descrip- a traditional high-level programming language, using an tion of the data-parallel component of the computation, 8th USENIX Symposium on Operating Systems Design and Implementation 1 USENIX Association

2 which enables sophisticated rewritings and optimizations tualization is that it requires intermediate results to be like those traditionally employed by parallel databases. stored to persistent media, potentially increasing compu- tation latency. In contrast, parallel databases implement only declar- This paper makes the following contributions to the ative variants of SQL queries. There is by now a literature: widespread belief that SQL is too limited for many ap- plications [15, 26, 31, 34, 35]. One problem is that, in We have demonstrated a new hybrid of declarative • order to support database requirements such as in-place and imperative programming, suitable for large-scale updates and efficient transactions, SQL adopts a very re- data-parallel computing using a rich object-oriented strictive type system. In addition, the declarative “query- programming language. oriented” nature of SQL makes it difficult to express We have implemented the DryadLINQ system and • common programming patterns such as iteration [14]. validated the hypothesis that DryadLINQ programs can Together, these make SQL unsuitable for tasks such as be automatically optimized and efficiently executed on machine learning, content parsing, and web-graph anal- large clusters. ysis that increasingly must be run on very large datasets. The MapReduce system [15] adopted a radically sim- • We have designed a small set of operators that im- plified programming abstraction, however even common prove LINQ’s support for coarse-grain parallelization operations like database Join are tricky to implement in while preserving its programming model. this model. Moreover, it is necessary to embed MapRe- Section 2 provides a high-level overview of the steps in- duce computations in a scripting language in order to volved when a DryadLINQ program is run. Section 3 execute programs that require more than one reduction discusses LINQ and the extensions to its programming or sorting stage. Each MapReduce instantiation is self- model that comprise DryadLINQ along with simple il- contained and no automatic optimizations take place lustrative examples. Section 4 describes the DryadLINQ across their boundaries. In addition, the lack of any type- implementation and its interaction with the low-level system support or integration between the MapReduce Dryad primitives. In Section 5 we evaluate our system stages requires programmers to explicitly keep track of using several example applications at a variety of scales. objects passed between these stages, and may compli- Section 6 compares DryadLINQ to related work and Sec- cate long-term maintenance and re-use of software com- tion 7 discusses limitations of the system and lessons ponents. learned from its development. Several domain-specific languages have appeared on top of the MapReduce abstraction to hide some of this complexity from the programmer, including 2 System Architecture Sawzall [32], Pig [31], and other unpublished systems DryadLINQ compiles LINQ programs into distributed such as Facebook’s HIVE. These offer a limited hy- computations running on the Dryad cluster-computing bridization of declarative and imperative programs and infrastructure [26]. A Dryad job is a directed acyclic generalize SQL’s stored-procedure model. Some whole- graph where each vertex is a program and edges repre- query optimizations are automatically applied by these sent data channels. At run time, vertices are processes systems across MapReduce computation boundaries. communicating with each other through the channels, However, these approaches inherit many of SQL’s disad- and each channel is used to transport a finite sequence vantages, adopting simple custom type systems and pro- of data records. The data model and serialization are viding limited support for iterative computations. Their provided by higher-level software layers, in this case support for optimizations is less advanced than that in DryadLINQ. DryadLINQ, partly because the underlying MapReduce execution platform is much less flexible than Dryad. Figure 1 illustrates the Dryad system architecture. The execution of a Dryad job is orchestrated by a central- DryadLINQ and systems such as MapReduce are also ized “job manager.” The job manager is responsible distinguished from traditional databases [25] by having for: (1) instantiating a job’s dataflow graph; (2) schedul- expression plans. The planner allocates re- virtualized ing processes on cluster computers; (3) providing fault- sources independent of the actual cluster used for execu- tolerance by re-executing failed or slow processes; (4) tion. This means both that DryadLINQ can run plans monitoring the job and collecting statistics; and (5) trans- requiring many more steps than the instantaneously- forming the job graph dynamically according to user- available computation resources would permit, and that supplied policies. the computational resources can change dynamically, A cluster is typically controlled by a task scheduler, e.g. due to faults—in essence, we have an extra degree separate from Dryad, which manages a batch queue of of freedom in buffer management compared with tradi- jobs and executes a few at a time subject to cluster policy. tional schemes [21, 24, 27, 28, 29]. A downside of vir- 8th USENIX Symposium on Operating Systems Design and Implementation USENIX Association 2

3 ation of code and static data for the remote Dryad ver- data plane job graph tices; and (c) the generation of serialization code for the Files, TCP, FIFO required data types. Section 4 describes these steps in detail. V V V DryadLINQ invokes a custom, DryadLINQ- Step 4. specific, Dryad job manager. The job manager may be executed behind a cluster firewall. PD PD PD NS Step 5. The job manager creates the job graph using the plan created in Step 3. It schedules and spawns the ver- tices as resources become available. See Figure 1. Job manager cluster control plane Figure 1: Dryad system architecture. NS is the name server which Step 6. Each Dryad vertex executes a vertex-specific maintains the cluster membership. The job manager is responsible for spawning vertices (V) on available computers with the help of a program (created in Step 3b). remote-execution and monitoring daemon (PD). Vertices exchange data Step 7. When the Dryad job completes successfully it through files, TCP pipes, or shared-memory channels. The grey shape indicates the vertices in the job that are currently running and the cor- writes the data to the output table(s). respondence with the job execution graph. Step 8. The job manager process terminates, and it re- turns control back to DryadLINQ. DryadLINQ creates 2.1 DryadLINQ Execution Overview objects encapsulating the out- DryadTable the local Figure 2 shows the flow of execution when a program is puts of the execution. These objects may be used as executed by DryadLINQ. inputs to subsequent expressions in the user program. output are fetched Data objects within a DryadTable to the local context only if explicitly dereferenced. Client machine Control returns to the user application. The it- Step 9. (1) .NET ToDryadTable foreach DryadTable erator interface over a allows the user to LINQ .NET (9) (2) read its contents as .NET objects. Expr Objects The application may generate subsequent Step 10. DryadLINQ DryadLINQ expressions, to be executed by a repetition Output (3) Compile of Steps 2–9. DryadTable Invoke 3 Programming with DryadLINQ Results (4) (5) Exec Vertex In this section we highlight some particularly useful and JM code plan (8) distinctive aspects of DryadLINQ. More details on the programming model may be found in LINQ language reference [2] and materials on the DryadLINQ project Dryad Input Output (7) website [1] including a language tutorial. A companion Execution tables Tables technical report [38] contains listings of some of the sam- Data center (6) ple programs described below. Figure 2: LINQ-expression execution in DryadLINQ. 3.1 LINQ Step 1. A .NET user application runs. It creates a The term LINQ [2] refers to a set of .NET constructs DryadLINQ expression object. Because of LINQ’s de- for manipulating sets and sequences of data items. We ferred evaluation (described in Section 3), the actual ex- describe it here as it applies to C# but DryadLINQ pro- ecution of the expression has not occurred. grams have been written in other .NET languages includ- The application calls Step 2. trigger- ToDryadTable ing F#. The power and extensibility of LINQ derive from ing a data-parallel execution. The expression object is a set of design choices that allow the programmer to ex- handed to DryadLINQ. press complex computations over datasets while giving the runtime great leeway to decide how these computa- DryadLINQ compiles the LINQ expression into Step 3. tions should be implemented. a distributed Dryad execution plan. It performs: (a) the IEnumer- The base type for a LINQ collection is decomposition of the expression into subexpressions, able . From a programmer’s perspective, this is each to be run in a separate Dryad vertex; (b) the gener- 8th USENIX Symposium on Operating Systems Design and Implementation 3 USENIX Association

4 that is ac- T an abstract dataset of objects of type // SQL-style syntax to join two input sets: cessed using an iterator interface. LINQ also defines // scoreTriples and staticRank the interface which is a subtype of IQueryable var adjustedScoreTriples = and represents an (unevaluated) ex- IEnumerable from d in scoreTriples pression constructed by combining LINQ datasets us- join r in staticRank on d.docID equals r.key ing LINQ operators. We need make only two obser- select new QueryScoreDocIDTriple(d, r); vations about these types: (a) in general the program- var rankedQueries = mer neither knows nor cares what concrete type imple- from s in adjustedScoreTriples ments any given dataset’s interface; and IEnumerable group s by s.query into g select TakeTopQueryResults(g); (b) DryadLINQ composes all LINQ expressions into objects and defers evaluation until the result IQueryable // Object-oriented syntax for the above join is needed, at which point the expression graph within the var adjustedScoreTriples = IQueryable is optimized and executed in its entirety on scoreTriples.Join(staticRank, IQueryable object can be used as an the cluster. Any d => d.docID, r => r.key, argument to multiple operators, allowing efficient re-use (d, r) => new QueryScoreDocIDTriple(d, r)); of common subexpressions. var groupedQueries = LINQ expressions are statically strongly typed adjustedScoreTriples.GroupBy(s => s.query); through use of nested generics, although the compiler var rankedQueries = hides much of the type-complexity from the user by pro- groupedQueries.Select( g => TakeTopQueryResults(g)); viding a range of “syntactic sugar.” Figure 3 illustrates LINQ’s syntax with a fragment of a simple example pro- gram that computes the top-ranked results for each query Figure 3: A program fragment illustrating two ways of expressing the same operation. The first uses LINQ’s declarative syntax, and the sec- in a stored corpus. Two versions of the same LINQ ex- r=>r.key ond uses object-oriented interfaces. Statements such as pression are shown, one using a declarative SQL-like use C#’s syntax for anonymous lambda expressions. syntax, and the second using the object-oriented style we .NET objects Partition adopt for more complex programs. The program first performs a Join to “look up” the scoreTriples static rank of each document contained in a tuple and then computes a new rank for that tuple, com- bining the query-dependent score with the static score in- side the constructor for QueryScoreDocIDTriple . The program next groups the resulting tuples by query, and Collection outputs the top-ranked results for each query. The full Figure 4: The DryadLINQ data model: strongly-typed collections of example program is included in [38]. .NET objects partitioned on a set of computers. The second, object-oriented, version of the example illustrates LINQ’s use of C#’s lambda expressions. The tain arbitrary .NET types, but each DryadLINQ dataset is Join method, for example, takes as arguments a dataset in general distributed across the computers of a cluster, ) and to perform the Join against (in this case staticRank partitioned into disjoint pieces as shown in Figure 4. The three functions. The first two functions describe how to partitioning strategies used—hash-partitioning, range- determine the keys that should be used in the Join. The partitioning, and round-robin—are familiar from paral- third function describes the Join function itself. Note that lel databases [18]. This dataset partitioning is managed the compiler performs static type inference to determine transparently by the system unless the programmer ex- the concrete types of objects and anonymous lambda var plicitly overrides the optimizer’s choices. expressions so the programmer need not remember (or The inputs and outputs of a DryadLINQ computation even know) the type signatures of many subexpressions are represented by objects of type , DryadTable or helper functions. which is a subtype of IQueryable . Subtypes of support underlying storage providers DryadTable that include distributed filesystems, collections of NTFS 3.2 DryadLINQ Constructs files, and sets of SQL tables. objects may DryadTable DryadLINQ preserves the LINQ programming model include metadata read from the file system describing ta- and extends it to data-parallel programming by defining ble properties such as schemas for the data items con- a small set of new operators and datatypes. tained in the table, and partitioning schemes which the The DryadLINQ data model is a distributed imple- DryadLINQ optimizer can use to generate more efficient mentation of LINQ collections. Datasets may still con- executions. These optimizations, along with issues such 8th USENIX Symposium on Operating Systems Design and Implementation USENIX Association 4

5 If the DryadLINQ system has no further information as data serialization and compression, are discussed in f ) will cause all of the compu- about Fork , Apply (or Section 4. tation to be serialized onto a single computer. More The primary restriction imposed by the DryadLINQ often, however, the user supplies annotations on that f system to allow distributed execution is that all the func- can be paral- Apply indicate conditions under which tions called in DryadLINQ expressions must be side- lelized. The details are too complex to be described in . Shared objects can be referenced and read effect free the space available, but quite general “conditional homo- freely and will be automatically serialized and distributed morphism” is supported—this means that the application where necessary. However, if any shared object is can specify conditions on the partitioning of a dataset modified, the result of the computation is undefined. under which Apply can be run independently on each DryadLINQ does not currently check or enforce the ab- partition. DryadLINQ will automatically re-partition the sence of side-effects. data to match the conditions if necessary. The inputs and outputs of a DryadLINQ compu- DryadLINQ allows programmers to specify annota- tation are specified using the GetTable and tions of various kinds. These provide manual hints to ToDryadTable operators, e.g.: guide optimizations that the system is unable to perform var input = GetTable("file://in.tbl"); automatically, while preserving the semantics of the pro- var result = MainProgram(input, ...); operator makes Apply gram. As mentioned above, the var output = ToDryadTable(result, "file://out.tbl"); use of annotations, supplied as simple .NET attributes, to indicate opportunities for parallelization. There are also Tables are referenced by URI strings that indicate the annotations to discriminate functions that re- Resource storage system to use as well as the name of the parti- quire constant storage from those whose storage grows ToDryadTable can simulta- tioned dataset. Variants of along with the input collection size—these are used by neously invoke multiple expressions and generate mul- the optimizer to determine buffering strategies and de- tiple output DryadTable s in a single distributed Dryad cide when to pipeline operators in the same process. The job. This feature (also encountered in parallel databases programmer may also declare that a dataset has a partic- such as Teradata) can be used to avoid recomputing or ular partitioning scheme if the file system does not store materializing common subexpressions. sufficient metadata to determine this automatically. DryadLINQ offers two data re-partitioning operators: The DryadLINQ optimizer produces good automatic and RangePartition . HashPartition execution plans for most programs composed of standard These operators are needed to enforce a partitioning on LINQ operators, and annotations are seldom needed un- an output dataset and they may also be used to over- statements. Apply less an application uses complex ride the optimizer’s choice of execution plan. From a LINQ perspective, however, they are no-ops since they just reorganize a collection without changing its con- 3.3 Building on DryadLINQ tents. The system allows the implementation of addi- tional re-partitioning operators, but we have found these Many programs can be directly written using the two to be sufficient for a wide class of applications. DryadLINQ primitives. Nevertheless, we have begun to and Fork , Apply The remaining new operators are build libraries of common subroutines for various appli- which can be thought of as an “escape-hatch” that a pro- cation domains. The ease of defining and maintaining grammer can use when a computation is needed that can- such libraries using C#’s functions and interfaces high- not be expressed using any of LINQ’s built-in opera- lights the advantages of embedding data-parallel con- tors. and passes to it an iter- f takes a function Apply structs within a high-level language. ator over the entire input collection, allowing arbitrary The MapReduce programming model from [15] can Apply streaming computations. As a simple example, be compactly stated as follows (eliding the precise type can be used to perform “windowed” computations on a signatures for clarity): th entry of the output sequence is i sequence, where the public static MapReduce( // returns set of Rs + d ] for a a function on the range of input values [ i,i source, // set of Ts fixed window of length d are Apply . The applications of → Ms mapper, // function from T much more general than this and we discuss them fur- → keySelector, // function from M K operator is very similar to ther in Section 7. The For k Rs → reducer // function from (K,Ms) except that it takes a single input and generates Apply ){ multiple output datasets. This is useful as a performance var mapped = source.SelectMany(mapper); optimization to eliminate common subcomputations, e.g. var groups = mapped.GroupBy(keySelector); to implement a document parser that outputs both plain return groups.SelectMany(reducer); } text and a bibliographic entry to separate tables. 8th USENIX Symposium on Operating Systems Design and Implementation 5 USENIX Association

6 Section 4 discusses the execution plan that is auto- nodes to choose an appro- OrderBy used by subsequent matically generated for such a computation by the priate distributed sort algorithm as described below in DryadLINQ optimizer. Section 4.2.3. The properties are seeded from the LINQ expression tree and the input and output tables’ metadata, We built a general-purpose library for manipulating and propagated and updated during EPG rewriting. numerical data to use as a platform for implementing machine-learning algorithms, some of which are de- Propagating these properties is substantially harder in scribed in Section 5. The applications are written as the context of DryadLINQ than for a traditional database. traditional programs calling into library functions, and The difficulties stem from the much richer data model make no explicit reference to the distributed nature of and expression language. Consider one of the simplest the computation. Several of these algorithms need to it- is a simple operations: input.Select(x => f(x)) . If f erate over a data transformation until convergence. In a expression, e.g. x.name , then it is straightforward for traditional database this would require support for recur- DryadLINQ to determine which properties can be prop- sive expressions, which are tricky to implement [14]; in agated. However, for arbitrary f it is in general impos- DryadLINQ it is trivial to use a C# loop to express the sible to determine whether this transformation preserves iteration. The companion technical report [38] contains the partitioning properties of the input. annotated source for some of these algorithms. Fortunately, DryadLINQ can usually infer properties in the programs typical users write. Partition and sort key properties are stored as expressions, and it is often fea- 4 System Implementation sible to compare these for equality using a combination of static typing, static analysis, and reflection. The sys- This section describes the DryadLINQ parallelizing tem also provides a simple mechanism that allows users compiler. We focus on the generation, optimization, and to assert properties of an expression when they cannot be execution of the distributed execution plan, correspond- determined automatically. ing to step 3 in Figure 2. The DryadLINQ optimizer is similar in many respects to classical database optimiz- ers [25]. It has a static component, which generates an 4.2 DryadLINQ Optimizations execution plan, and a dynamic component, which uses Dryad policy plug-ins to optimize the graph at run time. DryadLINQ performs both static and dynamic optimiza- tions. The static optimizations are currently greedy heuristics, although in the future we may implement 4.1 Execution Plan Graph cost-based optimizations as used in traditional databases. When it receives control, DryadLINQ starts by convert- The dynamic optimizations are applied during Dryad job ing the raw LINQ expression into an execution plan execution, and consist in rewriting the job graph depend- graph (EPG), where each node is an operator and edges ing on run-time data statistics. Our optimizations are represent its inputs and outputs. The EPG is closely re- sound in that a failure to compute properties simply re- lated to a traditional database query plan, but we use sults in an inefficient, though correct, execution plan. the more general terminology of execution plan to en- compass computations that are not easily formulated as “queries.” The EPG is a directed acyclic graph—the 4.2.1 Static Optimizations existence of common subexpressions and operators like DryadLINQ’s static optimizations are conditional graph means that EPGs cannot always be described by Fo r k rewriting rules triggered by a predicate on EPG node trees. DryadLINQ then applies term-rewriting optimiza- properties. Most of the static optimizations are focused tions on the EPG. The EPG is a “skeleton” of the Dryad on minimizing disk and network I/O. The most important data-flow graph that will be executed, and each EPG are: node is replicated at run time to generate a Dryad “stage” (a collection of vertices running the same computation Multiple operators may be executed in a Pipelining: on different partitions of a dataset). The optimizer an- single process. The pipelined processes are themselves notates the EPG with metadata properties. For edges, LINQ expressions and can be executed by an existing these include the .NET type of the data and the compres- single-computer LINQ implementation. sion scheme, if any, used after serialization. For nodes, DryadLINQ removes unnec- Removing redundancy: they include details of the partitioning scheme used, and essary hash- or range-partitioning steps. ordering information within each partition. The output of a node, for example, might be a dataset that is hash- Since re-partitioning datasets is Eager Aggregation: partitioned by a particular key, and sorted according to expensive, down-stream aggregations are moved in that key within each partition; this information can be front of partitioning operators where possible. 8th USENIX Symposium on Operating Systems Design and Implementation USENIX Association 6

7 I/O reduction: Where possible, DryadLINQ uses DS DS DS DS DS Dryad’s TCP-pipe and in-memory FIFO channels in- stead of persisting temporary data to files. The system H H H by default compresses data before performing a parti- tioning, to reduce network traffic. Users can manually O override compression settings to balance CPU usage D D D D D (1) (3) (2) with network load if the optimizer makes a poor de- cision. M M M M M S S S S S 4.2.2 Dynamic Optimizations DryadLINQ makes use of hooks in the Dryad API to Distributed sort optimization described in Section 4.2.3. Figure 5: Transformation (1) is static, while (2) and (3) are dynamic. dynamically mutate the execution graph as information from the running job becomes available. Aggregation map SM SM SM SM SM SM SM gives a major opportunity for I/O reduction since it can S S S S S S S sort be optimized into a tree according to locality, aggregat- groupby G G G G G G G map ing data first at the computer level, next at the rack level, R R R R R R R reduce SM and finally at the cluster level. The topology of such an distribute D D D D D D D G aggregation tree can only be computed at run time, since (2) R (1) (3) mergesort MS MS MS MS MS it is dependent on the dynamic scheduling decisions groupby G G X G G G which allocate vertices to computers. DryadLINQ au- reduce R R R R R tomatically uses the dynamic-aggregation logic present partial aggregation X X X mergesort in Dryad [26]. MS MS groupby G G Dynamic data partitioning sets the number of ver- reduce R R reduce tices in each stage (i.e., the number of partitions of each consumer X X dataset) at run time based on the size of its input data. Traditional databases usually estimate dataset sizes stat- Figure 6: Execution plan for MapReduce, described in Section 4.2.4. ically, but these estimates can be very inaccurate, for ex- Step (1) is static, (2) and (3) are dynamic based on the volume and location of the data in the inputs. ample in the presence of correlated queries. DryadLINQ supports dynamic hash and range partitions—for range based on the number of partitions in the preceding com- partitions both the number of partitions and the partition- putation, and the number of partitions in the M+S stage ing key ranges are determined at run time by sampling is chosen based on the volume of data to be sorted (tran- the input dataset. sitions (2) and (3) in Figure 5). OrderBy 4.2.3 Optimizations for 4.2.4 Execution Plan for MapReduce illustrates d DryadLINQ’s logic for sorting a dataset many of the static and dynamic optimizations available This section analyzes the execution plan generated by to the system. Different strategies are adopted depending DryadLINQ for the MapReduce computation from Sec- on d ’s initial partitioning and ordering. Figure 5 shows tion 3.3. Here, we examine only the case when the input O in the most com- the evolution of an OrderBy node is not ordered and the reduce function is GroupBy to is not already range-partitioned by plex case, where d determined to be associative and commutative. The auto- the correct sort key, nor are its partitions individually or- matically generated execution plan is shown in Figure 6. dered by the key. First, the dataset must be re-partitioned. The plan is statically transformed (1) into a Map and a stage performs deterministic sampling of the in- DS The Reduce stage. The Map stage applies the SelectMany put dataset. The samples are aggregated by a histogram SM operator ( S ) and then sorts each partition ( ), performs , which determines the partition keys as a func- H vertex a local GroupBy ( G ) and finally a local reduction ( ). R tion of data distribution (load-balancing the computation The nodes perform a hash-partition. All these opera- D vertices perform the actual re- D in the next stage). The tions are pipelined together in a single process. The Re- . partitioning, based on the key ranges computed by H duce stage first merge-sorts all the incoming data streams Next, a merge node M interleaves the inputs, and a S ( G ) and the ( MS ). This is followed by another GroupBy S are pipelined in a single pro- node sorts them. M and R ). All these Reduce stage operators are final reduction ( cess, and communicate using iterators. The number of pipelined in a single process along with the subsequent DS+H+D partitions in the stage is chosen at run time operation in the computation ( ). As with the sort plan X 8th USENIX Symposium on Operating Systems Design and Implementation 7 USENIX Association

8 in Section 4.2.3, at run time (2) the number of Map in- 4.4 Leveraging Other LINQ Providers stances is automatically determined using the number of One of the greatest benefits of using the LINQ frame- input partitions, and the number of Reduce instances is work is that DryadLINQ can leverage other systems that chosen based on the total volume of data to be Reduced. use the same constructs. DryadLINQ currently gains If necessary, DryadLINQ will insert a dynamic aggrega- most from the use of PLINQ [19], which allows us to run tion tree (3) to reduce the amount of data that crosses the the code within each vertex in parallel on a multi-core network. In the figure, for example, the two rightmost in- server. PLINQ, like DryadLINQ, attempts to make the put partitions were processed on the same computer, and process of parallelizing a LINQ program as transparent their outputs have been pre-aggregated locally before be- as possible, though the systems’ implementation strate- ing transferred across the network and combined with the gies are quite different. Space does not permit a detailed output of the leftmost partition. explanation, but PLINQ employs the iterator model [25] The resulting execution plan is very similar to since it is better suited to fine-grain concurrency in a the manually constructed plans reported for Google’s shared-memory multi-processor system. Because both MapReduce [15] and the Dryad histogram computation PLINQ and DryadLINQ use expressions composed from in [26]. The crucial point to note is that in DryadLINQ the same LINQ constructs, it is straightforward to com- MapReduce is not a primitive, hard-wired operation, and bine their functionality. DryadLINQ’s vertices execute all user-specified computations gain the benefits of these LINQ expressions, and in general the addition by the powerful automatic optimization strategies. DryadLINQ code generator of a single line to the vertex’s program triggers the use of PLINQ, allowing the vertex 4.3 Code Generation to exploit all the cores in a cluster computer. We note that this remarkable fact stems directly from the careful The EPG is used to derive the Dryad execution plan af- design choices that underpin LINQ. ter the static optimization phase. While the EPG encodes We have also added interoperation with the LINQ-to- all the required information, it is not a runnable program. SQL system which lets DryadLINQ vertices directly ac- DryadLINQ uses dynamic code generation to automati- cess data stored in SQL databases. Running a database cally synthesize LINQ code to be run at the Dryad ver- on each cluster computer and storing tables partitioned tices. The generated code is compiled into a .NET as- across these databases may be much more efficient than sembly that is shipped to cluster computers at execution using flat disk files for some applications. DryadLINQ time. For each execution-plan stage, the assembly con- programs can use “partitioned” SQL tables as input tains two pieces of code: and output. DryadLINQ also identifies and ships some (1) The code for the LINQ subexpression executed by subexpressions to the SQL databases for more efficient each node. execution. Finally, the default single-computer LINQ-to-Objects (2) Serialization code for the channel data. This code is implementation allows us to run DryadLINQ programs much more efficient than the standard .NET serializa- on a single computer for testing on small inputs under the tion methods since it can rely on the contract between control of the Visual Studio debugger before executing the reader and writer of a channel to access the same on a full cluster dataset. statically known datatype. The subexpression in a vertex is built from pieces of the overall EPG passed in to DryadLINQ. The EPG is 4.5 Debugging created in the original client computer’s execution con- Debugging a distributed application is a notoriously dif- text, and may depend on this context in two ways: ficult problem. Most DryadLINQ jobs are long running, The expression may reference variables in the lo- (1) processing massive datasets on large clusters, which cal context. These references are eliminated by par- could make the debugging process even more challeng- tial evaluation of the subexpression at code-generation ing. Perhaps surprisingly, we have not found debug- time. For primitive values, the references in the expres- ging the correctness of programs to be a major chal- sions are replaced with the actual values. Object values lenge when using DryadLINQ. Several users have com- are serialized to a resource file which is shipped to com- mented that LINQ’s strong typing and narrow interface puters in the cluster at execution time. have turned up many bugs before a program is even exe- (2) The expression may reference .NET libraries. .NET cuted. Also, as mentioned in Section 4.4, DryadLINQ reflection is used to find the transitive closure of all non- supports a straightforward mechanism to run applica- system libraries referenced by the executable, and these tions on a single computer, with very sophisticated sup- are shipped to the cluster computers at execution time. port from the .NET development environment. 8th USENIX Symposium on Operating Systems Design and Implementation USENIX Association 8

9 Once an application is running on the cluster, an in- This gave each local switch up to 6 GBits per second of dividual vertex may fail due to unusual input data that full duplex connectivity. Note that the switches are com- manifests problems not apparent from a single-computer modity parts purchased for under $1000 each. test. A consequence of Dryad’s deterministic-replay ex- ecution model, however, is that it is straightforward to 5.2 Terasort re-execute such a vertex in isolation with the inputs that caused the failure, and the system includes scripts to ship In this experiment we evaluate DryadLINQ using the the vertex executable, along with the problematic parti- Terasort benchmark [3]. The task is to sort 10 billion tions, to a local computer for analysis and debugging. 100-Byte records using case-insensitive string compar- Performance debugging is a much more challenging ison on a 10-Byte key. We use the data generator de- problem in DryadLINQ today. Programs report sum- scribed in [3]. The DryadLINQ program simply defines mary information about their overall progress, but if par- the record type, creates a DryadTable for the partitioned ticular stages of the computation run more slowly than inputs, and calls OrderBy ; the system then automati- expected, or their running time shows surprisingly high cally generates an execution plan using dynamic range- variance, it is necessary to investigate a collection of dis- partitioning as described in Section 4.2.3 (though for the parate logs to diagnose the issue manually. The central- purposes of running a controlled experiment we manu- ized nature of the Dryad job manager makes it straight- ally set the number of partitions for the sorting stage). forward to collect profiling information to ease this task, For this experiment, each computer in the clus- and simplifying the analysis of these logs is an active ter stored a partition of size around 3 . GBytes 87 area of our current research. 600 , 666 , 166 , Bytes). We varied the number of com- ( 4 computers, the n puters used, so for an execution using . 3 total data sorted is 87 n GBytes. On the largest run 5 Experimental Evaluation 12 Bytes of data. The most time- 10 =240 n and we sort consuming phase of this experiment is the network read We have evaluated DryadLINQ on a set of applica- to range-partition the data. However, this is overlapped tions drawn from domains including web-graph analy- with the sort, which processes inputs in batches in paral- sis, large-scale log mining, and machine learning. All of lel and generates the output by merge-sorting the sorted our performance results are reported for a medium-sized batches. DryadLINQ automatically compresses the data private cluster described in Section 5.1. Dryad has been before it is transferred across the network—when sorting in continuous operation for several years on production 12 10 GBytes of compressed data were Bytes of data, 150 clusters made up of thousands of computers so we are transferred across the network. confident in the scaling properties of the underlying ex- Table 1 shows the elapsed times in seconds as the num- ecution engine, and we have run large-scale DryadLINQ ber of machines increases from 1 to 240, and thus the programs on those production clusters. 12 Bytes. 10 data sorted increases from 3.87 GBytes to On repeated runs the times were consistent to within 5% 5.1 Hardware Configuration of their averages. Figure 7 shows the same information in graphical form. For the case of a single partition, The experiments described in this paper were run on a DryadLINQ uses a very different execution plan, skip- cluster of 240 computers. Each of these computers was ping the sampling and partitioning stages. It thus reads running the Windows Server 2003 64-bit operating sys- the input data only once, and does not perform any net- tem. The computers’ principal components were two work transfers. The single-partition time is therefore the dual-core AMD Opteron 2218 HE CPUs with a clock baseline time for reading a partition, sorting it, and writ- speed of 2.6 GHz, 16 GBytes of DDR2 random access 2 ing the output. For all computers were 20 ≤ n ≤ memory, and four 750 GByte SATA hard drives. The connected to the same local switch, and the elapsed time computers had two partitions on each disk. The first, stays fairly constant. When 20 n> the elapsed time small, partition was occupied by the operating system on seems to be approaching an asymptote as we increase the one disk and left empty on the remaining disks. The re- number of computers. We interpret this to mean that the maining partitions on each drive were striped together to cluster is well-provisioned: we do not saturate the core form a large data volume spanning all four disks. The computers were each connected to a Linksys SRW2048 240 Computers 1 2 10 20 40 80 48-port full-crossbar GBit Ethernet local switch via GBit Time 119 241 242 245 271 294 319 Ethernet. There were between 29 and 31 computers con- nected to each local switch. Each local switch was in Table 1: Time in seconds to sort different amounts of data. The total turn connected to a central Linksys SRW2048 switch, data sorted by an n -machine experiment is around 3 . 87 n GBytes, or 12 n =240 Bytes when . 10 via 6 ports aggregated using 802.3ad link aggregation. 8th USENIX Symposium on Operating Systems Design and Implementation 9 USENIX Association

10 350 25.00 - pass Dryad Two DryadLINQ 300 20.00 250 15.00 200 up - 150 Speed 10.00 100 Execution time (in seconds) 5.00 50 0 0.00 0 50 100 150 200 250 25 40 45 30 20 15 35 10 5 0 Number of computers Number of computers Figure 7: Sorting increasing amounts of data while keeping the volume The speed-up of the Skyserver Q18 computation as the num- Figure 8: of data per computer fixed. The total data sorted by an -machine n ber of computers is varied. The baseline is relative to DryadLINQ job 12 . =240 n Bytes when 3 n GBytes, or 10 experiment is around . 87 running on a single computer and times are given in Table 2. 20 Computers 1 5 10 40 to a hand-tuned sort strategy used by the Dryad program, 2167 Dryad 92 135 242 451 which is somewhat faster than DryadLINQ’s automatic DryadLINQ 176 328 580 2666 113 parallel sort implementation. However, the DryadLINQ program is written at a much higher level. It abstracts Table 2: Time in seconds to process skyserver Q18 using different num- much of the distributed nature of the computation from ber of computers. the programmer, and is only 10% of the length of the native code. network even when performing a dataset repartitioning Figure 8 graphs the inverse of the running times, nor- across all computers in the cluster. malized to show the speed-up factor relative to the two- 20 all pass single-computer Dryad version. For ≤ n computers were connected to the same local switch, and 5.3SkyServer the speedup factor is approximately proportional to the number of computers used. When = 40 the comput- n For this experiment we implemented the most time- ers must communicate through the core switch and the consuming query (Q18) from the Sloan Digital Sky Sur- scaling becomes sublinear. vey database [23]. The query identifies a “gravitational lens” effect by comparing the locations and colors of stars in a large astronomical table, using a three-way 5.4PageRank Join over two input tables containing 11.8 GBytes and 41.8 GBytes of data, respectively. In this experiment, We also evaluate the performance of DryadLINQ at per- we compare the performance of the two-pass variant forming PageRank calculations on a large web graph. of the Dryad program described in [26] with that of PageRank is a conceptually simple iterative computation DryadLINQ. The Dryad program is around 1000 lines of for scoring hyperlinked pages. Each page starts with C++ code whereas the corresponding DryadLINQ pro- a real-valued score. At each iteration every page dis- gram is only around 100 lines of C#. The input tables tributes its score across its outgoing links and updates its were manually range-partitioned into 40 partitions using score to the sum of values received from pages linking the same keys. We varied , the number of comput- n to it. Each iteration of PageRank is a fairly simple rela- ers used, to investigate the scaling performance. For a tional query. We first Join the set of links with the set of we ensured that the tables were distributed such n given ranks, using the source as the key. This results in a set of 40 partitions of /n that each computer had approximately scores, one for each link, that we can accumulate using each, and that for a given partition key-range the data a GroupBy-Sum with the link’s destinations as keys. We from the two tables was stored on the same computer. compare two implementations: an initial “naive” attempt and an optimized version. Table 2 shows the elapsed times in seconds for the na- be- n tive Dryad and DryadLINQ programs as we varied Our first DryadLINQ implementation follows the out- tween 1 and 40. On repeated runs the times were consis- line above, except that the links are already grouped by tent to within 3.5% of their averages. The DryadLINQ source (this is how the crawler retrieves them). This implementation is around 1.3 times slower than the na- makes the Join less complicated—once per page rather tive Dryad job. We believe the slowdown is mainly due than once per link—but requires that we follow it with 8th USENIX Symposium on Operating Systems Design and Implementation USENIX Association 10

11 them into 3 clusters. The algorithm was written using the a SelectMany, to produce the list of scores to aggregate. machine-learning framework described in Section 3.3 in This naive implementation takes 93 lines of code, includ- 160 lines of C#. The computation has three stages: (1) ing 35 lines to define the input types. parsing and re-partitioning the data across all the com- The naive approach scales well, but is inefficient be- puters in the cluster; (2) counting the records; and (3) cause it must reshuffle data proportional to the number performing an iterative E–M computation. We always of links to aggregate the transmitted scores. We improve perform 10 iterations (ignoring the convergence crite- on it by first HashPartitioning the link data by a hash of rion) grouped into two blocks of 5 iterations, materializ- the hostname of the source, rather than a hash of the page ing the results every 5 iterations. Some stages are CPU- name. The result is that most of the rank updates are bound (performing matrix algebra), while other are I/O written back locally—80%-90% of web links are host- bound. The job spawns about 10,000 processes across local—and do not touch the network. It is also possible the 240 computers, and completes end-to-end in 7 min- to cull leaf pages from the graph (and links to them); utes and 11 seconds, using about 5 hours of effective they do not contribute to the iterative computation, and CPU time. needn’t be kept in the inner loop. Further performance We also used DryadLINQ to apply statistical inference optimizations, like pre-grouping the web pages by host algorithms [33] to automatically discover network-wide (+7 LOC), rewriting each of these host groups with dense relationships between hosts and services on a medium- local names (+21 LOC), and pre-aggregating the ranks size network (514 hosts). For each network host the from each host (+18 LOC) simplify the computation fur- algorithms compose a dependency graph by analyzing ther and ease the memory footprint. The complete source timings between input/output packets. The input is pro- code for our implementation of PageRank is contained in cessed header data from a trace of 11 billion packets the companion technical report [38]. (180 GBytes packed using a custom compression format We evaluate both of these implementations (running into 40 GBytes). The main body of this DryadLINQ on 240 computers) on a large web graph containing program is just seven lines of code. It hash-partitions 954M pages and 16.5B links, occupying 1.2 TB com- the data using the pair as a key, applies (host,hour) pressed on disk. The naive implementation, including a doubly-nested E–M algorithm and hypothesis testing pre-aggregation, executes 10 iterations in 12,792 sec- (which takes 95% of the running time), partitions again onds. The optimized version, which further compresses by hour, and finally builds graphs for all 174,588 active the graph down to 116 GBytes, executes 10 iterations in host hours. The computation takes 4 hours and 22 min- 690 seconds. utes, and more than 10 days of effective CPU time. It is natural to compare our PageRank implementa- tion with similar implementations using other platforms. MapReduce, Hadoop, and Pig all use the MapReduce 6 Related Work computational framework, which has trouble efficiently implementing Join due to its requirement that all input DryadLINQ builds on decades of previous work in dis- (including the web graph itself) be output of the previous tributed computation. The most obvious connections are stage. By comparison, DryadLINQ can partition the web with parallel databases, grid and cluster computing, par- graph once, and reuse that graph in multiple stages with- allel and high-performance computation, and declarative out moving any data across the network. It is important programming languages. to note that the Pig language masks the complexity of Many of the salient features of DryadLINQ stem from Joins, but they are still executed as MapReduce compu- the high-level system architecture. In our model of clus- tations, thus incurring the cost of additional data move- ter computing the three layers of storage, execution, ment. SQL-style queries can permit Joins, but suffer and application are decoupled. The system can make from their rigid data types, preventing the pre-grouping use of a variety of storage layers, from raw disk files of links by host and even by page. to distributed filesystems and even structured databases. The Dryad distributed execution environment provides generic distributed execution services for acyclic net- 5.5 Large-Scale Machine Learning works of processes. DryadLINQ supplies the application We ran two machine-learning experiments to investigate layer. DryadLINQ’s performance on iterative numerical algo- rithms. 6.1 Parallel Databases The first experiment is a clustering algorithm for de- tecting botnets. We analyze around 2.1 GBytes of data, Many of the core ideas employed by DryadLINQ (such where each datum is a three-dimensional vector summa- as shared-nothing architecture, horizontal data partition- rizing salient features of a single computer, and group ing, dynamic repartitioning, parallel query evaluation, 8th USENIX Symposium on Operating Systems Design and Implementation 11 USENIX Association

12 and dataflow scheduling), can be traced to early research At the storage layer a variety of very large scale projects in parallel databases [18], such as Gamma [17], simple databases have appeared, including Google’s Bubba [8], and Volcano [22], and found in commercial BigTable [11], Amazon’s Simple DB, and Microsoft products for data warehousing such as Teradata, IBM SQL Server Data Services. Architecturally, DryadLINQ DB2 Parallel Edition [4], and Tandem SQL/MP [20]. is just an application running on top of Dryad and gener- ating distributed Dryad jobs. We can envision making it Although DryadLINQ builds on many of these ideas, interoperate with any of these storage layers. it is not a traditional database. For example, DryadLINQ provides a generalization of the concept of query lan- guage, but it does not provide a data definition lan- 6.3 Declarative Programming Languages guage (DDL) or a data management language (DML) and it does not provide support for in-place table up- Notable research projects in parallel declarative lan- dates or transaction processing. We argue that the DDL guages include Parallel Haskell [37], Cilk [7], and and DML belong to the storage layer, so they should not NESL [6]. be a first-class part of the application layer. However, There has also been a recent surge of activity on layer- as Section 4.2 explains, the DryadLINQ optimizer does ing distributed and declarative programming language on make use of partitioning and typing information avail- top of distributed computation platforms. For example, able as metadata attached to input datasets, and will write Sawzall [32] is compiled to MapReduce applications, such metadata back to an appropriately configured stor- while Pig [31] programs are compiled to the Hadoop in- age layer. frastructure. The MapReduce model is extended to sup- Traditional databases offer extensibility beyond the port Joins in [12]. Other examples include Pipelets [9], simple relational data model through embedded lan- HIVE (an internal Facebook language built on Hadoop), guages and stored procedures. DryadLINQ (following and Scope [10], Nebula [26], and PSQL (internal Mi- LINQ’s design) turns this relationship around, and em- crosoft languages built on Dryad). beds the expression language in the high-level program- Grid computing usually provides workflows (and not ming language. This allows DryadLINQ to provide very a programming language interface), which can be tied rich native datatype support: almost all native .NET together by a user-level application. Examples include types can be manipulated as typed, first-class objects. Swift [39] and its scripting language, Taverna [30], and In order to enable parallel expression execution, Triana [36]. DryadLINQ is a higher-level language, DryadLINQ employs many traditional parallelization which better conceals the underlying execution fabric. and query optimization techniques, centered on horizon- tal data partitioning. As mentioned in the Introduction, the expression plan generated by DryadLINQ is virtu- 7 Discussion and Conclusions alized. This virtualization underlies DryadLINQ’s dy- namic optimization techniques, which have not previ- DryadLINQ has been in use by a small community of de- ously been reported in the literature [16]. velopers for over a year, resulting in tens of large appli- cations and many more small programs. The system was recently released more widely within Microsoft and our 6.2 Large Scale Data-Parallel Computa- experience with it is rapidly growing as a result. Feed- tion Infrastructure back from users has generally been very positive. It is perhaps of particular interest that most of our users man- The last decade has seen a flurry of activity in archi- age small private clusters of, at most, tens of computers, tectures for processing very large datasets (as opposed and still find substantial benefits from DryadLINQ. to traditional high-performance computing which is typ- Of course DryadLINQ is not appropriate for all dis- ically CPU-bound). One of the earliest commercial tributed applications, and this lack of generality arises generic platforms for distributed computation was the from design choices in both Dryad and LINQ. Teoma Neptune platform [13], which introduced a map- reduce computation paradigm inspired by MPI’s Re- The Dryad execution engine was engineered for batch duce operator. The Google MapReduce framework [15] applications on large datasets. There is an overhead of slightly extended the computation model, separated the at least a few seconds when executing a DryadLINQ execution layer from storage, and virtualized the exe- EPG which means that DryadLINQ would not cur- cution. The Hadoop open-source port of MapReduce rently be well suited to, for example, low-latency dis- uses the same architecture. NetSolve [5] proposed a tributed database lookups. While one could imagine re- grid-based architecture for a generic execution layer. engineering Dryad to mitigate some of this latency, an DryadLINQ has a richer set of operators and better lan- effective solution would probably need to adopt differ- guage support than any of these other proposals. ent strategies for, at least, resource virtualization, fault- 8th USENIX Symposium on Operating Systems Design and Implementation USENIX Association 12

13 tolerance, and code generation and so would look quite DryadLINQ has benefited tremendously from the de- different to our current system. sign choices of LINQ and Dryad. LINQ’s extensibil- ity, allowing the introduction of new execution imple- The question of which applications are suitable for mentations and custom operators, is the key that allows parallelization by DryadLINQ is more subtle. In our ex- us to achieve deep integration of Dryad with LINQ- perience, the main requirement is that the program can enabled programming languages. LINQ’s strong static be written using LINQ constructs: users generally then typing is extremely valuable when programming large- find it straightforward to adapt it to distributed execution scale computations—it is much easier to debug compi- using DryadLINQ—and in fact frequently no adaptation lation errors in Visual Studio than run-time errors in the is necessary. However, a certain change in outlook may cluster. Likewise, Dryad’s flexible execution model is be required to identify the data-parallel components of an well suited to the static and dynamic optimizations we algorithm and express them using LINQ operators. For want to implement. We have not had to modify any part example, the PageRank computation described in Sec- of Dryad to support DryadLINQ’s development. In con- tion 5 uses a Join operation to implement a subroutine trast, many of our optimizations would have been diffi- typically specified as matrix multiplication. cult to express using a more limited computational ab- Dryad and DryadLINQ are also inherently special- straction such as MapReduce. ized for streaming computations, and thus may ap- Our current research focus is on gaining more under- pear very inefficient for algorithms which are natu- standing of what programs are easy or hard to write with rally expressed using random-accesses. In fact for sev- DryadLINQ, and refining the optimizer to ensure it deals eral workloads including breadth-first traversal of large well with common cases. As discussed in Section 4.5, graphs we have found DryadLINQ outperforms special- performance debugging is currently not well supported. ized random-access infrastructures. This is because the We are working to improve the profiling and analysis current performance characteristics of hard disk drives tools that we supply to programmers, but we are ulti- ensures that sequential streaming is faster than small mately more interested in improving the system’s ability random-access reads even when greater than 99% of the to get good performance automatically. We are also pur- streamed data is discarded. Of course there will be other suing a variety of cluster-computing projects that are en- workloads where DryadLINQ is much less efficient, and abled by DryadLINQ, including storage research tailored as more storage moves from spinning disks to solid-state to the workloads generated by DryadLINQ applications. (e.g. flash memory) the advantages of streaming-only Our overall experience is that DryadLINQ, by com- systems such as Dryad and MapReduce will diminish. bining the benefits of LINQ—a high-level language and We have learned a lot from our users’ experience of rich data structures—with the power of Dryad’s dis- operator. Many DryadLINQ beginners find Apply the tributed execution model, proves to be an amazingly sim- than to de- Apply it easier to write custom code inside ple, useful and elegant programming environment. Apply termine the equivalent native LINQ expression. is therefore helpful since it lowers the barrier to entry “pol- to use the system. However, the use of Apply Acknowledgements lutes” the relational nature of LINQ and can reduce the We would like to thank our user community for their in- system’s ability to make high-level program transforma- valuable feedback, and particularly Frank McSherry for tions. This tradeoff between purity and ease of use is the implementation and evaluation of PageRank. We also familiar in language design. As system builders we have thank Butler Lampson, Dave DeWitt, Chandu Thekkath, found one of the most useful properties of Apply is that Andrew Birrell, Mike Schroeder, Moises Goldszmidt, sophisticated programmers can use it to manually imple- and Kannan Achan, as well as the OSDI review commit- ment optimizations that DryadLINQ does not perform tee and our shepherd Marvin Theimer, for many helpful automatically. For example, the optimizer currently im- comments and contributions to this paper. plements all reductions using partial sorts and groupings as shown in Figure 6. In some cases operations such as References Count are much more efficiently implemented using hash tables and accumulators, and several developers have in- [1] The DryadLINQ project. to achieve this performance im- Apply dependently used http://research.microsoft.com/research/sv/DryadLINQ/. provement. Consequently we plan to add additional re- [2] The LINQ project. duction patterns to the set of automatic DryadLINQ op- http://msdn.microsoft.com/netframework/future/linq/. timizations. This experience strengthens our belief that, [3] Sort benchmark. at the current stage in the evolution of the system, it is http://research.microsoft.com/barc/SortBenchmark/. best to give users flexibility and suffer the consequences HIN ARU , C. K., F - , G., G OYAL , A., H SIAO , H., J ECTEAU [4] B - AND IL W GRAN , A., P ADMANABHAN , S., C OPELAND , G. P., when they use that flexibility unwisely. 8th USENIX Symposium on Operating Systems Design and Implementation 13 USENIX Association

14 [23] G , P., UNSZT , A., T RAY , J., S ZALAY , A., K HAKAR IBM Systems Journal 34 , 2, SON , W. G. DB2 parallel edition. S 1995. TOUGHTON , J. Data ANDENBERG V AND , D., LUTZ , C., S mining the SDSS SkyServer database. In Distributed Data and [5] B LANK P AND , J., ONGARRA , M., D ECK , J. S. NetSolve/D: Structures 4: Records of the 4th International Meeting , 2002. A massively parallel grid execution system for scalable data in- International Parallel and Distributed tensive collaboration. In [24] H ASAN ,W.,F LORESCU , D., , P. Open issues ALDURIEZ AND V Processing Symposium (IPDPS) , 2005. in parallel query optimization. SIGMOD Rec. 25 , 3, 1996. Communi- LELLOCH , G. E. Programming parallel algorithms. [6] B AMILTON H AND , M., , J. M., S ELLERSTEIN TONEBRAKER , H [25] cations of the ACM (CACM) 39 , 3, 1996. J. Architecture of a database system. Foundations and Trends in , 2, 2007. Databases 1 , LUMOFE , R. D., J OERG , C. F., K USZMAUL , B., L EISERSON [7] B C. E., R , K. H., AND Z HOU , Y. Cilk: An efficient ANDALL [26] I , M., Y U SARD , M., B UDIU - ET ,Y.,B IRRELL , A., AND F ACM SIGPLAN Symposium on multithreaded runtime system. In TERLY , D. Dryad: Distributed data-parallel programs from se- , 1995. Principles and Practice of Parallel Programming (PPoPP) Proceedings of European Conference quential building blocks. In , 2007. on Computer Systems (EuroSys) , G., OPELAND , L., C LAY , W., C LEXANDER , H., A ORAL [8] B RANKLIN , M., MITH , B., S ART AND , S., F ANFORTH , M., H D [27] K , D. J. Efficient mid-query re- W E D AND , N., ABRA ITT , P. Prototyping Bubba, a highly parallel database ALDURIEZ V optimization of sub-optimal query execution plans. In SIGMOD , 1, 1990. system. IEEE Trans. on Knowl. and Data Eng. 2 International Conference on Management of Data , 1998. , D. Pipelets: A framework for OSTE C E D ARNAHAN , J., AND [9] C K [28] , D. The state of the art in distributed query process- OSSMANN distributed computation. In W4: Learning in Web Search , 2005. , 4, 2000. ing. ACM Comput. Surv. 32 [10] C AMSEY , B., HAIKEN , R., J ENKINS , B., L ARSON , P.-Å., R M [29] AMEURLAIN H AND , F., ORVAN , A. Dynamic memory allo- S AND HOU , J. SCOPE: Easy and , S., EAVER , D., W HAKIB Z Symposium on cation strategies for parallel query execution. In efficient parallel processing of massive data sets. In International , 2002. Applied computing (SAC) Conference of Very Large Data Bases (VLDB) , 2008. REENWOOD , J., , T., G INN O ERRIS , M., F DDIS , M., A [30] HANG HEMAWAT EAN , J., G ,F.,D , S., H SIEH , W. C., W AL - C [11] G LOVER , K., G OBLE , C., H ULL , D., M ARVIN , D., L I , P., AND , A., IKES , T., F HANDRA , M., C URROWS , D. A., B LACH ORD ,P.,P OCOCK , M. R., S ENGER , M., W IPAT , A., AND L RUBER , R. E. BigTable: A distributed storage system for struc- G W ROE , C. Taverna: Lessons in creating a workflow environment tured data. In Symposium on Operating System Design and Im- Concurrency and Computation: Practice for the life sciences. , 2006. plementation (OSDI) and Experience 18 , 10, 2005. , A., H Y CHIH , ANG , H., D ASDAN ARKER SIAO , R.-L., AND P [12] [31] O , B., S EED , C., R , R., AND LSTON , U., K UMAR RIVASTAVA D. S. Map-reduce-merge: simplified relational data processing , A. Pig Latin: A not-so-foreign language for data OMKINS T on large clusters. In SIGMOD international conference on Man- processing. In International Conference on Management of Data agement of data , 2007. , 2008. (Industrial Track) (SIGMOD) ANG AND , L., T , K. Optimizing data HU S , T., ANG , H., Y HEN [13] C P [32] Q AND , R., RIESEMER , S., G ORWARD , R., D IKE UINLAN , S. Symposium on aggregation for cluster-based internet services. In Interpreting the data: Parallel analysis with Sawzall. Scientific , 2003. Principles and practice of parallel programming (PPoPP) , 4, 2005. Programming 13 , B., AGEVILLE , T., D RUANES , B. Parallel SQL HOSH G AND C [14] [33] S IMMA , , A., G ARHAM , J., B ORMICK C AC , M., M OLDSZMIDT ACM SIGMOD execution in Oracle 10g. In , 2004. , R. CT-NOR: rep- LACK , R., I SAACS , R., AND M ORTIER P., B resenting and reasoning about events in continuous time. In In- AND G HEMAWAT , J., , S. MapReduce: Simplified data EAN [15] D ternational Conference on Uncertainty in Artificial Intelligence , Proceedings of the 6th Symposium processing on large clusters. In 2008. on Operating Systems Design and Implementation (OSDI) , 2004. S [34] , C., Ç EAR HERNI - , M., B ETINTEMEL TONEBRAKER , U., C ESHPANDE , A., I VES , Z., AND R AMAN , V. Adaptive query [16] D E , M., G ACK , IFTER , S., L ARIZOPOULOS , N., H ACHEM ,T.,H processing. , 1, 2007. Foundations and Trends in Databases 1 J., R DONIK , S. One size fits all? Part 2: , J., Z AND OGERS [17] D CHNEIDER , S., S E W ITT , D., G HANDEHARIZADEH , D., Benchmarking results. In Conference on Innovative Data Sys- , A., RICKER , H., B SIAO , R. The Gamma ASMUSSEN R AND H , 2005. tems Research (CIDR) database machine project. IEEE Transactions on Knowledge and S [35] ADDEN - TONEBRAKER , M., M ARI , S., A BADI , D. J., H Data Engineering 2 , 1, 1990. , S., H ZOPOULOS , P. The end of ELLAND H AND , N., ACHEM ITT , J. Parallel database systems: The fu- RAY G AND , D., W E [18] D Interna- an architectural era (it’s time for a complete rewrite). In Communications ture of high performance database processing. tional Conference of Very Large Data Bases (VLDB) , 2007. , 6, 1992. of the ACM 36 [36] T AYLOR , A. ARRISON H AND , I., ANG , M., W , I., S HIELDS UFFY , J. A query language for data parallel programming. In [19] D Workflows for e-Science . 2007, ch. The Triana Workflow En- Proceedings of the 2007 workshop on Declarative aspects of mul- vironment: Architecture and Applications, pp. 320–339. , 2007. ticore programming [37] T OINTON P AND , H.-W., OIDL , P., L RINDER , R. Parallel and NGLERT , S., G LASSTONE , R., AND H ASAN , W. Parallelism E [20] Journal of Functional Programming 12 , distributed Haskells. Sigmod and its price : A case study of nonstop SQL/MP. In (4&5), 2002. Record , 1995. Y [38] SARD , RLINGSSON , M., E UDIU ETTERLY , M., F ,Y.,I U , D., B , Y. C., , L., L U , H., T AY ENG AND T UNG , A. K. H. Buffer [21] F , CHAN A AND , F., HERRY S C , J., M UNDA , P. K., C URREY Ú., G management in distributed database systems: A data mining K. Some sample programs written in DryadLINQ. Tech. Rep. International Conference on Extending based approach. In MSR-TR-2008-74, Microsoft Research, 2008. , 1998, H.-J. Schek, F. Saltor, I. Ramos, and Database Technology Z [39] , M., C VON , I., HAO ,Y.,H ATEGAN OSTER LIFFORD , B., F Lecture Notes in Computer Science G. Alonso, Eds., vol. 1377 of . , I., S , T., -P TEF AICU L ASZEWSKI , G., N EFEDOVA , V., R RAUN , G. Encapsulation of parallelism in the Volcano query RAEFE [22] G AND ILDE , M. Swift: Fast, reliable, loosely coupled parallel W processing system. In SIGMOD International Conference on computation. IEEE Congress on Services , 2007. , 1990. Management of data 8th USENIX Symposium on Operating Systems Design and Implementation USENIX Association 14

Related documents