1 Big Data Processing (and Friends)Peter Bailis Stanford CS245 (with slides from Matei Zaharia + Mu Li) CS 245 Notes 12
2 Previous Outline: Replication Strategies, Partitioning Strategies, AC & 2PC, CAP, Why is coordination hard?, NoSQL
3 “NoSQL”: Popular set of databases, largely built by web companies in the 2000s. Focus on scale-out and flexible schemas. Lots of hype, somewhat dying down.
6 “NoSQL”: Popular set of databases, largely built by web companies in the 2000s. Focus on scale-out and flexible schemas. Lots of hype, somewhat dying down. Amazon’s Dynamo was among the first. Open source examples: MongoDB, Cassandra, Redis.
7 Example API: MongoDB
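The slide above shows MongoDB's document-query API. As a rough illustration of its semantics (not real MongoDB code, which would go through pymongo's `db.collection.find()`), here is a minimal pure-Python sketch of query-by-example matching over an in-memory list of documents; the collection, field names, and the handful of operators covered are illustrative:

```python
# Sketch of MongoDB-style document queries in plain Python.
# Mimics the query-by-example semantics of find({...}) on a list of dicts.

def matches(doc, query):
    """Return True if a document satisfies a Mongo-style query dict."""
    for field, cond in query.items():
        value = doc.get(field)
        if isinstance(cond, dict):          # operator form, e.g. {"$gt": 21}
            for op, operand in cond.items():
                if op == "$gt" and not (value is not None and value > operand):
                    return False
                elif op == "$lt" and not (value is not None and value < operand):
                    return False
                elif op == "$in" and value not in operand:
                    return False
        elif value != cond:                  # exact-match form
            return False
    return True

def find(collection, query):
    return [doc for doc in collection if matches(doc, query)]

users = [
    {"name": "ann", "age": 34, "state": "MA"},
    {"name": "bob", "age": 19, "state": "CA"},
]
adults_in_ma = find(users, {"state": "MA", "age": {"$gt": 21}})
```

Note how the "schema" lives entirely in the documents and the query, not in the database: adding a new field requires no DDL, which is exactly the flexible-schema appeal described above.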
8 “NoSQL”: Popular set of databases, largely built by web companies in the 2000s. Focus on scale-out and flexible schemas. Lots of hype, somewhat dying down. Amazon’s Dynamo was among the first. Open source examples: MongoDB, Cassandra, Redis. Newer: “NewSQL” – next-generation, with txns, sometimes SQL! Spanner, CockroachDB, MemSQL.
9 What couldn’t RDBMSs do well? Schema changes were (are?) a pain: hard to add new columns, critical when building new applications quickly. Auto-partitioning and re-partitioning (“sharding”). Graceful fail-over during failures. Multi-partition operations.
10 How much of “NoSQL” et al. is new? Basic algorithms for scale-out execution were known in the 1980s. Google’s Spanner: core algorithms published in 1993. Reality: takes a lot of engineering to get right! (web & cloud drove demand) Hint: adding distribution to an existing system is much harder than building it in from the ground up!
11 How much of “NoSQL” et al. is new? Semi-structured data management is hugely useful for developers. Web and open source: shift from “DBA-first” to “developer-first” mentality. Not always a good thing for mature products or services needing stability! Have less info for query optimization, but… people cost more than compute!
12 Lessons from “NoSQL”: Scale drove 2000s technology demands. Open source enabled adoption of less mature technology, experimentation. Developers, not DBAs (“DevOps”). Exciting time for data infrastructure.
13 Today’s Outline: NoSQL overview, Cloud Landscape, System in focus: Spark, Scale-out ML systems
14 Key Technology: The Web. Application pull: how to make sense of the ’net? Hardware push: commodity clusters.
15 Berkeley Network of Workstations project (‘95). Led to Inktomi (last lecture!). Old CW: use mainframes // New CW: cheap, commodity storage!
17 In Huang!
18 Jeff Dean
21 Google File System Big Ideas: Store big chunks of data on a big, distributed cluster. Sounds like a database…? Bedrock of Google’s entire data infrastructure. Can build a number of higher-level storage engines on top… …in addition to compute engines…
23 Example: word count!
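A minimal sketch of the word-count example in the MapReduce style, written in Python for readability (the single-process "shuffle" and function names are illustrative, not the real distributed implementation): map emits (word, 1) pairs, the shuffle groups values by key, and reduce sums each group.

```python
# Word count in the MapReduce style: a map function emitting (word, 1)
# pairs, a "shuffle" grouping values by key, and a reduce summing counts.
from collections import defaultdict

def map_fn(line):
    for word in line.split():
        yield (word, 1)

def reduce_fn(word, counts):
    return (word, sum(counts))

def mapreduce(lines, map_fn, reduce_fn):
    groups = defaultdict(list)               # shuffle: group values by key
    for line in lines:
        for key, value in map_fn(line):
            groups[key].append(value)
    return dict(reduce_fn(k, vs) for k, vs in groups.items())

counts = mapreduce(["the cat", "the dog"], map_fn, reduce_fn)
```

In the real system, map tasks run on many machines over GFS chunks, the shuffle moves data over the network, and reduce tasks write results back out; the user only supplies the two functions.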
25 Key MapReduce Ideas: Express parallel computation using side-effect-free functional transformations. Can execute map, reduce in parallel. Side-effect free? Can restart jobs in the event of failure. Fault tolerant: writes intermediate data to disk; node failure? Recompute from upstream. No SQL, no planner, no optimizer. User specifies number of “workers”.
26 PostgreSQL example UDF (in C):
Datum concat_text(PG_FUNCTION_ARGS) {
    text *arg1 = PG_GETARG_TEXT_P(0);
    text *arg2 = PG_GETARG_TEXT_P(1);
    int32 new_text_size = VARSIZE(arg1) + VARSIZE(arg2) - VARHDRSZ;
    text *new_text = (text *) palloc(new_text_size);
    SET_VARSIZE(new_text, new_text_size);
    memcpy(VARDATA(new_text), VARDATA(arg1), VARSIZE(arg1) - VARHDRSZ);
    memcpy(VARDATA(new_text) + (VARSIZE(arg1) - VARHDRSZ), VARDATA(arg2), VARSIZE(arg2) - VARHDRSZ);
    PG_RETURN_TEXT_P(new_text);
}
https://www.postgresql.org/docs/9.1/static/xfunc-c.html
27 Was MapReduce New?
29 Was MapReduce New? Stonebraker and DeWitt: No! Isn’t very flexible; user codes entire query plan. Doesn’t use indexes. Techniques known for decades. Kind of dumb: writes intermediate data to disk.
31 Was MapReduce New? Reality: somewhere in-between. Ideas not necessarily new… Dataflow: old idea. Map and Reduce: about as old. …but where is the fault-tolerant system that can index the internet? Dean and Ghemawat just claim it’s a “useful tool!” …and what do programmers prefer to use?
32 Was MapReduce useful? Yes! 2006: Team at Yahoo! creates Hadoop, open source GFS+MapReduce. 2008: Hadoop runs on 4000 nodes. 2009: Hadoop sorts a petabyte of data in < 17 hours. 2011: Hadoop v1 released…
33 Hadoop Ecosystem: Around mid-2000s, open source exploded. Build versus buy? Many web companies, startups adopted/adapted open source. Yahoo!, Facebook, Twitter release, contribute back to open source. Apache Software Foundation becomes “home” for Hadoop ecosystem. Simultaneously: cloud infrastructure (e.g., AWS) means easier than ever to get a cluster; can scale on-demand.
34 Late 2000s: Continued Evolution, Pain Points
Storage in HDFS. Problem: raw files waste space, are inefficient. Solution: impose flexible schemas (see: Parquet, RCFile).
Faster serving from HDFS. Problem: flat files are slow to serve. Solution: HBase, open source clone of another Google project, called BigTable.
Hadoop is batch-oriented. Problem: want faster execution. Solution: streaming dataflow engines like Storm.
Hadoop is slow and has awkward APIs. Problem: intermediate materialization is slow, APIs are clunky. Solution: new interface; Apache Spark!
35 Today’s Outline: NoSQL overview, Cloud Landscape, System in focus: Spark, Scale-out ML systems
36 Matei Zaharia (cool fact: he’s on our faculty!)
37 Original Spark Vision: 1) Unified engine for big data processing. Combines batch, interactive, iterative, streaming. 2) Concise, language-integrated API. Functional programming in Scala/Java/Python.
38 Motivation: Unification. MapReduce gave general batch processing; specialized systems grew up around it for new workloads: Pregel, Giraph, Dremel, Drill, Impala, Presto, Storm, S4, … Hard to compose in pipelines; hard to manage, tune, deploy.
39 Motivation: Unification. Instead of MapReduce plus a zoo of specialized systems: one unified engine covering general batch processing and the new workloads.
40 Motivation: Concise API. Much of data analysis is exploratory / interactive. Spark solution: Resilient Distributed Datasets (RDDs), a “distributed collection” abstraction with a simple functional API:
lines = spark.textFile("hdfs://...") // RDD[String]
points = lines.map(line => parsePoint(line)) // RDD[Point]
points.filter(p => p.x > 100).count()
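To make the lazy, lineage-based model concrete, here is a toy single-machine analogue of an RDD (all names hypothetical, not Spark's implementation): transformations like map and filter only record themselves in a lineage, and an action like count replays that lineage; replaying lineage is also what lets real RDDs recompute lost partitions after a failure.

```python
# Toy single-machine analogue of an RDD: transformations are recorded
# lazily as a lineage of (kind, function) pairs; actions force evaluation.

class ToyRDD:
    def __init__(self, data, transforms=()):
        self._data = data
        self._transforms = transforms        # lineage: recorded, not yet run

    def map(self, f):
        return ToyRDD(self._data, self._transforms + (("map", f),))

    def filter(self, p):
        return ToyRDD(self._data, self._transforms + (("filter", p),))

    def collect(self):                       # action: replay the lineage
        items = self._data
        for kind, f in self._transforms:
            if kind == "map":
                items = [f(x) for x in items]
            else:
                items = [x for x in items if f(x)]
        return items

    def count(self):
        return len(self.collect())

# mirror of the slide's example, with points as (x, y) tuples
points = ToyRDD([(150, 2), (90, 5), (200, 1)])
big = points.filter(lambda p: p[0] > 100).count()
```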
41 Implementation idea: Execution similar to Hadoop: distribute to cluster. Store intermediate data in memory. Recover any failed partitions by re-running functional tasks. (Trade-off with Hadoop/MapReduce?)
42 Hadoop poor fit for iterative ML
43 How Did the Vision Hold Up? Generally well! Users really appreciate unification. Functional API causes some challenges, work in progress.
44 Libraries Built on Spark: SQL, Streaming, MLlib, GraphX, all on top of Spark Core (RDDs). Largest integrated standard library for big data.
45 Which Libraries Do People Use? 75% of users use more than one component.
46 Top Applications What was before?
47 Main Challenge: Functional API. Looks high-level, but hides many semantics of computation. Functions are arbitrary blocks of Java bytecode. Data stored is arbitrary Java objects. Users can mix APIs in suboptimal ways.
48 Example Problem:
pairs = data.map(word => (word, 1))
groups = pairs.groupByKey()
groups.map((k, vs) => (k, vs.sum))
Materializes all groups as lists of integers, then promptly aggregates them. (Note the space usage of the key-value pairs.)
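The problem can be sketched in plain Python (illustrative helpers, not Spark APIs): the groupByKey path materializes every value list in memory before summing, while a reduceByKey-style combine keeps only one running total per key, using O(#keys) rather than O(#pairs) space.

```python
from collections import defaultdict

def group_by_key_sum(pairs):
    # groupByKey path: materializes every value list before reducing
    groups = defaultdict(list)
    for k, v in pairs:
        groups[k].append(v)
    return {k: sum(vs) for k, vs in groups.items()}

def reduce_by_key(pairs):
    # reduceByKey path: combine incrementally, one running total per key.
    # Behind an opaque functional API the engine can't make this choice
    # for you; the user must pick the right operator.
    totals = defaultdict(int)
    for k, v in pairs:
        totals[k] += v
    return dict(totals)

pairs = [("a", 1), ("b", 1), ("a", 1)]
```

Both return the same answer; only the intermediate state differs, which is exactly the kind of semantics the functional API hides from the optimizer.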
49 Challenge: Data Representation. Java objects are often many times larger than their underlying fields: class User(name: String, friends: Array[Int]); User(“Bobby”, Array(1, 2))
50 DataFrame API: DataFrames hold rows with a known schema and offer relational operations on them through a DSL:
c = HiveContext()
users = c.sql("select * from users")
ma_users = users[users.state == "MA"]
ma_users.count()
ma_users.groupBy("name").avg("age")
ma_users.map(lambda row: row.user.toUpper())
(An expression like users.state == "MA" builds an expression AST, not a value.)
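A minimal sketch (hypothetical classes, not Spark's implementation) of how such a DSL captures expressions: overloading comparison operators on a Column returns an AST node rather than a boolean, and the engine can later inspect, optimize, or evaluate that node.

```python
# Capturing expressions instead of evaluating them: Column.__eq__ and
# Column.__gt__ return an Expr AST node the engine can inspect later.

class Expr:
    def __init__(self, op, left, right):
        self.op, self.left, self.right = op, left, right

    def evaluate(self, row):
        value = row[self.left]
        return value == self.right if self.op == "==" else value > self.right

class Column:
    def __init__(self, name):
        self.name = name

    def __eq__(self, other):                 # builds an AST node, not a bool
        return Expr("==", self.name, other)

    def __gt__(self, other):
        return Expr(">", self.name, other)

state = Column("state")
pred = state == "MA"                         # an Expr, not True/False
rows = [{"state": "MA"}, {"state": "CA"}]
ma_rows = [r for r in rows if pred.evaluate(r)]
```

Because the predicate is data rather than opaque bytecode, the engine can reorder it, push it into a data source, or compile it, none of which is possible with an arbitrary closure.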
51 Execution Steps
52 API Details: Based on the data frame concept in R, Python. Spark is the first to make this a declarative API. Integrated with the rest of Spark: ML library takes DataFrames as input & output; easily convert RDDs ↔ DataFrames. [Chart: Google Trends for “data frame”]
53 What DataFrames Enable: Compact binary representation. Columnar, compressed format for caching; rows for processing. Optimization across operators (join reordering, pushdown, etc.). Runtime code generation.
54 Performance
55 Performance
56 Data Sources: Now that we have an API for structured data, map it to data stores. Spark apps should be able to migrate across Hive, Cassandra, JSON, … Rich semantics of the API allow query pushdown into data sources, something not possible with original Spark:
users[users.age > 20]
select * from users
57 Data Source API: All data sources provide a schema given a connection string (e.g., JSON file, Hive table name). Different interfaces for “smarter” federation:
Table scan: just read all rows → CSV, JSON
Pruned scan: read specific columns → Cassandra, HBase
Filtered scan: read rows matching an expression → JDBC, Parquet, Hive
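A sketch of these three interfaces in Python (illustrative function names and data, not the actual Spark DataSource API), showing how column pruning and filter pushdown shrink what crosses the source boundary:

```python
# Three data-source interfaces of increasing smartness: full table scan,
# column-pruned scan, and filtered scan (predicate pushed into the source).

ROWS = [
    {"id": 1, "age": 25, "lang": "en"},
    {"id": 2, "age": 40, "lang": "fr"},
]

def table_scan():
    # dumbest interface (CSV/JSON-style): return every row, every column
    return list(ROWS)

def pruned_scan(columns):
    # columnar stores can return only the requested columns
    return [{c: r[c] for c in columns} for r in ROWS]

def filtered_scan(columns, predicate):
    # smartest interface: evaluate the pushed-down filter at the source,
    # so non-matching rows never cross the boundary
    return [{c: r[c] for c in columns} for r in ROWS if predicate(r)]

en_users = filtered_scan(["id", "age"], lambda r: r["lang"] == "en")
```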
58 Examples:
JSON (tweets.json): { “text”: “hi”, “user”: { “name”: “bob”, “id”: 15 } } → select user.id, text from tweets
JDBC: select id, age from users where lang = “en”
Together (federated through Spark SQL): select t.text, u.age from tweets t, users u where t.user.id = u.id and u.lang = “en”
59 Hardware Trends Storage Network CPU
60 Hardware Trends:
            2010             2015
Storage     50+ MB/s (HDD)   500+ MB/s (SSD)   10x
Network     1 Gbps           10 Gbps           10x
CPU         ~3 GHz           ~3 GHz
62 Project Tungsten Substantially speed up Spark by optimizing CPU efficiency, via: Runtime code generation Exploiting cache locality Off-heap memory management
63 Tungsten’s Compact Encoding: (123, “data”, “bricks”)
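The idea behind the compact encoding can be sketched with Python's struct module (a simplification: real Tungsten rows are 8-byte-aligned UnsafeRow buffers in the JVM, not this layout): a fixed-width section holds the int plus the string lengths, followed by the raw string bytes, so the whole row is one flat buffer instead of several heap objects.

```python
# Tungsten-style compact row encoding, sketched with struct: a fixed-width
# header (the int field plus a length slot per variable-length field),
# followed by the raw string bytes.
import struct

HEADER = "<iii"  # int field, len(string1), len(string2), little-endian

def encode_row(num, s1, s2):
    b1, b2 = s1.encode(), s2.encode()
    return struct.pack(HEADER, num, len(b1), len(b2)) + b1 + b2

def decode_row(buf):
    num, n1, n2 = struct.unpack_from(HEADER, buf, 0)
    start = struct.calcsize(HEADER)
    s1 = buf[start:start + n1].decode()
    s2 = buf[start + n1:start + n1 + n2].decode()
    return (num, s1, s2)

row = encode_row(123, "data", "bricks")   # the slide's example tuple
```

The encoded row is 22 bytes (12-byte header + 10 bytes of string data), versus the object headers, pointers, and padding a tuple of JVM objects would carry.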
64 Runtime Code Generation:
DataFrame code / SQL: df.where(df("year") > 2015)
Catalyst expressions: GreaterThan(year#234, Literal(2015))
Low-level bytecode:
bool filter(Object baseObject) {
    int offset = baseOffset + bitSetWidthInBytes + 3*8L;
    int value = Platform.getInt(baseObject, offset);
    return value > 2015;
}
Platform.getInt(baseObject, offset) is a JVM intrinsic, JIT-ed to pointer arithmetic.
65 Long-Term Vision: language frontend → Tungsten backend. First stage out in Spark 1.5.
66 Big Data in Production Big data is moving from offline analytics to production use Incorporate new data in seconds (streaming) Power low-latency queries (data serving) Currently very hard to build: separate streaming, serving & batch systems Our goal: one engine for “continuous apps”
67 PB’s Punchlines: Spark is the de facto batch analytics processor today. Streaming: just run mini-batches… Looks a lot like a SQL data warehouse… …but can do a bunch more, too: ML, etc. Maybe the biggest lesson: building modular software enables modular usage. Compare: traditional data warehouses. Still slower than a fast data warehouse, but more flexible! Humans win over hardware efficiency (for many cases)!
68 Today’s Outline NoSQL overview Cloud Landscape System in focus: Spark Scale-out ML systems
70 Idea: For “big models,” partition data and parameters. Called a “parameter server.” Asynchronous training can help! New systems like TensorFlow combine this idea with dataflow.
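A toy single-process version of the parameter-server pattern (all names hypothetical): a server holds the model, and each worker repeatedly pulls the parameters, computes a gradient on its own data shard, and pushes an update. Here workers run round-robin in one process just to show the pull/compute/push loop; real systems run them concurrently over the network.

```python
# Parameter-server sketch: server owns the parameters; workers pull,
# compute a gradient on their local shard, and push an update.

class ParameterServer:
    def __init__(self, dim):
        self.params = [0.0] * dim

    def pull(self):
        return list(self.params)

    def push(self, grad, lr=0.1):
        for i, g in enumerate(grad):
            self.params[i] -= lr * g

def gradient(w, shard):
    # gradient of mean squared error for a 1-d linear model y = w*x
    g = 0.0
    for x, y in shard:
        g += 2 * (w[0] * x - y) * x
    return [g / len(shard)]

# two workers, each holding its own shard of data generated by y = 3x
shards = [[(1.0, 3.0), (2.0, 6.0)], [(3.0, 9.0), (1.5, 4.5)]]
server = ParameterServer(dim=1)
for step in range(100):
    for shard in shards:          # round-robin stand-in for parallel workers
        w = server.pull()
        server.push(gradient(w, shard))
w_final = server.pull()[0]
```

Each worker only ever touches the parameters it needs, which is the efficiency argument on the next slides; the server is the single point where updates meet.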
71 slides via Mu Li
81 Costs/Benefits compared to Dataflow?
82 Costs/Benefits compared to Dataflow? Pro: Efficient; only access the model parameters you need. Con: No real “query optimization.” Pro or Con?: Fault tolerance. Pro: Allows asynchronous execution.
85 Bonus from last time: Does machine learning always need serializability? e.g., say I want to train a deep network on 1000s of GPUs
86 Bonus from last time: Does machine learning always need serializability? No! Turns out asynchronous execution is provably safe (for sufficiently small delays). Convex optimization routines (e.g., SGD) run faster on modern HW without locks. Best paper name ever: Hogwild!
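A Hogwild!-style sketch (illustrative only: CPython's GIL serializes bytecode, so the threads here demonstrate the lock-free pattern rather than a real speedup): several threads run SGD on shared parameters with no locks, and on a small convex least-squares problem the loss still drops despite racy read-modify-write updates.

```python
# Hogwild!-style lock-free SGD: multiple threads update shared parameters
# with no synchronization. For convex problems with small steps, the
# occasional lost or stale update does not prevent convergence.
import random
import threading

w = [0.0, 0.0]                      # shared parameters, updated lock-free
data = [(random.uniform(-1, 1), random.uniform(-1, 1)) for _ in range(200)]
target = [2.0, -1.0]                # ground-truth weights generating labels
labels = [target[0] * x + target[1] * y for x, y in data]

def loss():
    return sum((w[0] * x + w[1] * y - l) ** 2
               for (x, y), l in zip(data, labels)) / len(data)

def worker(seed, steps=2000, lr=0.05):
    rng = random.Random(seed)
    for _ in range(steps):
        i = rng.randrange(len(data))
        x, y = data[i]
        err = w[0] * x + w[1] * y - labels[i]
        w[0] -= lr * 2 * err * x    # racy read-modify-write, on purpose
        w[1] -= lr * 2 * err * y

initial = loss()
threads = [threading.Thread(target=worker, args=(s,)) for s in range(4)]
for t in threads: t.start()
for t in threads: t.join()
final = loss()
```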
89 Punchlines: Parameter server architecture is useful for training large models; increasingly popular in “deep networks.” Lots of noise about new systems (graphs, deep learning). Often need special adaptation for workloads (e.g., special joins, operators), but the basic computational patterns (dataflow with some shared state) are the same. Asynchrony can help training time in distributed environments. Is training all we care about?
91 Next generation of systems: Post-database data management!!!