Big Data Processing (and Friends)


1 Big Data Processing (and Friends). Peter Bailis, Stanford CS 245 (with slides from Matei Zaharia + Mu Li). CS 245 Notes 12

2 Previous Outline
- Replication strategies
- Partitioning strategies
- AC & 2PC
- CAP
- Why is coordination hard?
- NoSQL

3 “NoSQL”
- Popular set of databases, largely built by web companies in the 2000s
- Focus on scale-out and flexible schemas
- Lots of hype, somewhat dying down


6 “NoSQL” (continued)
- Popular set of databases, largely built by web companies in the 2000s
- Focus on scale-out and flexible schemas
- Lots of hype, somewhat dying down
- Amazon’s Dynamo was among the first
- Open source examples: MongoDB, Cassandra, Redis

7 Example API: MongoDB
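The example API on slide 7 is a screenshot in the original deck. As a stand-in, here is a small sketch in plain Python (not MongoDB's actual driver; `find` and the sample `users` collection are invented for illustration) of the document-model query style the slide refers to: schemaless documents matched by field equality.

```python
# Illustrative document-store query in plain Python (hypothetical, not
# MongoDB's real API): documents are schemaless dicts, and find()
# returns those matching every field in the query.
def find(collection, query):
    return [doc for doc in collection
            if all(doc.get(k) == v for k, v in query.items())]

users = [
    {"name": "bob", "age": 30, "state": "MA"},
    {"name": "amy", "age": 25},   # flexible schema: no "state" field
]
```

Note how the second document simply omits a field, the "flexible schema" point from the NoSQL slides.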

8 “NoSQL” (continued)
- Popular set of databases, largely built by web companies in the 2000s
- Focus on scale-out and flexible schemas
- Lots of hype, somewhat dying down
- Amazon’s Dynamo was among the first
- Open source examples: MongoDB, Cassandra, Redis
- Newer: “NewSQL”: next-generation, with txns, sometimes SQL! Spanner, CockroachDB, MemSQL

9 What couldn’t RDBMSs do well?
- Schema changes were (are?) a pain
- Hard to add new columns, critical when building new applications quickly
- Auto-partition and re-partition (“shard”)
- Gracefully fail over during failures
- Multi-partition operations

10 How much of “NoSQL” et al. is new?
- Basic algorithms for scale-out execution were known in the 1980s
- Google’s Spanner: core algorithms published in 1993
- Reality: takes a lot of engineering to get right! (web & cloud drove demand)
- Hint: retrofitting distribution is much harder than building it in from the ground up!

11 How much of “NoSQL” et al. is new?
- Semi-structured data management is hugely useful for developers
- Web and open source: shift from “DBA-first” to “developer-first” mentality
- Not always a good thing for mature products or services needing stability!
- Have less info for query optimization, but… people cost more than compute!

12 Lessons from “NoSQL”
- Scale drove 2000s technology demands
- Open source enabled adoption of less mature technology, experimentation
- Developers, not DBAs (“DevOps”)
- Exciting time for data infrastructure

13 Today’s Outline
- NoSQL overview
- Cloud landscape
- System in focus: Spark
- Scale-out ML systems

14 Key Technology: The Web
- Application pull: how to make sense of the ’net?
- Hardware push: commodity clusters

15 Berkeley Network of Workstations project (’95)
- Led to Inktomi (last lecture!)
- Old CW: use mainframes // New CW: cheap, commodity storage!


17 In Huang!

18 Jeff Dean


21 Google File System: Big Ideas
- Store big chunks of data on a big, distributed cluster
- Sounds like a database…?
- Bedrock of Google’s entire data infrastructure
- Can build a number of higher-level storage engines on top…
- …in addition to compute engines…


23 Example: word count!


25 Key MapReduce Ideas
- Express parallel computation using side-effect-free functional transformations
- Can execute map, reduce in parallel
- Side-effect free? Can restart jobs in event of failure
- Fault tolerant: writes intermediate data to disk
- Node failure? Recompute from upstream
- No SQL, no planner, no optimizer
- User specifies number of “workers”
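The word-count example and the map/shuffle/reduce pattern above can be sketched in plain Python (an illustrative single-process simulation, not Google's MapReduce API; `map_fn`, `shuffle`, and `reduce_fn` are invented names):

```python
# Toy word count in the MapReduce style: a side-effect-free map phase
# emits (word, 1) pairs, a shuffle groups them by key, and a reduce
# phase sums each group. In the real system, map and reduce tasks run
# in parallel across a cluster and intermediate pairs go to disk.
from collections import defaultdict

def map_fn(line):
    # Emit (word, 1) for every word in one input line.
    return [(word, 1) for word in line.split()]

def shuffle(pairs):
    # Group intermediate pairs by key, as the framework does between phases.
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return groups

def reduce_fn(key, values):
    # Sum the counts for one word.
    return (key, sum(values))

def word_count(lines):
    intermediate = [pair for line in lines for pair in map_fn(line)]
    return dict(reduce_fn(k, vs) for k, vs in shuffle(intermediate).items())
```

Because `map_fn` and `reduce_fn` are pure, any task can be re-run after a node failure, which is exactly the fault-tolerance argument on the slide.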

26 PostgreSQL example UDF (in C):

    Datum
    concat_text(PG_FUNCTION_ARGS)
    {
        text *arg1 = PG_GETARG_TEXT_P(0);
        text *arg2 = PG_GETARG_TEXT_P(1);
        int32 new_text_size = VARSIZE(arg1) + VARSIZE(arg2) - VARHDRSZ;
        text *new_text = (text *) palloc(new_text_size);

        SET_VARSIZE(new_text, new_text_size);
        memcpy(VARDATA(new_text), VARDATA(arg1), VARSIZE(arg1) - VARHDRSZ);
        memcpy(VARDATA(new_text) + (VARSIZE(arg1) - VARHDRSZ),
               VARDATA(arg2), VARSIZE(arg2) - VARHDRSZ);
        PG_RETURN_TEXT_P(new_text);
    }

    https://www.postgresql.org/docs/9.1/static/xfunc-c.html

27 Was MapReduce New?


29 Was MapReduce New? Stonebraker and DeWitt: No!
- Isn’t very flexible; user codes entire query plan
- Doesn’t use indexes
- Techniques known for decades
- Kind of dumb: writes intermediate data to disk


31 Was MapReduce New? Reality: somewhere in between
- Ideas not necessarily new… Dataflow: old idea. Map and Reduce: about as old
- …but where is the fault-tolerant system that can index the internet?
- Dean and Ghemawat just claim it’s a “useful tool!”
- …and what do programmers prefer to use?

32 Was MapReduce useful? Yes!
- 2006: Team at Yahoo! creates Hadoop, open source GFS+MapReduce
- 2008: Hadoop runs on 4000 nodes
- 2009: Hadoop sorts a petabyte of data in < 17 hours
- 2011: Hadoop v1 released…

33 Hadoop Ecosystem
- Around mid-2000s, open source exploded: build versus buy?
- Many web companies, startups adopted/adapted open source
- Yahoo!, Facebook, Twitter release, contribute back to open source
- Apache Software Foundation becomes “home” for Hadoop ecosystem
- Simultaneously: cloud infrastructure (e.g., AWS) means easier than ever to get a cluster; can scale on-demand

34 Late 2000s: Continued Evolution, Pain Points
- Storage in HDFS. Problem: raw files waste space, are inefficient. Solution: impose flexible schemas (see: Parquet, RCFile)
- Faster serving from HDFS. Problem: flat files are slow to serve. Solution: HBase, open source clone of another Google project, called BigTable
- Hadoop is batch-oriented. Problem: want faster execution. Solution: streaming dataflow engines like Storm
- Hadoop is slow and has awkward APIs. Problem: intermediate materialization is slow, APIs are clunky. Solution: new interface; Apache Spark!

35 Today’s Outline
- NoSQL overview
- Cloud landscape
- System in focus: Spark
- Scale-out ML systems

36 Matei Zaharia (cool fact: he’s on our faculty!)

37 Original Spark Vision
- 1) Unified engine for big data processing: combines batch, interactive, iterative, streaming
- 2) Concise, language-integrated API: functional programming in Scala/Java/Python

38 Motivation: Unification
- MapReduce: general batch processing
- Specialized systems for new workloads: Pregel, Giraph, Dremel, Drill, Impala, Presto, Storm, S4, …
- Hard to compose in pipelines; hard to manage, tune, deploy

39 Motivation: Unification
- Can one unified engine replace both general batch processing (MapReduce) and the specialized systems (Pregel, Giraph, Dremel, Drill, Impala, Presto, Storm, S4, …)?

40 Motivation: Concise API
- Much of data analysis is exploratory / interactive
- Spark solution: Resilient Distributed Datasets (RDDs)
- “Distributed collection” abstraction with simple functional API:

    lines = spark.textFile("hdfs://...")           // RDD[String]
    points = lines.map(line => parsePoint(line))   // RDD[Point]
    points.filter(p => p.x > 100).count()

41 Implementation Idea
- Execution similar to Hadoop: distribute to cluster
- Store intermediate data in memory
- Recover any failed partitions by re-running functional tasks
- (Trade-off with Hadoop/MapReduce?)
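The recovery idea on slide 41 can be sketched as follows (a toy illustration, not Spark's actual classes; `Dataset` and its fields are invented): each dataset remembers its lineage, the parent it came from and the pure function that produced it, so a lost in-memory partition is rebuilt by re-running functions rather than restoring a replica.

```python
# Illustrative lineage-based recovery: a Dataset keeps a reference to its
# parent and a pure per-element transformation. If the cached result is
# lost (simulating a node failure), compute() rebuilds it from upstream.
class Dataset:
    def __init__(self, source=None, parent=None, fn=None):
        self.source = source   # base data (for the root dataset only)
        self.parent = parent   # upstream dataset in the lineage chain
        self.fn = fn           # pure per-element transformation
        self.cache = None      # in-memory materialization; may be lost

    def map(self, fn):
        # Transformations are lazy: just record the lineage edge.
        return Dataset(parent=self, fn=fn)

    def compute(self):
        # Recompute from lineage whenever the cache is missing.
        if self.cache is None:
            if self.parent is None:
                self.cache = list(self.source)
            else:
                self.cache = [self.fn(x) for x in self.parent.compute()]
        return self.cache
```

Clearing `cache` and calling `compute()` again models the trade-off the slide hints at: Spark avoids Hadoop's disk writes for intermediate data, paying with recomputation on failure.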

42 Hadoop poor fit for iterative ML

43 How Did the Vision Hold Up?
- Generally well! Users really appreciate unification
- Functional API causes some challenges; work in progress

44 Libraries Built on Spark
- SQL, Streaming, MLlib, GraphX, all on Spark Core (RDDs)
- Largest integrated standard library for big data

45 Which Libraries Do People Use? 75% of users use more than one component

46 Top Applications What was before?

47 Main Challenge: Functional API
- Looks high-level, but hides many semantics of computation
- Functions are arbitrary blocks of Java bytecode
- Data stored is arbitrary Java objects
- Users can mix APIs in suboptimal ways

48 Example Problem

    pairs = data.map(word => (word, 1))
    groups = pairs.groupByKey()
    groups.map((k, vs) => (k, vs.sum))

- Materializes all groups as lists of integers
- Then promptly aggregates them
- Note the space usage of the key-value pairs
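The problem on slide 48 can be illustrated in plain Python (not Spark; both function names are invented): grouping materializes every per-key value list before summing, while a reduce-by-key fold keeps only one running total per key, even though both compute the same answer.

```python
# Contrast of the two aggregation styles from the slide.
from collections import defaultdict

def group_then_sum(pairs):
    # groupByKey-style: holds ALL values for each key in memory at once.
    groups = defaultdict(list)
    for k, v in pairs:
        groups[k].append(v)
    return {k: sum(vs) for k, vs in groups.items()}

def reduce_by_key(pairs):
    # reduceByKey-style: folds each value into one running count per key.
    totals = defaultdict(int)
    for k, v in pairs:
        totals[k] += v
    return dict(totals)
```

Same result, very different space usage: with millions of occurrences of one word, the first version builds a list of millions of 1s before summing it.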

49 Challenge: Data Representation
- Java objects often many times larger than underlying fields:

    class User(name: String, friends: Array[Int])
    User("Bobby", Array(1, 2))

50 DataFrame API
- DataFrames hold rows with a known schema and offer relational operations on them through a DSL:

    c = HiveContext()
    users = c.sql("select * from users")
    ma_users = users[users.state == "MA"]   # filter builds an expression AST
    ma_users.count()
    ma_users.groupBy("name").avg("age")
    ma_users.map(lambda row: row.user.toUpper())

51 Execution Steps

52 API Details
- Based on the data frame concept in R, Python
- Spark is the first to make this a declarative API
- Integrated with the rest of Spark: ML library takes DataFrames as input & output
- Easily convert RDDs ↔ DataFrames
- (Chart: Google trends for “data frame”)

53 What DataFrames Enable
- Compact binary representation: columnar, compressed format for caching; rows for processing
- Optimization across operators (join reordering, pushdown, etc.)
- Runtime code generation
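A toy contrast between row and columnar layout (plain Python; the data and function are invented for illustration, and real engines use far more compact encodings): the columnar form stores each field in its own typed container, which is what makes compression and column-only scans cheap.

```python
# Row layout: one record object per entity.
# Columnar layout: one typed container per field; array('i') packs each
# int into a fixed-size machine word instead of a full Python object.
from array import array

rows = [("Bobby", 25), ("Alice", 30), ("Carol", 41)]

# Columnar layout derived from the same data.
names = [name for name, _ in rows]
ages = array("i", (age for _, age in rows))

def avg_age_columnar():
    # A scan over one column touches only the "age" values, never the names.
    return sum(ages) / len(ages)
```

This is the intuition behind "columnar for caching, rows for processing": aggregations read one tightly packed column; row-at-a-time operators reassemble records.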

54 Performance

55 Performance

56 Data Sources
- Now that we have an API for structured data, map it to data stores
- Spark apps should be able to migrate across Hive, Cassandra, JSON, …
- Rich semantics of API allows query pushdown into data sources, something not possible with original Spark
- Example: users[users.age > 20] on top of a source scanned as select * from users

57 Data Source API
- All data sources provide a schema given a connection string (e.g., JSON file, Hive table name)
- Different interfaces for “smarter” federation:
- Table scan: just read all rows → CSV, JSON
- Pruned scan: read specific columns → Cassandra, HBase
- Filtered scan: read rows matching an expression → JDBC, Parquet, Hive
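A minimal sketch of such a source interface (invented names in plain Python, not Spark's actual Scala traits): smarter sources let the engine push projection and filtering down, so less data crosses the source boundary.

```python
# Illustrative data source exposing the three scan levels from the slide.
class InMemorySource:
    def __init__(self, schema, rows):
        self.schema = schema          # column name -> position in each row
        self.rows = rows

    def table_scan(self):
        # Dumbest interface: read every row, every column.
        return list(self.rows)

    def pruned_scan(self, columns):
        # Projection pushdown: read only the requested columns.
        idxs = [self.schema[c] for c in columns]
        return [tuple(row[i] for i in idxs) for row in self.rows]

    def filtered_scan(self, columns, predicate):
        # Filter pushdown: return only matching rows of the projection.
        return [r for r in self.pruned_scan(columns) if predicate(r)]

source = InMemorySource({"id": 0, "age": 1, "lang": 2},
                        [(1, 20, "en"), (2, 30, "fr")])
```

A real engine would translate `predicate` into the source's own query language (SQL for JDBC, filters for Parquet) rather than shipping a Python function.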

58 Examples
- JSON (tweets.json contains documents like { "text": "hi", "user": { "name": "bob", "id": 15 } }): select user.id, text from tweets
- JDBC: select id, age from users where lang = "en"
- Together, via Spark SQL: select t.text, u.age from tweets t, users u where t.user.id = u.id and u.lang = "en"

59 Hardware Trends: Storage, Network, CPU

60 Hardware Trends (2010 → 2015)
- Storage: 50+MB/s (HDD) → 500+MB/s (SSD)
- Network: 1Gbps → 10Gbps
- CPU: ~3GHz → ~3GHz

61 Hardware Trends (2010 → 2015)
- Storage: 50+MB/s (HDD) → 500+MB/s (SSD): 10X
- Network: 1Gbps → 10Gbps
- CPU: ~3GHz → ~3GHz

62 Project Tungsten
- Substantially speed up Spark by optimizing CPU efficiency, via:
- Runtime code generation
- Exploiting cache locality
- Off-heap memory management

63 Tungsten’s Compact Encoding: (123, “data”, “bricks”)

64 Runtime Code Generation
- DataFrame code / SQL: df.where(df("year") > 2015)
- Catalyst expression: GreaterThan(year#234, Literal(2015))
- Generated low-level bytecode:

    bool filter(Object baseObject) {
        int offset = baseOffset + bitSetWidthInBytes + 3*8L;
        int value = Platform.getInt(baseObject, offset);
        return value > 2015;
    }

- Platform.getInt(baseObject, offset) is a JVM intrinsic, JIT-ed to pointer arithmetic
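The codegen idea can be mimicked in plain Python (an illustrative sketch, not Tungsten or Catalyst; `greater_than` and `codegen_filter` are invented): compile the expression tree into a specialized predicate function once, instead of interpreting the tree for every row.

```python
# Toy expression-to-code generation: a comparison AST node is turned
# into Python source for a specialized filter, compiled once with exec.
def greater_than(column, literal):
    # Tiny stand-in for an expression AST node like GreaterThan(...).
    return ("gt", column, literal)

def codegen_filter(expr):
    op, column, literal = expr
    assert op == "gt"
    # Generate and compile a predicate specialized to this expression,
    # so per-row evaluation pays no tree-walking overhead.
    src = f"def _filter(row): return row[{column!r}] > {literal}"
    namespace = {}
    exec(src, namespace)
    return namespace["_filter"]
```

Tungsten does the analogous thing at the JVM level, emitting bytecode whose field accesses JIT down to pointer arithmetic.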

65 Long-Term Vision: language frontends, Tungsten backend. First stage out in Spark 1.5.

66 Big Data in Production
- Big data is moving from offline analytics to production use
- Incorporate new data in seconds (streaming)
- Power low-latency queries (data serving)
- Currently very hard to build: separate streaming, serving & batch systems
- Our goal: one engine for “continuous apps”

67 PB’s Punchlines
- Spark is the de facto batch analytics processor today
- Streaming: just run mini-batches…
- Looks a lot like a SQL data warehouse… …but can do a bunch more, too: ML, etc.
- Maybe the biggest lesson: building modular software enables modular usages (compare: traditional data warehouses)
- Still slower than a fast data warehouse, but more flexible! Humans win over hardware efficiency (for many cases)!

68 Today’s Outline
- NoSQL overview
- Cloud landscape
- System in focus: Spark
- Scale-out ML systems


70 Idea: For “big models,” partition data and parameters
- Called a “parameter server”
- Asynchronous training can help!
- New systems like TensorFlow combine this idea with dataflow
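A toy single-process sketch of the parameter-server pattern (all names invented; real systems shard the server across machines and run many workers concurrently): workers pull only the parameter keys their data touches, compute gradients locally, and push sparse updates back.

```python
# Illustrative parameter server for SGD on a simple objective: each key k
# is trained toward its observed values x via gradient (w_k - x) of
# 0.5 * (w_k - x)^2.
class ParameterServer:
    def __init__(self, params, lr=0.1):
        self.params = dict(params)
        self.lr = lr

    def pull(self, keys):
        # Workers fetch only the parameters they need.
        return {k: self.params[k] for k in keys}

    def push(self, grads):
        # Apply a sparse gradient update (possibly asynchronously).
        for k, g in grads.items():
            self.params[k] -= self.lr * g

def worker_step(server, data):
    keys = {k for k, _ in data}
    w = server.pull(keys)
    grads = {}
    for k, x in data:
        grads[k] = grads.get(k, 0.0) + (w[k] - x)
    server.push(grads)

def demo():
    # One worker repeatedly trains w0 toward 1.0; w1 is never touched.
    ps = ParameterServer({"w0": 0.0, "w1": 0.0})
    for _ in range(100):
        worker_step(ps, [("w0", 1.0)])
    return ps.params
```

The "only access the parameters you need" efficiency argument on the later slide is visible here: `pull` and `push` move just the touched keys, not the whole model.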

71 slides via Mu Li


81 Costs/Benefits compared to Dataflow?

82 Costs/Benefits compared to Dataflow?
- Pro: Efficient; only access the model parameters you need
- Con: No real “query optimization”
- Pro or Con?: Fault tolerance
- Pro: Allows asynchronous execution


85 Bonus from last time: Does machine learning always need serializability? e.g., say I want to train a deep network on 1000s of GPUs

86 Bonus from last time: Does machine learning always need serializability? No!
- Turns out asynchronous execution is provably safe (for sufficiently small delays)
- Convex optimization routines (e.g., SGD) run faster on modern HW without locks
- Best paper name ever: HogWild!
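A minimal HogWild-flavored sketch in plain Python (illustrative only; in CPython the GIL serializes most of the racing, so treat this as the shape of the idea, not a performance demo): several threads apply SGD steps to shared parameters with no locking, minimizing the convex objective f(w) = (w - 3)^2, and still converge.

```python
# Lock-free-style SGD: racy reads and writes to a shared parameter,
# in the spirit of HogWild's no-locking update rule.
import threading

params = {"w": 0.0}

def sgd_worker(steps, lr=0.01):
    for _ in range(steps):
        w = params["w"]              # racy read: no lock taken
        grad = 2.0 * (w - 3.0)       # gradient of (w - 3)^2
        params["w"] = w - lr * grad  # racy write: updates may be lost

def hogwild(n_threads=4, steps=500):
    threads = [threading.Thread(target=sgd_worker, args=(steps,))
               for _ in range(n_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return params["w"]
```

Occasional lost updates only delay progress; because every step pulls `w` toward the same minimum, the iterate still lands near 3, which is the "provably safe for small delays" intuition.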


89 Punchlines
- Parameter server architecture is useful for training large models; increasingly popular in “deep networks”
- Lots of noise about new systems (graphs, deep learning)
- Often need special adaptation for workloads (e.g., special joins, operators), but the basic computational patterns (dataflow with some shared state) are the same
- Asynchrony can help training time in distributed environments
- Is training all we care about?


91 Next generation of systems: post-database data management!!!