All about BIG data

Big data refers to data that is so large, fast or complex that it’s difficult or impossible to process using traditional methods. The act of accessing and storing large amounts of information for analytics has been around for a long time.

The Hadoop architecture is a package of the HDFS (Hadoop Distributed File System) storage layer and the MapReduce processing engine. This ecosystem is used in many flavors / variants to help process big data. Today we will go over the components that build this solution, along with their alternatives.

HDFS – Hadoop Distributed File System

This is the file storage system. As the name implies, it saves a file by chopping it into manageable blocks and storing them across multiple servers.

There are 3 important things here:

1.    The Name node stores the metadata: which block of which file is stored on which data node. A secondary name node periodically checkpoints this metadata, and in a high-availability setup a standby name node can take over if the active one fails.

2.    The Data nodes store the actual block contents.

3.    Each block is replicated on more than one data node (3 copies by default), so if a node goes down, the data is still preserved.
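To make the block and replica bookkeeping concrete, here is a minimal in-memory sketch, not HDFS code: plain dicts stand in for the data nodes and for the name node's block map, the 4-byte block size is a toy value (real HDFS defaults to 128 MB blocks), and replicas are placed round-robin rather than with HDFS's rack-aware policy. The helpers `write_file` and `read_file` are hypothetical names.

```python
BLOCK_SIZE = 4   # bytes per block in this toy; real HDFS defaults to 128 MB
REPLICATION = 3  # HDFS's default replication factor


def write_file(data: bytes, data_nodes: dict) -> dict:
    """Chop data into blocks, store each block on REPLICATION distinct data
    nodes, and return the name node's metadata: block id -> node names."""
    names = sorted(data_nodes)
    if len(names) < REPLICATION:
        raise ValueError("need at least as many data nodes as replicas")
    block_map = {}
    for block_id, offset in enumerate(range(0, len(data), BLOCK_SIZE)):
        # Round-robin placement; real HDFS uses a rack-aware policy.
        targets = [names[(block_id + i) % len(names)] for i in range(REPLICATION)]
        for node in targets:
            data_nodes[node][block_id] = data[offset:offset + BLOCK_SIZE]
        block_map[block_id] = targets
    return block_map


def read_file(block_map: dict, data_nodes: dict, down=frozenset()) -> bytes:
    """Reassemble the file, reading each block from the first live replica."""
    parts = []
    for block_id in sorted(block_map):
        node = next(n for n in block_map[block_id] if n not in down)
        parts.append(data_nodes[node][block_id])
    return b"".join(parts)


data_nodes = {f"dn{i}": {} for i in range(4)}  # node name -> {block id: bytes}
name_node = write_file(b"hello big data!", data_nodes)
print(name_node[0])                                    # ['dn0', 'dn1', 'dn2']
print(read_file(name_node, data_nodes, down={"dn0"}))  # b'hello big data!'
```

Reading with `dn0` marked as down still returns the whole file, because every block it held has copies on two other nodes.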

Map-Reduce

It is the core processing framework of the Hadoop architecture. It divides the processing across the cluster, monitors it, and is resilient to failures.

Mapping means transformation; reducing means aggregation.

A general example is a word count over a whole file. The mapper first splits the lines into words, then transforms each word into a tuple (word, 1). This generally happens on the machine where the data is stored, to avoid network load. Next comes the shuffle and sort, which routes the mappers' outputs to the reducers so that the same key, from all mappers, lands on the same reducer. Finally, each reducer aggregates its data and outputs a count for each word by summing up the tuples.
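The word-count flow can be simulated in a single Python process to show what each phase hands to the next. This is a minimal sketch, not Hadoop's API: `mapper`, `shuffle` and `reducer` are hypothetical helper names, the partitioner uses Python's `hash` in the same spirit as Hadoop's default hash partitioner, and lowercasing the words is an added assumption.

```python
from collections import defaultdict
from itertools import chain


def mapper(line: str):
    """Map: split a line into words and emit a (word, 1) tuple for each."""
    for word in line.lower().split():
        yield (word, 1)


def shuffle(mapped_outputs, num_reducers: int):
    """Shuffle and sort: route every tuple to a reducer chosen by its key,
    so the same word from all mappers lands on the same reducer."""
    partitions = [defaultdict(list) for _ in range(num_reducers)]
    for word, count in chain.from_iterable(mapped_outputs):
        partitions[hash(word) % num_reducers][word].append(count)
    # Each reducer receives its keys in sorted order.
    return [dict(sorted(p.items())) for p in partitions]


def reducer(grouped: dict):
    """Reduce: sum the 1s collected for each word."""
    return {word: sum(counts) for word, counts in grouped.items()}


# Each input split would normally be mapped on the node that stores it.
splits = ["the quick brown fox", "the lazy dog", "The fox"]
mapped = [list(mapper(s)) for s in splits]
result = {}
for partition in shuffle(mapped, num_reducers=2):
    result.update(reducer(partition))
print(result["the"], result["fox"])  # 3 2
```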

This is now falling out of favor because there is an inherent complexity in thinking about how to divide a task into a Map stage and a Reduce stage. Also, with the introduction of the more powerful DAG (Directed Acyclic Graph) model used by Spark and Tez, which builds these stages behind the scenes, people rarely write MapReduce jobs directly anymore.

HBase – Big Database

HBase is derived from the Bigtable paper that Google published in 2006 https://research.google/pubs/pub27898/ . It is used when we need random, real-time read/write access to data. In fact, many NoSQL databases are derived from the same concept.

HBase is essentially a mechanism for partitioning the data by a given key, keeping track of which key is stored on which server in the cluster, and providing CRUD APIs for fast reads / writes on the data.

Each row is identified by a unique key, each row has some column families, and each family can have any number of columns. So a family may contain a lot of columns, but most of them may be empty for any given row. This is called sparse data and is the main feature of Bigtable.

HBase also keeps multiple versions of the data in each cell, each tagged with its timestamp.
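A minimal sketch of this sparse, versioned layout, assuming nested dicts as the storage; it is not the HBase client API, and `SparseTable`, `put` and `get` are hypothetical names. Only written cells exist, so a row pays nothing for the columns it does not use, and each cell keeps its newest versions ordered by timestamp (the `max_versions` cap plays the role of HBase's per-family VERSIONS setting).

```python
import time
from collections import defaultdict


class SparseTable:
    """Toy Bigtable/HBase layout (not the HBase client API):
    row key -> column family -> column qualifier -> [(timestamp, value), ...]."""

    def __init__(self, families, max_versions=3):
        self.families = set(families)     # families are declared up front
        self.max_versions = max_versions  # like a family's VERSIONS setting
        # Rows and columns come into existence only when a cell is written.
        self.rows = defaultdict(lambda: defaultdict(dict))

    def put(self, row, family, qualifier, value, ts=None):
        if family not in self.families:
            raise KeyError(f"unknown column family {family!r}")
        ts = time.time_ns() if ts is None else ts
        versions = self.rows[row][family].setdefault(qualifier, [])
        versions.append((ts, value))
        versions.sort(key=lambda cell: cell[0], reverse=True)  # newest first
        del versions[self.max_versions:]                       # drop oldest

    def get(self, row, family, qualifier, versions=1):
        cells = self.rows.get(row, {}).get(family, {}).get(qualifier, [])
        return [value for _, value in cells[:versions]]


t = SparseTable(families=["info", "stats"], max_versions=2)
t.put("user#42", "info", "email", "a@old.example", ts=1)
t.put("user#42", "info", "email", "a@mid.example", ts=2)
t.put("user#42", "info", "email", "a@new.example", ts=3)
t.put("user#42", "stats", "logins", 7, ts=1)
t.put("user#99", "info", "city", "Pune", ts=1)  # shares no columns with user#42
print(t.get("user#42", "info", "email", versions=5))  # ['a@new.example', 'a@mid.example']
print(t.get("user#99", "info", "email"))              # []
```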

All CRUD operations are based on the row key; otherwise the client has to fall back to a full table scan, similar to a MapReduce job.

NoSQL

NoSQL databases provide random access (by key) to petabyte-scale data. They approach the problem by partitioning the data based on the key and providing APIs to do CRUD on each data element. An RDBMS schema is usually highly normalized; a NoSQL schema is denormalized and keeps all the data needed for a use case in a single place, so one fetch gets all the data you need in a simple, fast and scalable way. Sometimes we may need to build separate index tables to derive the key for another table, or write the same data in 2 places for faster access.
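A minimal sketch of the denormalized pattern, with plain dicts standing in for key-value tables; the table names, fields and helper functions are hypothetical. The orders table keeps a copy of the customer details in each row, and a separate index table maps an email address to the order keys, so the same data is deliberately written in 2 places.

```python
# Plain dicts stand in for two key-value tables (hypothetical names).
orders = {}           # order id -> everything the order page needs
orders_by_email = {}  # index table: email -> order ids (keys into `orders`)


def save_order(order_id, customer, items):
    """Write the same customer data in 2 places: the order row and the index."""
    orders[order_id] = {
        "customer_name": customer["name"],   # copied, not joined at read time
        "customer_email": customer["email"],
        "items": items,
        "total": sum(qty * price for _, qty, price in items),
    }
    orders_by_email.setdefault(customer["email"], []).append(order_id)


def orders_for(email):
    """Two key lookups (index, then orders) instead of a join or full scan."""
    return [orders[oid] for oid in orders_by_email.get(email, [])]


alice = {"name": "Alice", "email": "alice@example.com"}
save_order("o-1", alice, [("book", 2, 10.0)])
save_order("o-2", alice, [("pen", 3, 1.5)])
print([o["total"] for o in orders_for("alice@example.com")])  # [20.0, 4.5]
```

The trade-off is on the write side: if Alice changes her email, every order row and the index entry have to be rewritten.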

Beyond the Core components

The components above (HDFS, MapReduce and HBase) are the core components of a big data platform. Various providers offer optimized substitutes for them: for example, AWS S3 is a substitute for HDFS, and for HBase there are many alternatives, such as Cassandra, MongoDB and DynamoDB, some of which we discuss below.

Then there are tools built around these, which fall into 2 categories:

Platform tools – The ones which keep the system running, manage the cluster, help in execution

Execution tools – The ones which help in data processing (ad hoc, batch, streaming etc.)

Apache YARN – Yet Another Resource Negotiator

Originally this code was part of the MapReduce library, but in Hadoop 2 it was pulled out to separate the problem of managing resources from the problem of executing tasks. This separation helped Spark and Tez evolve, since they could use YARN to run their DAGs independently of MapReduce.

YARN's ResourceManager tracks the resources on each node through a NodeManager running on that node, and each application's ApplicationMaster negotiates containers from the ResourceManager based on this view of the cluster's topology and utilization. Scheduling can also take data locality into account, to avoid network load during processing.

Applications talk to YARN to distribute work across the cluster. It supports FIFO, Capacity and Fair scheduling policies.

Apache Tez

This is an alternative to MapReduce, and many applications (Pig, Hive) can choose between MR and Tez to execute their jobs.

DAGs optimize the processing by reducing the number of times the data is written to and read from disk, compared to a chain of MR jobs. Tez builds a single topology of all the processing stages and chains them together, so the output of one stage flows directly to the next (in memory where possible) instead of going through another round of HDFS writes and reads, and independent stages can run in parallel. Under the hood it still uses YARN for resource management.
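A minimal sketch of the DAG idea, assuming a hypothetical 4-stage pipeline: stages run in dependency order using Python's `graphlib.TopologicalSorter` (Python 3.9+), and each stage's output is handed to the next stage in memory. It is not the Tez API; it only illustrates why a DAG avoids the HDFS write and read that a chain of separate MR jobs needs between stages.

```python
from graphlib import TopologicalSorter


def total_by_country(rows):
    totals = {}
    for country, amount in rows:
        totals[country] = totals.get(country, 0) + amount
    return totals


# Hypothetical pipeline: stage name -> (function, upstream stages it reads).
stages = {
    "read_orders": (lambda: [("u1", 30), ("u2", 5), ("u1", 12)], ()),
    "read_users": (lambda: {"u1": "IN", "u2": "US"}, ()),
    "join": (lambda orders, users: [(users[u], amt) for u, amt in orders],
             ("read_orders", "read_users")),
    "aggregate": (total_by_country, ("join",)),
}


def run_dag(stages):
    """Run stages in dependency order, passing each output straight to its
    consumers in memory instead of writing it to HDFS between jobs."""
    graph = {name: set(deps) for name, (_, deps) in stages.items()}
    outputs = {}
    for name in TopologicalSorter(graph).static_order():
        fn, deps = stages[name]
        outputs[name] = fn(*(outputs[d] for d in deps))
    return outputs


print(run_dag(stages)["aggregate"])  # {'IN': 42, 'US': 5}
```

`read_orders` and `read_users` do not depend on each other, so a real engine could run them in parallel; `TopologicalSorter` also offers `get_ready()` / `done()` for that style of scheduling.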

ZooKeeper

It keeps track of the cluster infrastructure: which node is the master, which is the backup master, which nodes are workers, and which node is currently processing which job. It acts as a single source of truth for your cluster. It should itself run as a cluster (an ensemble) with an odd number of nodes. With a 5-node ensemble, a quorum (majority) of 3 nodes must agree on every update, which keeps the data consistent and keeps the ensemble working even if 2 nodes go down.
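The quorum arithmetic behind the ensemble size can be worked out with two small helpers (hypothetical names). It assumes ZooKeeper's majority rule: an update needs a strict majority of the ensemble, so any two majorities overlap in at least one node that has seen the latest committed state.

```python
def quorum_size(ensemble_size: int) -> int:
    """Smallest strict majority of the ensemble."""
    return ensemble_size // 2 + 1


def tolerated_failures(ensemble_size: int) -> int:
    """How many nodes can go down while a majority is still reachable."""
    return ensemble_size - quorum_size(ensemble_size)


for n in (3, 4, 5, 6, 7):
    print(n, quorum_size(n), tolerated_failures(n))
# 3 2 1
# 4 3 1
# 5 3 2
# 6 4 2
# 7 4 3
```

An even-sized ensemble needs a bigger quorum without tolerating any extra failures, which is why odd sizes such as 3 or 5 are the usual choice.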

Internally it maintains a hierarchical, file-system-like namespace of nodes (znodes), each of which holds a small amount of data.

The interesting way it keeps track of a server's liveness is through session management: the server's entry (an ephemeral znode) is valid only as long as the server's session is active. As soon as the session drops, the entry is deleted, the server is deemed non-operational and, if needed, a new master election starts.
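The session-based liveness and master election can be sketched with an in-memory toy. `ToyZooKeeper` and its methods are hypothetical, not a real ZooKeeper client API; the toy models ephemeral, sequential znodes (tied to the creating session, with a zero-padded 10-digit sequence suffix) and the common election recipe in which the candidate with the lowest sequence number becomes master.

```python
import itertools


class ToyZooKeeper:
    """In-memory toy, not a ZooKeeper client: ephemeral sequential znodes
    that live only as long as the session that created them."""

    def __init__(self):
        self.znodes = {}  # path -> (data, owning session id)
        self._seq = itertools.count()

    def create_ephemeral_sequential(self, prefix, data, session):
        path = f"{prefix}{next(self._seq):010d}"  # 10-digit sequence suffix
        self.znodes[path] = (data, session)
        return path

    def expire_session(self, session):
        """The session dropped: every ephemeral znode it owned disappears."""
        self.znodes = {p: v for p, v in self.znodes.items() if v[1] != session}

    def children(self, parent):
        return sorted(p for p in self.znodes if p.startswith(parent + "/"))


def current_master(zk):
    """Election recipe: the lowest sequence number under /election leads."""
    candidates = zk.children("/election")
    return zk.znodes[candidates[0]][0] if candidates else None


zk = ToyZooKeeper()
for server in ("server-a", "server-b", "server-c"):
    zk.create_ephemeral_sequential("/election/n_", server, session=server)
print(current_master(zk))      # server-a
zk.expire_session("server-a")  # server-a crashes and its session times out
print(current_master(zk))      # server-b
```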

Mesos

Mesos originated at UC Berkeley and was adopted early by companies such as Twitter and Airbnb. It is a system that manages resources and jobs across a cluster. It is not part of the original Hadoop system; it is more of a task and resource orchestrator with container support, so its design is closer to Kubernetes than to YARN. YARN is integrated internally by the technologies built on top of it and is not much exposed to the user, whereas Mesos is more configurable, has its own UI and goes beyond Hadoop.

Oozie

It is used for running and scheduling Hadoop jobs. It supports multi-step workflows that chain together Hive, Pig and MapReduce tasks, and it can also run Spark jobs.

The workflow is defined using an XML syntax and supports running tasks in parallel (fork / join). The main restriction is that the workflow must be a DAG, so it cannot contain loops. It relies on a properties file that provides the cluster information.

Oozie comes with a UI where users can keep track of running and completed jobs.

Oozie supports coordinators which can launch a workflow at a given time / schedule.

Apache Ambari

A web-based visualizer that provides a dashboard of the cluster's health, jobs and alarms.

We can use it to manage the cluster, add or remove nodes, update alarm thresholds, and install or restart services.

We can also use it to execute Pig or Hive queries, browse data in HDFS and more, with the help of plugins (views).

Cloudera Hue

Hue is an alternative to Ambari's query and data-browsing features. It is made by Cloudera for their flavor of Hadoop and is a bit more polished.

Query Engines

Apache Hive

Hive provides a SQL interface (HiveQL) on top of MapReduce or Tez. It provides an interactive interface via Ambari, and is extensible through custom mappers / reducers and user-defined functions (UDFs).

It has high latency, as with MapReduce. Its SQL dialect is somewhat limited compared to what Pig or Spark SQL can express, and it is not designed for record-level writes back to the file system.

Hive works with schema on read: when we create a table over data in HDFS, the data is not restructured at load time; the schema is applied when the data is read. The underlying files do not change; Hive keeps a metastore that maps the table schema to the files.

It provides JDBC / ODBC drivers for using Hive programmatically.

Apache Pig

It's another way to avoid writing MapReduce by hand and to process the data with a SQL-like language. Pig Latin has operators to LOAD, STORE, DUMP, FILTER, JOIN and ORDER data.

Unlike Hive, it has no server component: Pig Latin scripts are compiled on the client side into MapReduce or Tez jobs, which then run on the cluster. It does not support JDBC; instead, it is used as a scripting language for writing data flows.

It's more of a separate layer over Hadoop, rather than being tightly coupled to it like Hive, which manages its own table structures in Hadoop storage. It was developed at Yahoo.

Apache Drill

It is a full SQL query engine, inspired by Google's Dremel, that can query Hive, HBase, MongoDB, JSON files etc. It provides JDBC / ODBC drivers.

It is used to query across disjoint systems, for example when you want to join data from HBase with data in Hive or MongoDB.

Apache Phoenix

It is a SQL interface for the NoSQL database HBase, developed at Salesforce. All these tools try to mimic SQL because SQL is very straightforward and hides the complexity of getting data out of NoSQL stores. But that is not supposed to be a problem if the data is organized correctly in a NoSQL architecture; we should not be using a big data table like an RDBMS in the first place. Judging by these tools, even the organizations developing them do not always organize their data that way.

It integrates with Spark, Hive, Pig and Flume. If you need only HBase data, you can just use Phoenix rather than Drill.

It also provides a Java interface through its JDBC driver.

Presto

Another SQL query engine like Drill, developed at Facebook. Beyond what Drill supports, it also integrates with Cassandra. Cassandra is of course another product that originated at Facebook, so no surprise here.

It is used in production at Facebook, so it has been proven at large scale.

Apache Zeppelin

This is very similar to Google Colab / Jupyter Notebook: a notebook in which you can write code in a browser, see the execution results, document the code to share examples, and use several data visualization tools. It has many plugins (interpreters) to execute code on Pig, Hive, MR and Spark and to visualize the results.

Big Table solutions

Cassandra

Cassandra is a NoSQL database similar to HBase. The difference is how it manages the infrastructure for availability: it does not have a single master node that knows which key resides where. Instead, Cassandra relies on a gossip protocol to share information between nodes about which node is responsible for which data and its replicas.
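Masterless key placement in Cassandra rests on consistent hashing: each node owns a range of a token ring, and a key belongs to the first node whose token is at or after the key's token. Below is a minimal sketch under simplifying assumptions: MD5 (from the standard library) stands in for Cassandra's Murmur3 partitioner, there are no virtual nodes, replicas go to the next nodes clockwise as with SimpleStrategy, and `TokenRing` is a hypothetical name.

```python
import bisect
import hashlib


class TokenRing:
    """Toy consistent-hashing ring: MD5 stands in for Murmur3, no vnodes."""

    def __init__(self, nodes, replication_factor=3):
        self.rf = replication_factor
        self.ring = sorted((self._token(n), n) for n in nodes)
        self.tokens = [t for t, _ in self.ring]

    @staticmethod
    def _token(key: str) -> int:
        # First 8 bytes of the MD5 digest as an unsigned 64-bit token.
        return int.from_bytes(hashlib.md5(key.encode()).digest()[:8], "big")

    def replicas(self, partition_key: str) -> list:
        """Owner = first node whose token is >= the key's token (wrapping
        around); the next rf - 1 nodes clockwise hold the other replicas."""
        start = bisect.bisect_left(self.tokens, self._token(partition_key))
        count = min(self.rf, len(self.ring))
        return [self.ring[(start + i) % len(self.ring)][1] for i in range(count)]


ring = TokenRing([f"node{i}" for i in range(1, 6)])
print(ring.replicas("user:42"))  # 3 distinct nodes, the same on every client

bigger = TokenRing([f"node{i}" for i in range(1, 7)])
keys = [f"user:{i}" for i in range(1000)]
moved = sum(ring.replicas(k)[0] != bigger.replicas(k)[0] for k in keys)
print(f"{moved} of {len(keys)} keys changed owner")  # all of them now on node6
```

Because every node can compute `replicas(key)` from the ring membership alone, no master lookup is needed; gossip only has to spread who is on the ring. Adding a node moves only the keys in the range it takes over.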

Cassandra gives up strong consistency (it is eventually consistent, with consistency levels tunable per query) in favor of availability.

Cassandra supports CQL, which mimics SQL; its shell (cqlsh) gives a prompt to look up data and create tables. CQL is rather limited (for example, it has no joins), but given the nature of big data it is more than sufficient for the need.

It supports replication to another data center, which can be used for analytics. The need for this is a bit controversial: big data is already hosted on a cluster with redundancy, so arguably one copy should be sufficient for the entire world, and more capacity should be achievable by increasing the cluster size.

One important thing about Cassandra is that it is very well supported by Spark. DataStax has built a connector to read / write Cassandra tables as DataFrames. In an ideal topology, Cassandra and Spark run on the same nodes of the cluster. That provides data locality to your Spark cluster, avoids network traffic and improves the efficiency of your solution.

MongoDB

Like HBase, it has a single master: each replica set has one primary that accepts all writes, since MongoDB emphasizes consistency over availability. In a sharded cluster, the map of which shard holds which key range is kept in one place, on the config servers. The primary is replicated to secondary nodes, one of which can take over for high availability.

Internally it uses a document data model, storing each record as a JSON-like document (in a binary form called BSON). It assigns a unique key (_id) to each document when saving it, if one is not provided. The document structure itself is not constrained: you can enforce a schema if you want, or every document can have different fields.

To shard a collection by a field, you need an index on that field (the shard key).

One benefit of MongoDB is that we can run MapReduce-style processing on it directly, like Hadoop (newer versions favor the aggregation pipeline for this).

CAP considerations

All distributed databases balance consistency, availability and partition tolerance:

Availability – Do we have only one master? What happens if it goes down, and how soon can we recover?

Consistency – Do I get different answers if I query different nodes? This is a problem in distributed environments because a write happens on one node first and is then transmitted to the other nodes holding the same data. One way systems mitigate this is by letting you configure when the write call returns: after the first write, or after all the replicas are updated. Redis further sidesteps some of these issues by processing commands on a single thread.

Partition tolerance – Do you need to keep the data on more than 1 server? If not, you can avoid these problems by just choosing an RDBMS.
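The knob of when a write returns can be reasoned about with the quorum rule used by Dynamo-style stores such as Cassandra: with N replicas, a write acknowledged by W of them and a read that consults R of them are guaranteed to overlap exactly when R + W > N. A minimal sketch of that arithmetic; the labels borrow Cassandra's ONE / QUORUM / ALL consistency level names, and the helper names are hypothetical.

```python
def read_sees_latest_write(n: int, w: int, r: int) -> bool:
    """A read touching r replicas overlaps a write acknowledged by w replicas
    (out of n) in at least one replica exactly when r + w > n."""
    return r + w > n


def write_failures_tolerated(n: int, w: int) -> int:
    """Replicas that can be down while writes still get w acknowledgements."""
    return n - w


N = 3  # replication factor
for label, w, r in [("ONE/ONE", 1, 1), ("QUORUM/QUORUM", 2, 2), ("ALL/ONE", 3, 1)]:
    print(label, read_sees_latest_write(N, w, r), write_failures_tolerated(N, w))
# ONE/ONE False 2
# QUORUM/QUORUM True 1
# ALL/ONE True 0
```

Consistency is bought with availability: ALL/ONE always reads fresh data but cannot write while any replica is down, while ONE/ONE keeps accepting writes with 2 replicas down but may return stale reads.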

Data Processing

Apache Spark

Spark is a very mature technology for large-scale data processing. Spark is built in Scala, which is also the preferred language for coding in Spark. Spark is a memory-based processing solution, which makes it faster than disk-based ones.

It contains a driver program, which is responsible for distributing tasks through YARN or Spark's built-in cluster manager. The executors are responsible for processing the tasks on individual nodes.

It uses Directed Acyclic Graphs to plan its work. The core of Spark is the RDD (Resilient Distributed Dataset), which makes processing fault tolerant (a lost partition can be recomputed from its lineage), distributes data evenly across the cluster, and abstracts the complexity away as a dataset. An RDD is created from a reference to a file on HDFS, a DB query result, JSON, CSV, or anything else that can return data. You can then apply multiple operations on the RDD, much like Java streams, which follow the same functional style of running map-type operations in parallel. One thing to remember: as with Java streams, transformations such as map are lazy and are not kicked off until an action (such as reduce, count or collect) is called.
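Lazy evaluation and lineage can be illustrated without a cluster. This is a minimal sketch, not the Spark API: `ToyRDD` is a hypothetical class whose transformations (`map`, `filter`) only record steps, and whose actions (`collect`, `reduce`) replay those steps from the source. Replaying the lineage is also how Spark rebuilds a lost partition.

```python
import functools


class ToyRDD:
    """Not the Spark API: transformations only record lineage steps; actions
    replay the lineage from the source."""

    def __init__(self, source, steps=()):
        self.source = source  # callable producing the input records
        self.steps = steps    # recorded lineage: ("map" | "filter", fn)

    def map(self, fn):        # transformation: lazy
        return ToyRDD(self.source, self.steps + (("map", fn),))

    def filter(self, pred):   # transformation: lazy
        return ToyRDD(self.source, self.steps + (("filter", pred),))

    def _compute(self):
        records = iter(self.source())
        for kind, fn in self.steps:
            records = map(fn, records) if kind == "map" else filter(fn, records)
        return records

    def collect(self):        # action: triggers the computation
        return list(self._compute())

    def reduce(self, fn):     # action: triggers the computation again
        return functools.reduce(fn, self._compute())


calls = []


def square(x):
    calls.append(x)  # record that the transformation actually ran
    return x * x


rdd = ToyRDD(lambda: range(1, 6)).map(square).filter(lambda x: x % 2 == 1)
print(len(calls))                      # 0: nothing has run yet
print(rdd.collect())                   # [1, 9, 25]
print(rdd.reduce(lambda a, b: a + b))  # 35
print(len(calls))                      # 10: the lineage was replayed twice
```

The second action recomputes everything from the source; real Spark offers `cache()` / `persist()` so that a reused RDD does not have to be rebuilt from its lineage each time.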

A DataFrame is built on top of RDDs and contains Row objects that hold structured data with a schema. We can run SQL queries over a DataFrame. Spark SQL is quite mature and is now on the same level as doing map / reduce operations over the dataset.

Spark Streaming

Spark Streaming brings the RDD and dataset concepts to incoming streams with the help of micro-batching. It does not pick up each event as it comes in; rather, it waits for a time interval to pass, collects all the data that came in during that interval, and gives it to the Spark context as an RDD, through the abstraction layer of DStreams (Discretized Streams). From there onwards, it is the same processing as regular Spark.

Three time intervals are important in this approach:

1.    The batch interval: the length of time whose data each micro-batch contains.

2.    The slide interval: once the batches are collected, computation happens only after this many batches / this much time.

3.    The window length: the amount of data each computation looks at. It is generally longer than the slide interval, so each computation can compare the new batches against data that came in previously without refetching it.

For example, if micro-batching happens every 1 second, the computation every 2 seconds, and the window length is 3 seconds, each computation gets access to the last 2 seconds of data it has not seen before, plus 1 more second of data from the previous computation.
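The window arithmetic can be checked with a small simulation using the numbers from this example: a 1-second micro-batch (batch interval), a 2-second compute interval (slide interval) and a 3-second analysis window (window length). It is a sketch with hypothetical helper names and made-up events, not Spark code; in the DStream API the same idea is expressed with window operations such as `window(windowLength, slideInterval)`.

```python
BATCH_INTERVAL = 1  # seconds of data per micro-batch
SLIDE_INTERVAL = 2  # compute every 2 seconds
WINDOW_LENGTH = 3   # each computation looks at the last 3 seconds

# Spark requires both to be multiples of the batch interval.
assert SLIDE_INTERVAL % BATCH_INTERVAL == 0 and WINDOW_LENGTH % BATCH_INTERVAL == 0


def micro_batches(events, until):
    """Batch ending at t holds the events with t - BATCH_INTERVAL < ts <= t."""
    return {t: [v for ts, v in events if t - BATCH_INTERVAL < ts <= t]
            for t in range(BATCH_INTERVAL, until + 1, BATCH_INTERVAL)}


def windowed_counts(events, until):
    """At every slide boundary, count the events in the batches of the window."""
    batches = micro_batches(events, until)
    results = {}
    for t in range(SLIDE_INTERVAL, until + 1, SLIDE_INTERVAL):
        in_window = [end for end in batches if t - WINDOW_LENGTH < end <= t]
        results[t] = (in_window, sum(len(batches[end]) for end in in_window))
    return results


events = [(0.5, "a"), (1.5, "b"), (2.2, "c"), (2.9, "d"), (3.1, "e"), (4.0, "f")]
for t, (batch_ends, count) in windowed_counts(events, until=4).items():
    print(f"compute at t={t}s sees batches ending {batch_ends}: {count} events")
# compute at t=2s sees batches ending [1, 2]: 2 events
# compute at t=4s sees batches ending [2, 3, 4]: 5 events
```

At t=4s the computation sees batches 3 and 4 for the first time, plus batch 2, which it already saw at t=2s; that is the extra second of overlap.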

Apache Storm

Storm is a real-time processing engine for events on a cluster. It does not really belong in the Hadoop ecosystem since it can be used to process any data stream, but since it is distributed by nature and works with a built-in cluster manager, it fits well into the big data paradigm. Storm has the concept of a topology, which consists of emitters called spouts that can read any data stream, such as Kafka. The processors are called bolts and can be chained together; a bolt can source its data from one or more other bolts. Scaling is done at the bolt level: multiple copies of a bolt can run in a topology to distribute the workload, since some bolts may do lightweight tasks while others are heavy on resources. Data flows through the topology as tuples (named lists of values, similar to key-value pairs).

Storm can also support the micro-batching concept with the help of a component called Trident.

Storm has configurable data delivery guarantees, at-most-once or at-least-once in core Storm and exactly-once processing with Trident, so the data is processed as the use case needs.

A Storm cluster consists of a Nimbus server, which manages the topologies, and supervisors on the worker nodes, which run them. It uses ZooKeeper to maintain the cluster state and provides a UI to track processing and cluster health.

Flink

It's a competitor to Spark micro-batching and Storm, but it mostly does event-by-event processing. It's newer than Storm and has better scalability and a better windowing system. I am just adding it here for completeness, since I do not have hands-on experience with this one.

Data Ingestion solutions

Kafka

Kafka is a streaming technology that was developed at LinkedIn. It is a highly scalable, partitioned, topic-based solution that supports a publish-subscribe messaging system. It stores the messages from publishers on a topic for a configurable retention period. Consumers can subscribe to one or more topics, and can choose to read from the time they connected or everything still present on the topic. Each consumer group has its own position (offset) maintained in the system.

Kafka has the concept of consumer groups: the consumers in a group split the topic's partitions among themselves, so they can process messages without stepping on each other's toes, while another group can consume the same messages without interfering with the first group's consumption.
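Partitions, per-group offsets and consumer groups can be modeled with an in-memory toy. `ToyTopic` and its methods are hypothetical, not the Kafka client API; the byte-sum partitioner is a stand-in (Kafka's default partitioner hashes keys with murmur2), and `from_beginning` approximates the consumer setting `auto.offset.reset` (earliest vs latest).

```python
class ToyTopic:
    """In-memory stand-in for a Kafka topic, not the Kafka client API:
    one append-only log per partition, offsets committed per consumer group."""

    def __init__(self, partitions: int):
        self.logs = [[] for _ in range(partitions)]
        self.offsets = {}  # (group, partition) -> next offset to read

    def publish(self, key: str, value):
        # Same key -> same partition, so per-key ordering is preserved.
        # Toy byte-sum partitioner; Kafka's default hashes keys with murmur2.
        self.logs[sum(key.encode()) % len(self.logs)].append(value)

    def assign(self, consumers: list) -> dict:
        """Split the partitions among one group's consumers (round-robin)."""
        return {c: list(range(i, len(self.logs), len(consumers)))
                for i, c in enumerate(consumers)}

    def poll(self, group: str, partition: int, from_beginning=True):
        """Return this group's unread messages and advance its offset."""
        default = 0 if from_beginning else len(self.logs[partition])
        start = self.offsets.get((group, partition), default)
        batch = self.logs[partition][start:]
        self.offsets[(group, partition)] = start + len(batch)
        return batch


topic = ToyTopic(partitions=4)
for i in range(8):
    topic.publish(f"user-{i}", f"event-{i}")

# Group "billing": two consumers share the partitions, each message read once.
billing = [m for consumer, parts in topic.assign(["c1", "c2"]).items()
           for p in parts for m in topic.poll("billing", p)]
# Group "audit": one consumer reads the same messages independently.
audit = [m for p in range(4) for m in topic.poll("audit", p)]
print(sorted(billing) == sorted(audit) and len(billing) == 8)  # True
print(topic.poll("billing", 0))  # [] (billing's offset already advanced)
```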

Flume

Flume is another way to get streaming data into the cluster. It was made with Hadoop in mind, whereas Kafka is more general purpose. It was built to get logs from web servers into HDFS, as HDFS does not cope well with many clients writing to it at once.

Flume's channels act as a buffer that helps absorb spikes in incoming traffic.

A Flume architecture consists of a source, a channel and a sink. A source can be a directory where logs from web servers are being dropped. A channel, which can be memory-based or file-based, carries the events from the source to the sink. A sink can be configured to write the data to HBase or HDFS.

It supports interceptors, which can do some processing on the data, and channel selectors, which route the data to the desired channels.
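A minimal in-process sketch of the source, interceptor, channel selector, channel and sink chain. None of these names are Flume APIs (real agents are wired together in a properties file): `MemoryChannel` is a hypothetical bounded buffer, the interceptor adds a timestamp header, and the selector routes ERROR events to a separate channel, loosely modeled on Flume's multiplexing channel selector. Sending errors to an HBase sink is an assumed example.

```python
from collections import deque


class MemoryChannel:
    """Bounded buffer between a source and a sink: it absorbs bursts that
    arrive faster than the sink drains them."""

    def __init__(self, capacity: int):
        self.capacity = capacity
        self.events = deque()

    def put(self, event):
        if len(self.events) >= self.capacity:
            raise OverflowError("channel full")  # the source must back off
        self.events.append(event)

    def take(self, n: int):
        return [self.events.popleft() for _ in range(min(n, len(self.events)))]


def timestamp_interceptor(event, now=1700000000):
    """Interceptor: enrich the event's headers before it reaches a channel."""
    return {**event, "headers": {**event["headers"], "timestamp": now}}


def select_channel(event, channels):
    """Channel selector: route by a header value (ERROR events go elsewhere)."""
    return channels["errors" if event["headers"].get("level") == "ERROR" else "main"]


channels = {"main": MemoryChannel(capacity=100), "errors": MemoryChannel(capacity=100)}
hdfs_sink, hbase_sink = [], []  # lists stand in for the HDFS and HBase sinks

# Source: a burst of log lines, e.g. files dropped into a spooling directory.
levels = ["INFO", "ERROR", "INFO", "INFO", "ERROR"]
for i, level in enumerate(levels):
    event = timestamp_interceptor({"headers": {"level": level}, "body": f"line {i}"})
    select_channel(event, channels).put(event)

# Each sink drains its channel in small batches, at its own pace.
while channels["main"].events:
    hdfs_sink.extend(channels["main"].take(2))
hbase_sink.extend(channels["errors"].take(10))
print(len(hdfs_sink), len(hbase_sink))  # 3 2
```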

It supports sources such as Spooling Directory, HTTP, Thrift, Kafka, Netcat etc.

For Sinks it provides support for HDFS, HBase, Hive, Thrift, ElasticSearch, Kafka and more.

Sqoop

It was a tool to import / export data between an RDBMS and Hadoop. It has been retired for good and replaced by real-time / batch approaches.

 

 

Cheers – Amit Tomar