The Dataflow Model: A Practical Approach to Balancing Correctness, Latency, and Cost in Massive-Scale, Unbounded, Out-of-Order Data Processing

Tyler Akidau, Robert Bradshaw, Craig Chambers, Slava Chernyak, Rafael J. Fernández-Moctezuma, Reuven Lax, Sam McVeety, Daniel Mills, Frances Perry, Eric Schmidt, Sam Whittle
Google
{takidau, robertwb, chambers, chernyak, rfernand, relax, sgmc, millsd, fjp, cloude, samuelw}@google.com

ABSTRACT

Unbounded, unordered, global-scale datasets are increasingly common in day-to-day business (e.g. Web logs, mobile usage statistics, and sensor networks). At the same time, consumers of these datasets have evolved sophisticated requirements, such as event-time ordering and windowing by features of the data themselves, in addition to an insatiable hunger for faster answers. Meanwhile, practicality dictates that one can never fully optimize along all dimensions of correctness, latency, and cost for these types of input. As a result, data processing practitioners are left with the quandary of how to reconcile the tensions between these seemingly competing propositions, often resulting in disparate implementations and systems.

We propose that a fundamental shift of approach is necessary to deal with these evolved requirements in modern data processing. We as a field must stop trying to groom unbounded datasets into finite pools of information that eventually become complete, and instead live and breathe under the assumption that we will never know if or when we have seen all of our data, only that new data will arrive, old data may be retracted, and the only way to make this problem tractable is via principled abstractions that allow the practitioner the choice of appropriate tradeoffs along the axes of interest: correctness, latency, and cost.

In this paper, we present one such approach, the Dataflow Model¹, along with a detailed examination of the semantics it enables, an overview of the core principles that guided its design, and a validation of the model itself via the real-world experiences that led to its development.

¹ We use the term "Dataflow Model" to describe the processing model of Google Cloud Dataflow [20], which is based upon technology from FlumeJava [12] and MillWheel [2].

1. INTRODUCTION

Modern data processing is a complex and exciting field. From the scale enabled by MapReduce [16] and its successors (e.g. Hadoop [4], Pig [18], Hive [29], Spark [33]), to the vast body of work on streaming within the SQL community (e.g. query systems [1, 14, 15], windowing [22], data streams [24], time domains [28], semantic models [9]), to the more recent forays in low-latency processing such as Spark Streaming [34], MillWheel, and Storm [5], modern consumers of data wield remarkable amounts of power in shaping and taming massive-scale disorder into organized structures with far greater value. Yet, existing models and systems still fall short in a number of common use cases.

Consider an initial example: a streaming video provider wants to monetize their content by displaying video ads and billing advertisers for the amount of advertising watched. The platform supports online and offline views for content and ads. The video provider wants to know how much to bill each advertiser each day, as well as aggregate statistics about the videos and ads. In addition, they want to efficiently run offline experiments over large swaths of historical data.

Advertisers and content providers want to know how often and for how long their videos are being watched, with which content/ads, and by which demographic groups. They also want to know how much they are being charged or paid. They want all of this information as quickly as possible, so that they can adjust budgets and bids, change targeting, tweak campaigns, and plan future directions in as close to real time as possible. Since money is involved, correctness is paramount.

Though data processing systems are complex by nature, the video provider wants a programming model that is simple and flexible. And finally, since the Internet has so greatly expanded the reach of any business that can be parceled along its backbone, they also require a system that can handle the diaspora of global scale data.

The information that must be calculated for such a use case is essentially the time and length of each video viewing, who viewed it, and with which ad or content it was paired (i.e. per-user, per-video viewing sessions). Conceptually this is straightforward, yet existing models and systems all fall short of meeting the stated requirements.

Batch systems such as MapReduce (and its Hadoop variants, including Pig and Hive), FlumeJava, and Spark suffer from the latency problems inherent with collecting all input data into a batch before processing it.

This work is licensed under the Creative Commons Attribution-NonCommercial-NoDerivs 3.0 Unported License. To view a copy of this license, visit http://creativecommons.org/licenses/by-nc-nd/3.0/. Obtain permission prior to any use beyond those covered by the license. Contact copyright holder by emailing info@vldb.org. Articles from this volume were invited to present their results at the 41st International Conference on Very Large Data Bases, August 31st - September 4th 2015, Kohala Coast, Hawaii.
Proceedings of the VLDB Endowment, Vol. 8, No. 12
Copyright 2015 VLDB Endowment 2150-8097/15/08.

For many streaming systems, it is unclear how they would remain fault-tolerant at scale (Aurora [1], TelegraphCQ [14], Niagara [15], Esper [17]). Those that provide scalability and fault-tolerance fall short on expressiveness or correctness vectors. Many lack the ability to provide exactly-once semantics (Storm, Samza [7], Pulsar [26]), impacting correctness. Others simply lack the temporal primitives necessary for windowing² (Tigon [11]), or provide windowing semantics that are limited to tuple- or processing-time-based windows (Spark Streaming [34], Sonora [32], Trident [5]). Most that provide event-time-based windowing either rely on ordering (SQLStream [27]), or have limited window triggering³ semantics in event-time mode (Stratosphere/Flink [3, 6]). CEDR [8] and Trill [13] are noteworthy in that they not only provide useful triggering semantics via punctuations [30, 28], but also provide an overall incremental model that is quite similar to the one we propose here; however, their windowing semantics are insufficient to express sessions, and their periodic punctuations are insufficient for some of the use cases in Section 3.3. MillWheel and Spark Streaming are both sufficiently scalable, fault-tolerant, and low-latency to act as reasonable substrates, but lack high-level programming models that make calculating event-time sessions straightforward. The only scalable system we are aware of that supports a high-level notion of unaligned windows⁴ such as sessions is Pulsar, but that system fails to provide correctness, as noted above. Lambda Architecture [25] systems can achieve many of the desired requirements, but fail on the simplicity axis on account of having to build and maintain two systems. Summingbird [10] ameliorates this implementation complexity by abstracting the underlying batch and streaming systems behind a single interface, but in doing so imposes limitations on the types of computation that can be performed, and still requires double the operational complexity.

None of these shortcomings are intractable, and systems in active development will likely overcome them in due time. But we believe a major shortcoming of all the models and systems mentioned above (with exception given to CEDR and Trill) is that they focus on input data (unbounded or otherwise) as something which will at some point become complete. We believe this approach is fundamentally flawed when the realities of today's enormous, highly disordered datasets clash with the semantics and timeliness demanded by consumers. We also believe that any approach that is to have broad practical value across such a diverse and varied set of use cases as those that exist today (not to mention those lingering on the horizon) must provide simple, but powerful, tools for balancing the amount of correctness, latency, and cost appropriate for the specific use case at hand. Lastly, we believe it is time to move beyond the prevailing mindset of an execution engine dictating system semantics; properly designed and built batch, micro-batch, and streaming systems can all provide equal levels of correctness, and all three see widespread use in unbounded data processing today. Abstracted away beneath a model of sufficient generality and flexibility, we believe the choice of execution engine can become one based solely on the practical underlying differences between them: those of latency and resource cost.

Taken from that perspective, the conceptual contribution of this paper is a single unified model which:

• Allows for the calculation of event-time⁵ ordered results, windowed by features of the data themselves, over an unbounded, unordered data source, with correctness, latency, and cost tunable across a broad spectrum of combinations.

• Decomposes pipeline implementation across four related dimensions, providing clarity, composability, and flexibility:
  – What results are being computed.
  – Where in event time they are being computed.
  – When in processing time they are materialized.
  – How earlier results relate to later refinements.

• Separates the logical notion of data processing from the underlying physical implementation, allowing the choice of batch, micro-batch, or streaming engine to become one of simply correctness, latency, and cost.

Concretely, this contribution is enabled by the following:

• A windowing model which supports unaligned event-time windows, and a simple API for their creation and use (Section 2.2).

• A triggering model that binds the output times of results to runtime characteristics of the pipeline, with a powerful and flexible declarative API for describing desired triggering semantics (Section 2.3).

• An incremental processing model that integrates retractions and updates into the windowing and triggering models described above (Section 2.3).

• Scalable implementations of the above atop the MillWheel streaming engine and the FlumeJava batch engine, with an external reimplementation for Google Cloud Dataflow, including an open-source SDK [19] that is runtime-agnostic (Section 3.1).

• A set of core principles that guided the design of this model (Section 3.2).

• Brief discussions of our real-world experiences with massive-scale, unbounded, out-of-order data processing at Google that motivated development of this model (Section 3.3).

It is lastly worth noting that there is nothing magical about this model. Things which are computationally impractical in existing strongly-consistent batch, micro-batch, streaming, or Lambda Architecture systems remain so, with the inherent constraints of CPU, RAM, and disk left steadfastly in place. What it does provide is a common framework that allows for the relatively simple expression of parallel computation in a way that is independent of the underlying execution engine, while also providing the ability to dial in precisely the amount of latency and correctness for any specific problem domain given the realities of the data and resources at hand. In that sense, it is a model aimed at ease of use in building practical, massive-scale data processing pipelines.

² By windowing, we mean as defined in Li [22], i.e. slicing data into finite chunks for processing. More in Section 1.2.
³ By triggering, we mean stimulating the output of a specific window at a grouping operation. More in Section 2.3.
⁴ By unaligned windows, we mean windows which do not span the entirety of a data source, but instead only a subset of it, such as per-user windows. This is essentially the frames idea from Whiteneck [31]. More in Section 1.2.
⁵ By event times, we mean the times at which events occurred, not when they are processed. More in Section 1.3.

1.1 Unbounded/Bounded vs Streaming/Batch

When describing infinite/finite data sets, we prefer the terms unbounded/bounded over streaming/batch, because the latter terms carry with them an implication of the use of a specific type of execution engine. In reality, unbounded datasets have been processed using repeated runs of batch systems since their conception, and well-designed streaming systems are perfectly capable of processing bounded data. From the perspective of the model, the distinction of streaming or batch is largely irrelevant, and we thus reserve those terms exclusively for describing runtime execution engines.

1.2 Windowing

Windowing [22] slices up a dataset into finite chunks for processing as a group. When dealing with unbounded data, windowing is required for some operations (to delineate finite boundaries in most forms of grouping: aggregation, outer joins, time-bounded operations, etc.), and unnecessary for others (filtering, mapping, inner joins, etc.). For bounded data, windowing is essentially optional, though still a semantically useful concept in many situations (e.g. backfilling large scale updates to portions of a previously computed unbounded data source). Windowing is effectively always time based; while many systems support tuple-based windowing, this is essentially time-based windowing over a logical time domain where elements in order have successively increasing logical timestamps. Windows may be either aligned, i.e. applied across all the data for the window of time in question, or unaligned, i.e. applied across only specific subsets of the data (e.g. per key) for the given window of time. Figure 1 highlights three of the major types of windows encountered when dealing with unbounded data.

[Figure 1: Common Windowing Patterns — fixed, sliding, and sessions windows illustrated across three keys.]

Fixed windows (sometimes called tumbling windows) are defined by a static window size, e.g. hourly windows or daily windows. They are generally aligned, i.e. every window applies across all of the data for the corresponding period of time. For the sake of spreading window completion load evenly across time, they are sometimes unaligned by phase shifting the windows for each key by some random value.

Sliding windows are defined by a window size and slide period, e.g. hourly windows starting every minute. The period may be less than the size, which means the windows may overlap. Sliding windows are also typically aligned; even though the diagram is drawn to give a sense of sliding motion, all five windows would be applied to all three keys in the diagram, not just Window 3. Fixed windows are really a special case of sliding windows where size equals period.

Sessions are windows that capture some period of activity over a subset of the data, in this case per key. Typically they are defined by a timeout gap. Any events that occur within a span of time less than the timeout are grouped together as a session. Sessions are unaligned windows. For example, Window 2 applies to Key 1 only, Window 3 to Key 2 only, and Windows 1 and 4 to Key 3 only.
1.3 Time Domains

When processing data which relate to events in time, there are two inherent domains of time to consider. Though captured in various places across the literature (particularly time management [28] and semantic models [9], but also windowing [22], out-of-order processing [23], punctuations [30], heartbeats [21], watermarks [2], frames [31]), the detailed examples in Section 2.3 will be easier to follow with the concepts clearly in mind. The two domains of interest are:

• Event Time, which is the time at which the event itself actually occurred, i.e. a record of system clock time (for whatever system generated the event) at the time of occurrence.

• Processing Time, which is the time at which an event is observed at any given point during processing within the pipeline, i.e. the current time according to the system clock. Note that we make no assumptions about clock synchronization within a distributed system.

Event time for a given event essentially never changes, but processing time changes constantly for each event as it flows through the pipeline and time marches ever forward. This is an important distinction when it comes to robustly analyzing events in the context of when they occurred.

During processing, the realities of the systems in use (communication delays, scheduling algorithms, time spent processing, pipeline serialization, etc.) result in an inherent and dynamically changing amount of skew between the two domains. Global progress metrics, such as punctuations or watermarks, provide a good way to visualize this skew. For our purposes, we'll consider something like MillWheel's watermark, which is a lower bound (often heuristically established⁶) on event times that have been processed by the pipeline. As we've made very clear above, notions of completeness are generally incompatible with correctness, so we won't rely on watermarks as such. They do, however, provide a useful notion of when the system thinks it likely that all data up to a given point in event time have been observed, and thus find application in not only visualizing skew, but in monitoring overall system health and progress, as well as in making decisions around progress that do not require complete accuracy, such as basic garbage collection policies.

⁶ For most real-world distributed data sets, the system lacks sufficient knowledge to establish a 100% correct watermark. For example, in the video sessions use case, consider offline views. If someone takes their mobile device into the wilderness, the system has no practical way of knowing when they might come back to civilization, regain connection, and begin uploading data about video views during that time. As a result, most watermarks must be heuristically defined based on the limited knowledge available. For structured input sources that expose metadata regarding unobserved data, such as log files, we've found these heuristics to be remarkably accurate, and thus practically useful as a completion estimate for many use cases. Furthermore, and importantly, once a heuristic watermark has been established, it can then be propagated accurately downstream through the rest of the pipeline (much like a punctuation would), though the overall metric itself remains a heuristic.
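To make the watermark notion concrete, below is a minimal, self-contained sketch (our own illustration, not MillWheel's actual implementation) of how a single pipeline stage might maintain a heuristic watermark as a lower bound over the event times of elements it knows about but has not yet finished processing; all class and method names here are hypothetical.

    import java.util.concurrent.ConcurrentSkipListMap;

    // Sketch: a per-stage watermark tracked as the minimum event time among
    // elements that have arrived but not yet completed processing. Real
    // systems must also estimate event times of data not yet seen at all,
    // which is what makes production watermarks heuristic.
    class WatermarkTracker {
      // event time (epoch millis) -> number of in-flight elements
      private final ConcurrentSkipListMap<Long, Integer> inFlight =
          new ConcurrentSkipListMap<>();

      synchronized void onArrival(long eventTimeMillis) {
        inFlight.merge(eventTimeMillis, 1, Integer::sum);
      }

      synchronized void onCompletion(long eventTimeMillis) {
        inFlight.computeIfPresent(
            eventTimeMillis, (t, n) -> (n == 1) ? null : n - 1);
      }

      // Lower bound on the event times of unprocessed elements; with
      // nothing in flight, the watermark may advance to the current
      // processing time.
      synchronized long currentWatermarkMillis() {
        return inFlight.isEmpty()
            ? System.currentTimeMillis()
            : inFlight.firstKey();
      }
    }

Skew at any instant is then simply processing time minus the watermark; the meandering line in Figure 2 plots exactly this quantity over the life of a pipeline.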

In an ideal world, time domain skew would always be zero; we would always be processing all events immediately as they happen. Reality is not so favorable, however, and often what we end up with looks more like Figure 2. Starting around 12:00, the watermark starts to skew away from real time as the pipeline lags, diving back close to real time around 12:02, then lagging behind again noticeably by the time 12:03 rolls around. This dynamic variance in skew is very common in distributed data processing systems, and will play a big role in defining what functionality is necessary for providing correct, repeatable results.

[Figure 2: Time Domain Skew — actual vs. ideal watermark plotted over event time and processing time.]

2. DATAFLOW MODEL

In this section, we will define the formal model for the system and explain why its semantics are general enough to subsume the standard batch, micro-batch, and streaming models, as well as the hybrid streaming and batch semantics of the Lambda Architecture. For code examples, we will use a simplified variant of the Dataflow Java SDK, which itself is an evolution of the FlumeJava API.

2.1 Core Primitives

To begin with, let us consider primitives from the classic batch model. The Dataflow SDK has two core transforms that operate on the (key, value) pairs flowing through the system⁷:

• ParDo for generic parallel processing. Each input element to be processed (which itself may be a finite collection) is provided to a user-defined function (called a DoFn in Dataflow), which can yield zero or more output elements per input. For example, consider an operation which expands all prefixes of the input key, duplicating the value across them:

    (fix, 1), (fit, 2)
      ↓ ParDo(ExpandPrefixes)
    (f, 1), (fi, 1), (fix, 1), (f, 2), (fi, 2), (fit, 2)

• GroupByKey for key-grouping (key, value) pairs:

    (f, 1), (fi, 1), (fix, 1), (f, 2), (fi, 2), (fit, 2)
      ↓ GroupByKey
    (f, [1, 2]), (fi, [1, 2]), (fix, [1]), (fit, [2])

The ParDo operation operates element-wise on each input element, and thus translates naturally to unbounded data. The GroupByKey operation, on the other hand, collects all data for a given key before sending them downstream for reduction. If the input source is unbounded, we have no way of knowing when it will end. The common solution to this problem is to window the data.

⁷ Without loss of generality, we will treat all elements in the system as (key, value) pairs, even though a key is not actually required for certain operations, such as ParDo. Most of the interesting discussions revolve around GroupByKey, which does require keys, so assuming they exist is simpler.
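For concreteness, here is one way the ExpandPrefixes operation above might be written as a DoFn. This is a sketch against the simplified SDK used in this paper; the DoFn/ProcessContext surface shown approximates the Dataflow Java SDK but should not be read as its exact signatures.

    // Sketch of a DoFn that, for each input element, emits one output
    // element per prefix of the key, duplicating the value across them,
    // e.g. (fix, 1) -> (f, 1), (fi, 1), (fix, 1).
    class ExpandPrefixes
        extends DoFn<KV<String, Integer>, KV<String, Integer>> {
      @Override
      public void processElement(ProcessContext c) {
        KV<String, Integer> elem = c.element();
        String key = elem.getKey();
        for (int i = 1; i <= key.length(); i++) {
          // Each output implicitly retains the input's event-time timestamp.
          c.output(KV.of(key.substring(0, i), elem.getValue()));
        }
      }
    }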
2.2 Windowing

Systems which support grouping typically redefine their GroupByKey operation to essentially be GroupByKeyAndWindow. Our primary contribution here is support for unaligned windows, for which there are two key insights. The first is that it is simpler to treat all windowing strategies as unaligned from the perspective of the model, and allow underlying implementations to apply optimizations relevant to the aligned cases where applicable. The second is that windowing can be broken apart into two related operations:

• Set<Window> AssignWindows(T datum), which assigns the element to zero or more windows. This is essentially the Bucket Operator from Li [22].

• Set<Window> MergeWindows(Set<Window> windows), which merges windows at grouping time. This allows data-driven windows to be constructed over time as data arrive and are grouped together.

For any given windowing strategy, the two operations are intimately related; sliding window assignment requires sliding window merging, sessions window assignment requires sessions window merging, etc.

Note that, to support event-time windowing natively, instead of passing (key, value) pairs through the system, we now pass (key, value, event time, window) 4-tuples. Elements are provided to the system with event-time timestamps (which may also be modified at any point in the pipeline⁸), and are initially assigned to a default global window, covering all of event time, providing semantics that match the defaults in the standard batch model.

⁸ Note, however, that certain timestamp modification operations are antagonistic to progress tracking metrics like watermarks; moving a timestamp behind the watermark makes a given element late with respect to that watermark.
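As a concrete illustration of the AssignWindows half of this pair, the following self-contained sketch implements sliding-window assignment; the class and method names are ours, not the SDK's. Fixed windows fall out as the special case where period equals size.

    import java.util.ArrayList;
    import java.util.List;

    // Half-open event-time interval [start, end); units are minutes here
    // purely for readability against Figure 3 in the next subsection.
    record Window(long start, long end) {}

    class SlidingWindows {
      private final long size;   // window width, e.g. 2 (minutes)
      private final long period; // slide period, e.g. 1 (minute)

      SlidingWindows(long size, long period) {
        this.size = size;
        this.period = period;
      }

      // Assign a timestamp to every window of length `size` containing it,
      // with one window beginning at each multiple of `period`. When
      // size == period, exactly one window results: a fixed window.
      List<Window> assignWindows(long timestamp) {
        List<Window> windows = new ArrayList<>();
        long lastStart = timestamp - (timestamp % period);
        for (long start = lastStart;
             start > timestamp - size;
             start -= period) {
          windows.add(new Window(start, start + size));
        }
        return windows;
      }
    }

Applied to an element with timestamp 12:00 under Sliding(2m, 1m), this yields the two windows [11:59, 12:01) and [12:00, 12:02), matching Figure 3 below.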

2.2.1 Window Assignment

From the model's perspective, window assignment creates a new copy of the element in each of the windows to which it has been assigned. For example, consider windowing a dataset by sliding windows of two-minute width and one-minute period, as shown in Figure 3 (for brevity, timestamps are given in HH:MM format):

    (k, v1, 12:00, [0, ∞)), (k, v2, 12:01, [0, ∞))
      ↓ AssignWindows(Sliding(2m, 1m))
    (k, v1, 12:00, [11:59, 12:01)),
    (k, v1, 12:00, [12:00, 12:02)),
    (k, v2, 12:01, [12:00, 12:02)),
    (k, v2, 12:01, [12:01, 12:03))

Figure 3: Window Assignment

In this case, each of the two (key, value) pairs is duplicated to exist in both of the windows that overlapped the element's timestamp. Since windows are associated directly with the elements to which they belong, window assignment can happen anywhere in the pipeline before grouping is applied. This is important, as the grouping operation may be buried somewhere downstream inside a composite transformation (e.g. Sum.integersPerKey()).

2.2.2 Window Merging

Window merging occurs as part of the GroupByKeyAndWindow operation, and is best explained in the context of an example. We will use session windowing since it is our motivating use case. Figure 4 shows four example data, three for k1 and one for k2, as they are windowed by session, with a 30-minute session timeout. All are initially placed in a default global window by the system. The sessions implementation of AssignWindows puts each element into a single window that extends 30 minutes beyond its own timestamp; this window denotes the range of time into which later events can fall if they are to be considered part of the same session. We then begin the GroupByKeyAndWindow operation, which is really a five-part composite operation:

• DropTimestamps - Drops element timestamps, as only the window is relevant from here on out⁹.

• GroupByKey - Groups (value, window) tuples by key.

• MergeWindows - Merges the set of currently buffered windows for a key. The actual merge logic is defined by the windowing strategy. In this case, the windows for v1 and v4 overlap, so the sessions windowing strategy merges them into a single new, larger session, as indicated in bold.

• GroupAlsoByWindow - For each key, groups values by window. After merging in the prior step, v1 and v4 are now in identical windows, and thus are grouped together at this step.

• ExpandToElements - Expands per-key, per-window groups of values into (key, value, event time, window) tuples, with new per-window timestamps. In this example, we set the timestamp to the end of the window, but any timestamp greater than or equal to the timestamp of the earliest event in the window is valid with respect to watermark correctness.

    (k1, v1, 13:02, [0, ∞)), (k2, v2, 13:14, [0, ∞)),
    (k1, v3, 13:57, [0, ∞)), (k1, v4, 13:20, [0, ∞))
      ↓ AssignWindows(Sessions(30m))
    (k1, v1, 13:02, [13:02, 13:32)), (k2, v2, 13:14, [13:14, 13:44)),
    (k1, v3, 13:57, [13:57, 14:27)), (k1, v4, 13:20, [13:20, 13:50))
      ↓ DropTimestamps
    (k1, v1, [13:02, 13:32)), (k2, v2, [13:14, 13:44)),
    (k1, v3, [13:57, 14:27)), (k1, v4, [13:20, 13:50))
      ↓ GroupByKey
    (k1, [(v1, [13:02, 13:32)), (v3, [13:57, 14:27)), (v4, [13:20, 13:50))]),
    (k2, [(v2, [13:14, 13:44))])
      ↓ MergeWindows(Sessions(30m))
    (k1, [(v1, [13:02, 13:50)), (v3, [13:57, 14:27)), (v4, [13:02, 13:50))]),
    (k2, [(v2, [13:14, 13:44))])
      ↓ GroupAlsoByWindow
    (k1, [([v1, v4], [13:02, 13:50)), ([v3], [13:57, 14:27))]),
    (k2, [([v2], [13:14, 13:44))])
      ↓ ExpandToElements
    (k1, [v1, v4], 13:50, [13:02, 13:50)),
    (k1, [v3], 14:27, [13:57, 14:27)),
    (k2, [v2], 13:44, [13:14, 13:44))

Figure 4: Window Merging

⁹ If the user needs them later, it is possible to first materialize them as part of their value.
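To make the merge step concrete, here is a minimal sketch of the sessions strategy: assignment wraps each element in a proto-session window extending one timeout gap beyond its timestamp, and merging collapses any overlapping windows for a key into a single larger session. The structure and names are illustrative, not the actual internals of our implementation (the Window record mirrors the one sketched in Section 2.2).

    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.List;

    record Window(long start, long end) {} // half-open [start, end)

    class Sessions {
      private final long gap; // session timeout, e.g. 30 (minutes)

      Sessions(long gap) { this.gap = gap; }

      // AssignWindows: each element defines a proto-session covering the
      // range of time into which later events may fall and still be part
      // of the same session.
      Window assignWindow(long timestamp) {
        return new Window(timestamp, timestamp + gap);
      }

      // MergeWindows: collapse overlapping proto-sessions; called at
      // grouping time with all currently buffered windows for a key.
      List<Window> mergeWindows(List<Window> windows) {
        List<Window> sorted = new ArrayList<>(windows);
        sorted.sort(Comparator.comparingLong(Window::start));
        List<Window> merged = new ArrayList<>();
        for (Window w : sorted) {
          int last = merged.size() - 1;
          if (last >= 0 && merged.get(last).end() >= w.start()) {
            // Overlap: grow the session to cover both windows.
            merged.set(last, new Window(
                merged.get(last).start(),
                Math.max(merged.get(last).end(), w.end())));
          } else {
            merged.add(w);
          }
        }
        return merged;
      }
    }

For the k1 data in Figure 4, the proto-sessions [13:02, 13:32) and [13:20, 13:50) overlap and merge into [13:02, 13:50), while [13:57, 14:27) remains its own session.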
2.2.3 API

As a brief example of the use of windowing in practice, consider the following Cloud Dataflow SDK code to calculate keyed integer sums:

    PCollection<KV<String, Integer>> input = IO.read(...);
    PCollection<KV<String, Integer>> output = input
      .apply(Sum.integersPerKey());

To do the same thing, but windowed into sessions with a 30-minute timeout as in Figure 4, one would add a single Window.into call before initiating the summation:

    PCollection<KV<String, Integer>> input = IO.read(...);
    PCollection<KV<String, Integer>> output = input
      .apply(Window.into(Sessions.withGapDuration(
          Duration.standardMinutes(30))))
      .apply(Sum.integersPerKey());

2.3 Triggers & Incremental Processing

The ability to build unaligned, event-time windows is an improvement, but now we have two more shortcomings to address:

• We need some way of providing support for tuple- and processing-time-based windows, otherwise we have regressed our windowing semantics relative to other systems in existence.

• We need some way of knowing when to emit the results for a window. Since the data are unordered with respect to event time, we require some other signal to tell us when the window is done.

The problem of tuple- and processing-time-based windows we will address in Section 2.4, once we have built up a solution to the window completeness problem. As to window completeness, an initial inclination for solving it might be to use some sort of global event-time progress metric, such as watermarks. However, watermarks themselves have two major shortcomings with respect to correctness:

• They are sometimes too fast, meaning there may be late data that arrives behind the watermark. For many distributed data sources, it is intractable to derive a completely perfect event-time watermark, and thus impossible to rely on it solely if we want 100% correctness in our output data.

• They are sometimes too slow. Because they are a global progress metric, the watermark can be held back for the entire pipeline by a single slow datum. And even for healthy pipelines with little variability in event-time skew, the baseline level of skew may still be multiple minutes or more, depending upon the input source. As a result, using watermarks as the sole signal for emitting window results is likely to yield higher latency of overall results than, for example, a comparable Lambda Architecture pipeline.

For these reasons, we postulate that watermarks alone are insufficient. A useful insight in addressing the completeness problem is that the Lambda Architecture effectively sidesteps the issue: it does not solve the completeness problem by somehow providing correct answers faster; it simply provides the best low-latency estimate of a result that the streaming pipeline can provide, with the promise of eventual consistency and correctness once the batch pipeline runs¹⁰. If we want to do the same thing from within a single pipeline (regardless of execution engine), then we will need a way to provide multiple answers (or panes) for any given window. We call this feature triggers, since they allow the specification of when to trigger the output results for a given window.

In a nutshell, triggers are a mechanism for stimulating the production of GroupByKeyAndWindow results in response to internal or external signals. They are complementary to the windowing model, in that they each affect system behaviour along a different axis of time:

• Windowing determines where in event time data are grouped together for processing.

• Triggering determines when in processing time the results of groupings are emitted as panes¹¹.

¹⁰ Note that in reality, output from the batch job is only correct if input data is complete by the time the batch job runs; if data evolve over time, this must be detected and the batch jobs re-executed.
¹¹ Specific triggers, such as watermark triggers, make use of event time in the functionality they provide, but their effects within the pipeline are still realized in the processing time axis.

Our systems provide predefined trigger implementations for triggering at completion estimates (e.g. watermarks, including percentile watermarks, which provide useful semantics for dealing with stragglers in both batch and streaming execution engines when you care more about processing a minimum percentage of the input data quickly than processing every last piece of it), at points in processing time, and in response to data arriving (counts, bytes, data punctuations, pattern matching, etc.). We also support composing triggers into logical combinations (and, or, etc.), loops, sequences, and other such constructions, as sketched below. In addition, users may define their own triggers utilizing both the underlying primitives of the execution runtime (e.g. watermark timers, processing-time timers, data arrival, composition support) and any other relevant external signals (data injection requests, external progress metrics, RPC completion callbacks, etc.). We will look more closely at examples in Section 2.4.
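As a preview of what such composition looks like in the simplified SDK style used throughout this paper, the trigger specification below is the one we will apply in Figure 13; the annotations are ours.

    // Emit a speculative pane every minute of processing time until the
    // watermark passes the end of the window, then emit an on-time pane
    // at the watermark, plus one additional pane per late datum after it.
    Window.trigger(SequenceOf(
        RepeatUntil(
            AtPeriod(1, MINUTE),  // periodic speculative firings...
            AtWatermark()),       // ...until the watermark passes
        Repeat(AtWatermark())))   // on-time pane, then late-data panes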
In addition to controlling when results are emitted, the triggers system provides a way to control how multiple panes for the same window relate to each other, via three different refinement modes:

• Discarding: Upon triggering, window contents are discarded, and later results bear no relation to previous results. This mode is useful in cases where the downstream consumer of the data (either internal or external to the pipeline) expects the values from various trigger fires to be independent (e.g. when injecting into a system that generates a sum of the values injected). It is also the most efficient in terms of the amount of data buffered, though for associative and commutative operations which can be modeled as a Dataflow Combiner, the efficiency delta will often be minimal. For our video sessions use case, this is not sufficient, since it is impractical to require downstream consumers of our data to stitch together partial sessions.

• Accumulating: Upon triggering, window contents are left intact in persistent state, and later results become a refinement of previous results. This is useful when the downstream consumer expects to overwrite old values with new ones when receiving multiple results for the same window, and is effectively the mode used in Lambda Architecture systems, where the streaming pipeline produces low-latency results, which are then overwritten in the future by the results from the batch pipeline. For video sessions, this might be sufficient if we are simply calculating sessions and then immediately writing them to some output source that supports updates (e.g. a database or key/value store).

• Accumulating & Retracting: Upon triggering, in addition to the Accumulating semantics, a copy of the emitted value is also stored in persistent state. When the window triggers again in the future, a retraction for the previous value will be emitted first, followed by the new value as a normal datum¹². Retractions are necessary in pipelines with multiple serial GroupByKeyAndWindow operations, since the multiple results generated by a single window over subsequent trigger fires may end up on separate keys when grouped downstream. In that case, the second grouping operation will generate incorrect results for those keys unless it is informed via a retraction that the effects of the original output should be reversed. Dataflow Combiner operations that are also reversible can support retractions efficiently via an uncombine method (a sketch follows below). For video sessions, this mode is the ideal. If we are performing aggregations downstream from session creation that depend on properties of the sessions themselves, for example detecting unpopular ads (such as those which are viewed for less than five seconds in a majority of sessions), initial results may be invalidated as inputs evolve over time, e.g. as a significant number of offline mobile viewers come back online and upload session data. Retractions provide a way for us to adapt to these types of changes in complex pipelines with multiple serial grouping stages.

¹² A simple implementation of retraction processing requires deterministic operations, but non-determinism may be supported with additional complexity and cost; we have seen use cases that require this, such as probabilistic modeling.
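To illustrate what such an uncombine might look like for a reversible aggregation, here is a small, self-contained sketch; the interface is our own strawman for exposition, not the actual Dataflow Combiner API.

    // Strawman interface for a combiner that can reverse prior inputs.
    interface ReversibleCombiner<V, A> {
      A createAccumulator();
      A combine(A accum, V input);    // fold a normal datum in
      A uncombine(A accum, V input);  // undo a retracted datum's effect
      V extract(A accum);
    }

    // Integer summation is trivially reversible: a retraction of v is
    // handled by subtracting v back out of the running total.
    class SumOfIntegers implements ReversibleCombiner<Integer, Long> {
      public Long createAccumulator()              { return 0L; }
      public Long combine(Long accum, Integer v)   { return accum + v; }
      public Long uncombine(Long accum, Integer v) { return accum - v; }
      public Integer extract(Long accum) { return Math.toIntExact(accum); }
    }

A downstream grouping that receives a retraction for a previous pane followed by its replacement can thus update its aggregate in place rather than rebuilding it from scratch.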
2.4 Examples

We will now consider a series of examples that highlight the plurality of useful output patterns supported by the Dataflow Model. We will look at each example in the context of the integer summation pipeline from Section 2.2.3:

    PCollection<KV<String, Integer>> output = input
      .apply(Sum.integersPerKey());

Let us assume we have an input source from which we are observing ten data points, each themselves small integer values. We will consider them in the context of both bounded and unbounded data sources. For diagrammatic simplicity, we will assume all these data are for the same key; in a real pipeline, the types of operations we describe here would be happening in parallel for multiple keys. Figure 5 diagrams how these data relate together along both axes of time we care about. The X axis plots the data in event time (i.e. when the events actually occurred), while the Y axis plots the data in processing time (i.e. when the pipeline observes them). All examples assume execution on our streaming engine unless otherwise specified.

Many of the examples will also depend on watermarks, in which cases we will include them in our diagrams. We will graph both the ideal watermark and an example actual watermark. The straight dotted line with slope of one represents the ideal watermark, i.e. if there were no event-time skew and all events were processed by the system as they occurred. Given the vagaries of distributed systems, skew is a common occurrence; this is exemplified by the meandering path the actual watermark takes in Figure 5, represented by the darker, dashed line. Note also that the heuristic nature of this watermark is exemplified by the single "late" datum with value 9 that appears behind the watermark.

[Figure 5: Example Inputs — ten data points plotted by event time and processing time, with actual and ideal watermarks.]

If we were to process these data in a classic batch system using the described summation pipeline, we would wait for all the data to arrive, group them together into one bundle (since these data are all for the same key), and sum their values to arrive at a total result of 51. This result is represented by the darkened rectangle in Figure 6, whose area covers the ranges of event and processing time included in the sum (with the top of the rectangle denoting when in processing time the result was materialized). Since classic batch processing is event-time agnostic, the result is contained within a single global window covering all of event time. And since outputs are only calculated once all inputs are received, the result covers all of processing time for the execution.

[Figure 6: Classic Batch Execution]

Note the inclusion of watermarks in this diagram. Though not typically used for classic batch processing, watermarks would semantically be held at the beginning of time until all data had been processed, then advanced to infinity. An important point to note is that one can get identical semantics to classic batch by running the data through a streaming system with watermarks progressed in this manner.

Now let us say we want to convert this pipeline to run over an unbounded data source. In Dataflow, the default triggering semantics are to emit windows when the watermark passes them. But when using the global window with an unbounded input source, we are guaranteed that will never happen, since the global window covers all of event time. As such, we will need to either trigger by something other than the default trigger, or window by something other than the global window. Otherwise, we will never get any output.

Let us first look at changing the trigger, since this will allow us to generate conceptually identical output (a global per-key sum over all time), but with periodic updates. In this example, we apply a Window.trigger operation that repeatedly fires on one-minute periodic processing-time boundaries. We also specify Accumulating mode so that our global sum will be refined over time (this assumes we have an output sink into which we can simply overwrite previous results for the key with new results, e.g. a database or key/value store). Thus, in Figure 7, we generate updated global sums once per minute of processing time. Note how the semi-transparent output rectangles overlap, since Accumulating panes build upon prior results by incorporating overlapping regions of processing time:

    PCollection<KV<String, Integer>> output = input
      .apply(Window.trigger(Repeat(AtPeriod(1, MINUTE)))
             .accumulating())
      .apply(Sum.integersPerKey());

[Figure 7: GlobalWindows, AtPeriod, Accumulating]

If we instead wanted to generate the delta in sums once per minute, we could switch to Discarding mode, as in Figure 8. Note that this effectively gives the processing-time windowing semantics provided by many streaming systems. The output panes no longer overlap, since their results incorporate data from independent regions of processing time.

    PCollection<KV<String, Integer>> output = input
      .apply(Window.trigger(Repeat(AtPeriod(1, MINUTE)))
             .discarding())
      .apply(Sum.integersPerKey());

[Figure 8: GlobalWindows, AtPeriod, Discarding]

Another, more robust way of providing processing-time windowing semantics is to simply assign arrival time as event times at data ingress, then use event-time windowing. A nice side effect of using arrival time event times is that the system has perfect knowledge of the event times in flight, and thus can provide perfect (i.e. non-heuristic) watermarks, with no late data. This is an effective and cost-efficient way of processing unbounded data for use cases where true event times are not necessary or available.
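A sketch of this arrival-time scheme in the style of the DoFn from Section 2.1 (outputWithTimestamp is modeled on the Dataflow SDK, though the exact surface shown here is illustrative):

    import org.joda.time.Instant;

    // Sketch: at ingress, stamp each element with the current processing
    // time as its event time. Downstream event-time windowing then behaves
    // like processing-time windowing, and the watermark can be perfect,
    // since the system itself chose every timestamp.
    class AssignArrivalTime<T> extends DoFn<T, T> {
      @Override
      public void processElement(ProcessContext c) {
        c.outputWithTimestamp(c.element(), Instant.now());
      }
    }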
Before we look more closely at other windowing options, let us consider one more change to the triggers for this pipeline. The other common windowing mode we would like to model is tuple-based windows. We can provide this sort of functionality by simply changing the trigger to fire after a certain number of data arrive, say two. In Figure 9, we get five outputs, each containing the sum of two adjacent (by processing time) data. More sophisticated tuple-based windowing schemes (e.g. sliding tuple-based windows) require custom windowing strategies, but are otherwise supported.

    PCollection<KV<String, Integer>> output = input
      .apply(Window.trigger(Repeat(AtCount(2)))
             .discarding())
      .apply(Sum.integersPerKey());

[Figure 9: GlobalWindows, AtCount, Discarding]

Let us now return to the other option for supporting unbounded sources: switching away from global windowing. To start with, let us window the data into fixed, two-minute windows:

    PCollection<KV<String, Integer>> output = input
      .apply(Window.into(FixedWindows.of(2, MINUTES))
             .accumulating())
      .apply(Sum.integersPerKey());

With no trigger strategy specified, the system would use the default trigger, which is effectively:

    PCollection<KV<String, Integer>> output = input
      .apply(Window.into(FixedWindows.of(2, MINUTES))
             .trigger(Repeat(AtWatermark()))
             .accumulating())
      .apply(Sum.integersPerKey());

The watermark trigger fires when the watermark passes the end of the window in question. Both batch and streaming engines implement watermarks, as detailed in Section 3.1. The Repeat call in the trigger is used to handle late data; should any data arrive after the watermark, they will instantiate the repeated watermark trigger, which will fire immediately since the watermark has already passed.

Figures 10−12 each characterize this pipeline on a different type of runtime engine. We will first observe what execution of this pipeline would look like on a batch engine.

Given our current implementation, the data source would have to be a bounded one, so as with the classic batch example above, we would wait for all data in the batch to arrive. We would then process the data in event-time order, with windows being emitted as the simulated watermark advances, as in Figure 10.

[Figure 10: FixedWindows, Batch]

Now imagine executing a micro-batch engine over this data source with one-minute micro-batches. The system would gather input data for one minute, process them, and repeat. Each time, the watermark for the current batch would start at the beginning of time and advance to the end of time (technically jumping from the end time of the batch to the end of time instantaneously, since no data would exist for that period). We would thus end up with a new watermark for every micro-batch round, and corresponding outputs for all windows whose contents had changed since the last round. This provides a very nice mix of latency and eventual correctness, as in Figure 11.

[Figure 11: FixedWindows, Micro-Batch]

Next, consider this pipeline executed on a streaming engine, as in Figure 12. Most windows are emitted when the watermark passes them. Note however that the datum with value 9 is actually late relative to the watermark. For whatever reason (mobile input source being offline, network partition, etc.), the system did not realize that datum had not yet been injected, and thus, having observed the 5, allowed the watermark to proceed past the point in event time that would eventually be occupied by the 9. Hence, once the 9 finally arrives, it causes the first window (for event-time range [12:00, 12:02)) to retrigger with an updated sum.

[Figure 12: FixedWindows, Streaming]

This output pattern is nice in that we have roughly one output per window, with a single refinement in the case of the late datum. But the overall latency of results is noticeably worse than the micro-batch system, on account of having to wait for the watermark to advance; this is the case of watermarks being too slow from Section 2.3.

If we want lower latency via multiple partial results for all of our windows, we can add in some additional, processing-time-based triggers to provide us with regular updates until the watermark actually passes, as in Figure 13. This yields somewhat better latency than the micro-batch pipeline, since data are accumulated in windows as they arrive instead of being processed in small batches. Given strongly-consistent micro-batch and streaming engines, the choice between them (as well as the choice of micro-batch size) really becomes just a matter of latency versus cost, which is exactly one of the goals we set out to achieve with this model.

    PCollection<KV<String, Integer>> output = input
      .apply(Window.into(FixedWindows.of(2, MINUTES))
             .trigger(SequenceOf(
                 RepeatUntil(
                     AtPeriod(1, MINUTE),
                     AtWatermark()),
                 Repeat(AtWatermark())))
             .accumulating())
      .apply(Sum.integersPerKey());

[Figure 13: FixedWindows, Streaming, Partial]

As one final exercise, let us update our example to satisfy the video sessions requirements (modulo the use of summation as the aggregation operation, which we will maintain for diagrammatic consistency; switching to another aggregation would be trivial), by updating to session windowing with a one-minute timeout and enabling retractions.

This highlights the composability provided by breaking the model into four pieces (what you are computing, where in event time you are computing it, when in processing time you are observing the answers, and how those answers relate to later refinements), and also illustrates the power of reverting previous values which otherwise might be left uncorrelated to the value offered as replacement.

    PCollection<KV<String, Integer>> output = input
      .apply(Window.into(Sessions.withGapDuration(1, MINUTE))
             .trigger(SequenceOf(
                 RepeatUntil(
                     AtPeriod(1, MINUTE),
                     AtWatermark()),
                 Repeat(AtWatermark())))
             .accumulatingAndRetracting())
      .apply(Sum.integersPerKey());

[Figure 14: Sessions, Retracting]

In this example, we output initial singleton sessions for values 5 and 7 at the first one-minute processing-time boundary. At the second minute boundary, we output a third session with value 10, built up from the values 3, 4, and 3. When the value of 8 is finally observed, it joins the two sessions with values 7 and 10. As the watermark passes the end of this new combined session, retractions for the 7 and 10 sessions are emitted, as well as a normal datum for the new session with value 25. Similarly, when the 9 arrives (late), it joins the session with value 5 to the session with value 25. The repeated watermark trigger then immediately emits retractions for the 5 and the 25, followed by a combined session of value 39. A similar dance occurs for the values 3, 8, and 1, ultimately ending with a retraction for an initial 3 session, followed by a combined session of 12.

3. IMPLEMENTATION & DESIGN

3.1 Implementation

We have implemented this model internally in FlumeJava, with MillWheel used as the underlying execution engine for streaming mode; additionally, an external reimplementation for Cloud Dataflow is largely complete at the time of writing. Due to prior characterization of those internal systems in the literature, as well as Cloud Dataflow being publicly available, details of the implementations themselves are elided here for the sake of brevity. One interesting note is that the core windowing and triggering code is quite general, and a significant portion of it is shared across batch and streaming implementations; that system itself is worthy of a more detailed analysis in future work.

3.2 Design Principles

Though much of our design was motivated by the real-world experiences detailed in Section 3.3 below, it was also guided by a core set of principles that we believed our model should embody:

• Never rely on any notion of completeness.

• Be flexible, to accommodate the diversity of known use cases, and those to come in the future.

• Not only make sense, but also add value, in the context of each of the envisioned execution engines.

• Encourage clarity of implementation.

• Support robust analysis of data in the context in which they occurred.

While the experiences below informed specific features of the model, these principles informed the overall shape and character of it, and we believe ultimately led to a more comprehensive and general result.

3.3 Motivating Experiences

As we designed the Dataflow Model, we took into consideration our real-world experiences with FlumeJava and MillWheel over the years. Things which worked well, we made sure to capture in the model; things which worked less well motivated changes in approach. Here are brief summaries of some of these experiences that influenced our design.

3.3.1 Large Scale Backfills & The Lambda Architecture: Unified Model

A number of teams run log joining pipelines on MillWheel. One particularly large log join pipeline runs in streaming mode on MillWheel by default, but has a separate FlumeJava batch implementation used for large scale backfills. A much nicer setup would be to have a single implementation written in a unified model that could run in both streaming and batch mode without modification. This became the initial motivating use case for unification across batch, micro-batch, and streaming engines, and was highlighted in Figures 10−12.

Another motivation for the unified model came from an experience with the Lambda Architecture. Though most data processing use cases at Google are handled exclusively by a batch or streaming system, one MillWheel customer ran their streaming pipeline in weak consistency mode, with a nightly MapReduce to generate truth. They found that customers stopped trusting the weakly consistent results over time, and as a result reimplemented their system around strong consistency so they could provide reliable, low-latency results. This experience further motivated the desire to support fluid choice amongst execution engines.

3.3.2 Unaligned Windows: Sessions

From the outset, we knew we needed to support sessions; this in fact is the main contribution of our windowing model over existing models. Sessions are an extremely important use case within Google (and were in fact one of the reasons MillWheel was created), and are used across a number of product areas, including search, ads, analytics, social, and YouTube. Pretty much anyone that cares about correlating bursts of otherwise disjoint user activity over a period of time does so by calculating sessions. Thus, support for sessions became paramount in our design. As shown in Figure 14, generating sessions in the Dataflow Model is trivial.

3.3.3 Billing: Triggers, Accumulation, & Retraction

Two teams with billing pipelines built on MillWheel experienced issues that motivated parts of the model. Recommended practice at the time was to use the watermark as a completion metric, with ad hoc logic to deal with late data or changes in source data. Lacking a principled system for updates and retractions, a team that processed resource utilization statistics ended up leaving our platform to build a custom solution (the model for which ended up being quite similar to the one we developed concurrently). Another billing team had significant issues with watermark lags caused by stragglers in their input. These shortcomings became major motivators in our design, and influenced the shift of focus from one of targeting completeness to one of adaptability over time. The results were twofold: triggers, which allow the concise and flexible specification of when results are materialized, as evidenced by the variety of output patterns possible over the same data set in Figures 7−14; and incremental processing support via accumulation (Figures 7 and 8) and retractions (Figure 14).

3.3.4 Statistics Calculation: Watermark Triggers

Many MillWheel pipelines calculate aggregate statistics (e.g. latency averages). For them, 100% accuracy is not required, but having a largely complete view of their data in a reasonable amount of time is. Given the high level of accuracy we achieve with watermarks for structured input sources like log files, such customers find watermarks very effective in triggering a single, highly-accurate aggregate per window. Watermark triggers are highlighted in Figure 12.

A number of abuse detection pipelines run on MillWheel. Abuse detection is another example of a use case where processing a majority of the data quickly is much more useful than processing 100% of the data more slowly. As such, they are heavy users of MillWheel's percentile watermarks, and were a strong motivating case for being able to support percentile watermark triggers in the model.

Relatedly, a pain point with batch processing jobs is stragglers that create a long tail in execution time. While dynamic rebalancing can help with this issue, FlumeJava has a custom feature that allows for early termination of a job based on overall progress. One of the benefits of the unified model for batch mode is that this sort of early termination criteria is now naturally expressible using the standard triggers mechanism, rather than requiring a custom feature.

3.3.5 Recommendations: Processing Time Triggers

Another pipeline that we considered built trees of user activity (essentially session trees) across a large Google property. These trees were then used to build recommendations tailored to users' interests. The pipeline was noteworthy in that it used processing-time timers to drive its output. This was due to the fact that, for their system, having regularly updated, partial views on the data was much more valuable than waiting until mostly complete views were ready once the watermark passed the end of the session. It also meant that lags in watermark progress due to a small amount of slow data would not affect timeliness of output for the rest of the data. This pipeline thus motivated inclusion of the processing-time triggers shown in Figures 7 and 8.

3.3.6 Anomaly Detection: Data-Driven & Composite Triggers

In the MillWheel paper, we described an anomaly detection pipeline used to track trends in Google web search queries. When developing triggers, their diff detection system motivated data-driven triggers. These differs observe the stream of queries and calculate statistical estimates of whether a spike exists or not. When they believe a spike is happening, they emit a start record, and when they believe it has ceased, they emit a stop. Though you could drive the differ output with something periodic like Trill's punctuations, for anomaly detection you ideally want output as soon as you are confident you have discovered an anomaly; the use of punctuations essentially transforms the streaming system into micro-batch, introducing additional latency. While practical for a number of use cases, it ultimately is not an ideal fit for this one, thus motivating support for custom data-driven triggers. It was also a motivating case for trigger composition, because in reality, the system runs multiple differs at once, multiplexing the output of them according to a well-defined set of logic. The AtCount trigger used in Figure 9 exemplified data-driven triggers; Figures 10−14 utilized composite triggers.

4. CONCLUSIONS

The future of data processing is unbounded data. Though bounded data will always have an important and useful place, it is semantically subsumed by its unbounded counterpart. Furthermore, the proliferation of unbounded data sets across modern business is staggering. At the same time, consumers of processed data grow savvier by the day, demanding powerful constructs like event-time ordering and unaligned windows. The models and systems that exist today serve as an excellent foundation on which to build the data processing tools of tomorrow, but we firmly believe that a shift in overall mindset is necessary to enable those tools to comprehensively address the needs of consumers of unbounded data.

Based on our many years of experience with real-world, massive-scale, unbounded data processing within Google, we believe the model presented here is a good step in that direction. It supports the unaligned, event-time-ordered windows modern data consumers require. It provides flexible triggering and integrated accumulation and retraction, refocusing the approach from one of finding completeness in data to one of adapting to the ever present changes manifest in real-world datasets. It abstracts away the distinction of batch vs. micro-batch vs. streaming, allowing pipeline builders a more fluid choice between them, while shielding them from the system-specific constructs that inevitably creep into models targeted at a single underlying system. Its overall flexibility allows pipeline builders to appropriately balance the dimensions of correctness, latency, and cost to fit their use case, which is critical given the diversity of needs in existence. And lastly, it clarifies pipeline implementations by separating the notions of what results are being computed, where in event time they are being computed, when in processing time they are materialized, and how earlier results relate to later refinements. We hope others will find this model useful as we all continue to push forward the state of the art in this fascinating, remarkably complex field.
4. CONCLUSIONS

The future of data processing is unbounded data. Though bounded data will always have an important and useful place, it is semantically subsumed by its unbounded counterpart. Furthermore, the proliferation of unbounded data sets across modern business is staggering. At the same time, consumers of processed data grow savvier by the day, demanding powerful constructs like event-time ordering and unaligned windows. The models and systems that exist today serve as an excellent foundation on which to build the data processing tools of tomorrow, but we firmly believe that a shift in overall mindset is necessary to enable those tools to comprehensively address the needs of consumers of unbounded data.

Based on our many years of experience with real-world, massive-scale, unbounded data processing within Google, we believe the model presented here is a good step in that direction. It supports the unaligned, event-time-ordered windows modern data consumers require. It provides flexible triggering and integrated accumulation and retraction, refocusing the approach from one of finding completeness in data to one of adapting to the ever-present changes manifest in real-world datasets. It abstracts away the distinction of batch vs. micro-batch vs. streaming, allowing pipeline builders a more fluid choice between them, while shielding them from the system-specific constructs that inevitably creep into models targeted at a single underlying system. Its overall flexibility allows pipeline builders to appropriately balance the dimensions of correctness, latency, and cost to fit their use case, which is critical given the diversity of needs in existence. And lastly, it clarifies pipeline implementations by separating the notions of what results are being computed, where in event time they are being computed, when in processing time they are materialized, and how earlier results relate to later refinements. We hope others will find this model useful as we all continue to push forward the state of the art in this fascinating, remarkably complex field.
5. ACKNOWLEDGMENTS

We thank all of our faithful reviewers for their dedication, time, and thoughtful comments: Atul Adya, Ben Birt, Ben Chambers, Cosmin Arad, Matt Austern, Lukasz Cwik, Grzegorz Czajkowski, Walt Drummond, Jeff Gardner, Anthony Mancuso, Colin Meek, Daniel Myers, Sunil Pedapudi, Amy Unruh, and William Vambenepe. We also wish to recognize the impressive and tireless efforts of everyone on the Google Cloud Dataflow, FlumeJava, MillWheel, and related teams that have helped bring this work to life.

6. REFERENCES

[1] D. J. Abadi et al. Aurora: A New Model and Architecture for Data Stream Management. The VLDB Journal, 12(2):120-139, Aug. 2003.
[2] T. Akidau et al. MillWheel: Fault-Tolerant Stream Processing at Internet Scale. In Proc. of the 39th Int. Conf. on Very Large Data Bases (VLDB), 2013.
[3] A. Alexandrov et al. The Stratosphere Platform for Big Data Analytics. The VLDB Journal, 23(6):939-964, 2014.
[4] Apache. Apache Hadoop. http://hadoop.apache.org, 2012.
[5] Apache. Apache Storm. http://storm.apache.org, 2013.
[6] Apache. Apache Flink. http://flink.apache.org/, 2014.
[7] Apache. Apache Samza. http://samza.apache.org, 2014.
[8] R. S. Barga et al. Consistent Streaming Through Time: A Vision for Event Stream Processing. In Proc. of the Third Biennial Conf. on Innovative Data Systems Research (CIDR), pages 363-374, 2007.
[9] Botan et al. SECRET: A Model for Analysis of the Execution Semantics of Stream Processing Systems. Proc. VLDB Endow., 3(1-2):232-243, Sept. 2010.
[10] O. Boykin et al. Summingbird: A Framework for Integrating Batch and Online MapReduce Computations. Proc. VLDB Endow., 7(13):1441-1451, Aug. 2014.
[11] Cask. Tigon. http://tigon.io/, 2015.
[12] C. Chambers et al. FlumeJava: Easy, Efficient Data-Parallel Pipelines. In Proc. of the 2010 ACM SIGPLAN Conf. on Programming Language Design and Implementation (PLDI), pages 363-375, 2010.
[13] B. Chandramouli et al. Trill: A High-Performance Incremental Query Processor for Diverse Analytics. In Proc. of the 41st Int. Conf. on Very Large Data Bases (VLDB), 2015.
[14] S. Chandrasekaran et al. TelegraphCQ: Continuous Dataflow Processing. In Proc. of the 2003 ACM SIGMOD Int. Conf. on Management of Data (SIGMOD), pages 668-668, 2003.
[15] J. Chen et al. NiagaraCQ: A Scalable Continuous Query System for Internet Databases. In Proc. of the 2000 ACM SIGMOD Int. Conf. on Management of Data (SIGMOD), pages 379-390, 2000.
[16] J. Dean and S. Ghemawat. MapReduce: Simplified Data Processing on Large Clusters. In Proc. of the Sixth Symposium on Operating System Design and Implementation (OSDI), 2004.
[17] EsperTech. Esper. http://www.espertech.com/esper/, 2006.
[18] Gates et al. Building a High-level Dataflow System on Top of Map-Reduce: The Pig Experience. Proc. VLDB Endow., 2(2):1414-1425, Aug. 2009.
[19] Google. Dataflow SDK. https://github.com/GoogleCloudPlatform/DataflowJavaSDK, 2015.
[20] Google. Google Cloud Dataflow. https://cloud.google.com/dataflow/, 2015.
[21] T. Johnson et al. A Heartbeat Mechanism and its Application in Gigascope. In Proc. of the 31st Int. Conf. on Very Large Data Bases (VLDB), pages 1079-1088, 2005.
[22] J. Li et al. Semantics and Evaluation Techniques for Window Aggregates in Data Streams. In Proc. of the ACM SIGMOD Int. Conf. on Management of Data (SIGMOD), pages 311-322, 2005.
[23] J. Li et al. Out-of-order Processing: A New Architecture for High-performance Stream Systems. Proc. VLDB Endow., 1(1):274-288, Aug. 2008.
[24] D. Maier et al. Semantics of Data Streams and Operators. In Proc. of the 10th Int. Conf. on Database Theory (ICDT), pages 37-52, 2005.
[25] N. Marz. How to beat the CAP theorem. http://nathanmarz.com/blog/how-to-beat-the-cap-theorem.html, 2011.
[26] S. Murthy et al. Pulsar - Real-Time Analytics at Scale. Technical report, eBay, 2015.
[27] SQLStream. http://sqlstream.com/, 2015.
[28] U. Srivastava and J. Widom. Flexible Time Management in Data Stream Systems. In Proc. of the 23rd ACM SIGMOD-SIGACT-SIGART Symp. on Princ. of Database Systems, pages 263-274, 2004.
[29] A. Thusoo, J. S. Sarma, N. Jain, Z. Shao, P. Chakka, S. Anthony, H. Liu, P. Wyckoff, and R. Murthy. Hive: A Warehousing Solution over a Map-reduce Framework. Proc. VLDB Endow., 2(2):1626-1629, Aug. 2009.
[30] P. A. Tucker et al. Exploiting Punctuation Semantics in Continuous Data Streams. IEEE Transactions on Knowledge and Data Engineering, 15, 2003.
[31] J. Whiteneck et al. Framing the Question: Detecting and Filling Spatial-Temporal Windows. In Proc. of the ACM SIGSPATIAL Int. Workshop on GeoStreaming (IWGS), 2010.
[32] F. Yang et al. Sonora: A Platform for Continuous Mobile-Cloud Computing. Technical Report MSR-TR-2012-34, Microsoft Research Asia.
[33] M. Zaharia et al. Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing. In Proc. of the 9th USENIX Conf. on Networked Systems Design and Implementation (NSDI), pages 15-28, 2012.
[34] M. Zaharia et al. Discretized Streams: Fault-Tolerant Streaming Computation at Scale. In Proc. of the 24th ACM Symp. on Operating Systems Principles, 2013.
