Saturday, November 26, 2016

DEMYSTIFYING SPARK SERIALIZATION

To serialize an object means to convert its state to a byte stream so that the byte stream can be reverted back into a copy of the object. A Java object is serializable if its class or any of its superclasses implements either the java.io.Serializable interface or its subinterface, java.io.Externalizable.
  • A class is never serialized; only objects of a class are serialized. Object serialization is needed when an object has to be persisted or transmitted over the network.



Class Component             Serialized?
Instance variable           yes
Static instance variable    no
Methods                     no
Static methods              no
Static inner class          no
Local variables             no
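
The table can be verified with plain Java serialization, independent of Spark. The following is a minimal sketch (the class and field names are made up for illustration): it serializes an object to a byte array, deserializes it, and shows that the instance variable survives the round trip while static and transient state does not.

import java.io.*;

public class SerializationDemo {

    static class Payload implements Serializable {
        int instanceVariable = 10;             // serialized
        static int staticVariable = 20;        // not serialized: belongs to the class, not the object
        transient int transientVariable = 30;  // explicitly excluded from serialization
    }

    public static void main(String[] args) throws Exception {
        Payload original = new Payload();

        // serialize to a byte array
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(original);
        }

        // simulate another JVM: static state differs there, only the byte stream travels
        Payload.staticVariable = 999;

        // deserialize
        Payload copy;
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            copy = (Payload) in.readObject();
        }

        System.out.println(copy.instanceVariable);   // 10  -> restored from the byte stream
        System.out.println(Payload.staticVariable);  // 999 -> taken from the local class state
        System.out.println(copy.transientVariable);  // 0   -> transient fields reset to their default
    }
}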





Let's take a sample Spark program and walk through the various scenarios.

import java.io.Serializable;
import java.util.Arrays;
import java.util.Iterator;
import java.util.regex.Pattern;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;

// Serializable because the anonymous FlatMapFunction below captures the outer instance
public class SparkSample implements Serializable {

      private static final Pattern SPACE = Pattern.compile(" ");

      public int instanceVariable                = 10;
      public static int staticInstanceVariable   = 20;

      public void run(String inputPath, String outputPath) {

         final int localVariable                 = 30;

         // create Spark conf
         final SparkConf sparkConf = new SparkConf()
                 .setAppName("SparkSample")
                 .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");

         // create Spark context
         final JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);

         // read data
         JavaRDD<String> lines = sparkContext.textFile(inputPath);

         // anonymous class used instead of a lambda
         JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
             @Override
             public Iterator<String> call(String s) {
                 // How will the listed variables be accessed in the RDD across driver and executors?
                 System.out.println("Output: " + instanceVariable + " " + staticInstanceVariable + " " + localVariable);
                 return Arrays.asList(SPACE.split(s)).iterator();
             }
         });

         // save output
         words.saveAsTextFile(outputPath);

         sparkContext.stop();
      }

      // static nested class implementing the functional interface; it could replace the anonymous class above
      public static class MapClass implements FlatMapFunction<String, String> {
          @Override
          public Iterator<String> call(String s) {
              // instanceVariable and localVariable are NOT accessible here; only the static variable is
              System.out.println("Output: " + staticInstanceVariable);
              return Arrays.asList(SPACE.split(s)).iterator();
          }
      }

      public static void main(String[] args) throws Exception {
          SparkSample sample = new SparkSample();
          sample.run(args[0], args[1]);
      }

}






Accessibility and serializability of outer-class variables inside inner-class objects



Function              Instance Variable (outer class)   Static Instance Variable (outer class)   Local Variable (outer class)
Anonymous class       Accessible and serialized         Accessible yet not serialized            Accessible and serialized
Static inner class    Not accessible                    Accessible yet not serialized            Not accessible



Rules of thumb for understanding a Spark job:

1. All the lambda functions written against an RDD are instantiated on the driver, and their objects are serialized and sent to the executors.

2. If any outer-class instance variables are accessed within the inner class, the compiler applies different logic to access them, so whether the outer class gets serialized depends on what exactly you access.

3. In Java terms, the whole debate is about outer class vs. inner class, and how accessing outer-class references and variables leads to serialization issues.



Various scenarios:

Outer-class variables accessed within an anonymous inner class:

  • Instance variable (outer class): the compiler by default inserts a constructor into the byte code of the anonymous class that takes a reference to the outer-class object; that reference is used to access the instance variable.

Anonymous-class(){
     final Outer-class reference;
     Anonymous-class( Outer-class outer-reference){
        reference = outer-reference;
     }
}

The outer class is therefore serialized and sent along with the serialized object of the inner anonymous class.

  • Static instance variable (outer class): static variables are not serialized, but the outer-class object is still inserted into the anonymous-class constructor. The value of the static variable is taken from the class state present on that executor.

  • Local variable (outer class): the compiler by default inserts a constructor into the byte code of the anonymous class with a reference to the outer-class object AND a copy of the local variable; the outer-class object is used to access the instance variables.

Anonymous-class(){
     final Outer-class reference;
     final Local-variable localReference;
     Anonymous-class( Outer-class outer-reference, Local-variable localReference){
        reference = outer-reference;
        this.localReference = localReference;
     }
}

The outer class is serialized, the local variable object is also serialized, and both are sent along with the serialized object of the inner anonymous class.
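
A common way to keep the outer class out of the closure is to copy the needed field into a local variable (or use a static nested class) before building the function, so only that value is captured. A minimal sketch, assuming it is placed inside the run() method of the SparkSample above and that org.apache.spark.api.java.function.Function is imported:

// Copy the needed field into a local variable; the anonymous function then
// captures only this local value, so the enclosing object ("this") is not
// pulled into the serialized closure at all.
final int localCopy = this.instanceVariable;

JavaRDD<String> tagged = lines.map(new Function<String, String>() {
    @Override
    public String call(String line) {
        return line + " " + localCopy;   // uses the captured local, not the outer instance
    }
});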




                                                                                                
Outer-class variables accessed within a static inner class:

  • Instance variable (outer class): cannot be accessed.

  • Static instance variable (outer class): static variables are not serialized, so no outer-class object is serialized. The value of the static variable is taken from the class state present on that executor. The outer class is not serialized and is not sent along with the serialized static inner class.

  • Local variable (outer class): cannot be accessed.




Points to think through:
  1. Java serialization rules are followed to decide which class objects need to be serialized.
  2. Use javap -p -c "abc.class" to unwrap the byte code and see the compiler-generated code.
  3. Depending on what you access from the outer class within the inner class, the compiler generates different byte code.
  4. Classes that are only accessed on the driver do not need to implement Serializable.
  5. Any anonymous or static class (all lambda functions are anonymous classes) used within an RDD will be instantiated on the driver.
  6. Any class/variable used inside an RDD operation will be instantiated on the driver and sent to the executors.
  7. Any instance variable declared transient will not be serialized on the driver.
  8. By default, anonymous classes force you to make the outer class serializable.
  9. A local variable/object does not have to be serializable.
  10. A local variable needs to be serializable only if it is used inside the anonymous class.
  11. One can create a singleton inside the call() method of a map/mapToPair function, thus making sure it is never initialized on the driver (see the sketch below).
  12. Static variables are never serialized and hence are never sent from the driver to the executors. If you need a service to be created only on the executor, make it a static field inside the function class, or make it transient and a singleton and check for null to instantiate it lazily.
  13. When to use a Spark broadcast variable: http://g-chi.github.io/2015/10/21/Spark-why-use-broadcast-variables/
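
A minimal sketch of points 11 and 12: a function class that holds its expensive service in a transient field and instantiates it lazily inside call(), so the service is created on each executor instead of being serialized from the driver. ExpensiveService and its connect()/process() methods are hypothetical stand-ins for any non-serializable client (DB connection, HTTP client, ...).

import java.util.Arrays;
import java.util.Iterator;
import org.apache.spark.api.java.function.FlatMapFunction;

public class ServiceBackedFunction implements FlatMapFunction<String, String> {

    // transient: never serialized, so it is null when the function object arrives on an executor
    private transient ExpensiveService service;

    @Override
    public Iterator<String> call(String s) {
        if (service == null) {
            // first call on this executor JVM: build the service here, not on the driver
            service = ExpensiveService.connect();
        }
        return service.process(s);
    }

    // hypothetical stand-in for a non-serializable dependency
    static class ExpensiveService {
        static ExpensiveService connect() { return new ExpensiveService(); }
        Iterator<String> process(String s) { return Arrays.asList(s.split(" ")).iterator(); }
    }
}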
       

Monday, December 28, 2015

JVM dissection to Tune GC (JDK 7 )




Java: http://www.oracle.com/technetwork/java/javase/downloads/jdk7-downloads-1880260.html
wget -c -O jdk-7u80-linux-x64.tar.gz --no-check-certificate --no-cookies --header "Cookie: oraclelicense=accept-securebackup-cookie" "http://download.oracle.com/otn-pub/java/jdk/7u80-b15/jdk-7u80-linux-x64.tar.gz"


How much memory is at the disposal of a process: a process has RAM + virtual memory at its disposal. Be it C, Perl, Python or Java, the whole of that memory is available to it.


What's so special about the Java heap: unlike a C process, one does not have to worry about memory deallocation (memory-leak issues), as Java manages object deallocation on the heap. This doesn't restrict one from using memory outside the heap in Java: one can use the direct memory allocation available in the NIO package to allocate memory off the heap.

Java Virtual Machine

The Java Virtual Machine (JVM) is an abstract computing machine. The JVM is a program that looks like a machine to the programs written to execute in it. This way, Java programs are written to the same set of interfaces and libraries. Each JVM implementation for a specific operating system, translates the Java programming instructions into instructions and commands that run on the local operating system. This way, Java programs achieve platform independence.
The first prototype implementation of the Java virtual machine, done at Sun Microsystems, Inc., emulated the Java virtual machine instruction set in software hosted by a handheld device that resembled a contemporary Personal Digital Assistant (PDA). Oracle’s current implementations emulate the Java virtual machine on mobile, desktop and server devices, but the Java virtual machine does not assume any particular implementation technology, host hardware, or host operating system. It is not inherently interpreted, but can just as well be implemented by compiling its instruction set to that of a silicon CPU. It may also be implemented in microcode or directly in silicon.
The Java virtual machine knows nothing of the Java programming language, only of a particular binary format, the class file format. A class file contains Java virtual machine instructions (or bytecodes) and a symbol table, as well as other ancillary information.
For the sake of security, the Java virtual machine imposes strong syntactic and structural constraints on the code in a class file. However, any language with functionality that can be expressed in terms of a valid class file can be hosted by the Java virtual machine. Attracted by a generally available, machine-independent platform, implementors of other languages can turn to the Java virtual machine as a delivery vehicle for their languages. (1) The Java Virtual Machine
Understanding JVM Memory Model is very important if you want to understand the working of Java Garbage Collection. We will look into different parts of JVM memory and how to monitor and perform garbage collection tuning.

Java (JVM) Memory Model

[Figure: Java (JVM) Memory Model]
As you can see in the above image, JVM memory is divided into separate parts. At broad level, JVM Heap memory is physically divided into two parts – Young Generation and Old Generation.
Before taking a deep dive into GC, run jconsole/jvisualvm (part of the JDK) to see real-time GC in action. Start from what you see in a JDK tool like jstat or jvisualvm with its visualgc plugin: the JVM heap structure, including the sub-segments of the New generation.
The Java heap is made up of the Perm, Old and New (sometimes called Young) generations. The New generation is further made up of the Eden space, where objects are created, and the Survivor spaces S0 and S1, where they are kept for a limited number of New-generation garbage collection cycles. If you want more details, you might want to read Sun/Oracle's whitepaper "Memory Management in the Java HotSpot Virtual Machine".


Young Generation

Young generation is the place where all the new objects are created. When young generation is filled, garbage collection is performed. This garbage collection is called Minor GC. Young Generation is divided into three parts – Eden Memory and two Survivor Memory spaces.
Important Points about Young Generation Spaces:
  • Most newly created objects are located in the Eden memory space.
  • When the Eden space fills up with objects, a Minor GC is performed and all the surviving objects are moved to one of the survivor spaces.
  • Minor GC also checks the survivor objects and moves them to the other survivor space, so at any time one of the survivor spaces is always empty.
  • Objects that have survived many cycles of GC are moved to the Old generation memory space. Usually this is done by setting a threshold on the age of young-generation objects before they become eligible for promotion to the Old generation.

Old Generation

Old Generation memory contains the objects that are long lived and survived after many rounds of Minor GC. Usually garbage collection is performed in Old Generation memory when it’s full. Old Generation Garbage Collection is called Major GC and usually takes longer time.

Permanent Generation

Permanent Generation or “Perm Gen” contains the application metadata required by the JVM to describe the classes and methods used in the application. Note that Perm Gen is not part of Java Heap memory.
Perm Gen is populated by JVM at runtime based on the classes used by the application. Perm Gen also contains Java SE library classes and methods. Perm Gen objects are garbage collected in a full garbage collection.
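Besides jconsole/jvisualvm, the sizes of these spaces can also be inspected from inside a program through the standard java.lang.management API. A minimal sketch (the exact pool names, such as "PS Eden Space" or "Par Eden Space", vary with the collector in use):

import java.lang.management.ManagementFactory;
import java.lang.management.MemoryPoolMXBean;
import java.lang.management.MemoryUsage;

public class HeapPools {
    public static void main(String[] args) {
        // one MXBean per memory pool: Eden, Survivor, Old/Tenured, Perm Gen, Code Cache, ...
        for (MemoryPoolMXBean pool : ManagementFactory.getMemoryPoolMXBeans()) {
            MemoryUsage usage = pool.getUsage();
            System.out.printf("%-25s used=%d KB max=%d KB%n",
                    pool.getName(), usage.getUsed() / 1024, usage.getMax() / 1024);
        }
    }
}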
Types of GC (Logical ) : http://www.javacodegeeks.com/2012/01/practical-garbage-collection-part-1.html
Basic stop-the-world, mark, sweep, resume
Compacting vs. non-compacting garbage collection
Generational garbage collection
Parallel collection
Incremental collection
Concurrent collection

Stop the World Event

All the Garbage Collections are “Stop the World” events because all application threads are stopped until the operation completes.
Since Young generation keeps short-lived objects, Minor GC is very fast and the application doesn’t get affected by this.
However Major GC takes longer time because it checks all the live objects. Major GC should be minimized because it will make your application unresponsive for the garbage collection duration. So if you have a responsive application and there are a lot of Major Garbage Collection happening, you will notice timeout errors.
The duration taken by garbage collector depends on the strategy used for garbage collection. That’s why it’s necessary to monitor and tune the garbage collector to avoid timeouts in the highly responsive applications.

Java Garbage Collection Types

According to JDK 7, there are 5 GC types. 
  1. Serial GC
  2. Parallel GC
  3. Parallel Old GC (Parallel Compacting GC)
  4. Concurrent Mark & Sweep GC  (or “CMS”)
  5. Garbage First (G1) GC
Among these, the serial GC must not be used on an operating server. This GC type was created when there was only one CPU core on desktop computers. Using this serial GC will drop the application performance significantly.
  1. Serial GC (-XX:+UseSerialGC): Serial GC uses the simple mark-sweep-compact approach for both young and old generation garbage collection, i.e. Minor and Major GC. Serial GC is useful on client machines such as simple stand-alone applications and machines with smaller CPUs. It is good for small applications with a low memory footprint.
  2. Parallel GC (-XX:+UseParallelGC): Parallel GC is the same as Serial GC except that it spawns N threads for young-generation garbage collection, where N is the number of CPU cores in the system. We can control the number of threads using the -XX:ParallelGCThreads=n JVM option. The parallel garbage collector is also called the throughput collector because it uses multiple CPUs to speed up GC performance. Parallel GC uses a single thread for Old Generation garbage collection.
  3. Parallel Old GC (-XX:+UseParallelOldGC): This is the same as Parallel GC except that it uses multiple threads for both Young Generation and Old Generation garbage collection.
  4. Concurrent Mark Sweep (CMS) Collector (-XX:+UseConcMarkSweepGC, old generation only): The CMS collector is also referred to as the concurrent low-pause collector. It does the garbage collection for the Old generation and tries to minimize the pauses due to garbage collection by doing most of the work concurrently with the application threads. CMS on the young generation uses the same algorithm as the parallel collector. This garbage collector is suitable for responsive applications where we can't afford long pause times. We can limit the number of threads in the CMS collector using the -XX:ParallelCMSThreads=n JVM option.
  5. G1 Garbage Collector (-XX:+UseG1GC): The Garbage First or G1 garbage collector is available from Java 7, and its long-term goal is to replace the CMS collector. The G1 collector is a parallel, concurrent, and incrementally compacting low-pause garbage collector. Garbage First does not work like the other collectors: there is no separate physical Young and Old generation space. It divides the heap into multiple equal-sized regions. When a garbage collection is invoked, it first collects the regions with the least live data, hence "Garbage First". You can find more details in the Garbage-First Collector Oracle documentation.
Minor/major/full Gc
Minor GC
Collecting garbage from Young space (consisting of Eden and Survivor spaces) is called a Minor GC. This definition is both clear and uniformly understood. But there are still some interesting take-aways you should be aware of when dealing with Minor Garbage Collection events:
  1. Minor GC is always triggered when JVM is unable to allocate space for a new Object, e.g. the Eden is getting full. So the higher the allocation rate, the more frequently Minor GC is executed.
  2. Whenever the pool is filled, its entire content is copied and the pointer can start tracking the free memory from zero again. So instead of classical Mark, Sweep and Compact, cleaning Eden and Survivor spaces is carried out with Mark and Copy instead. So, no fragmentation actually takes place inside Eden or Survivor spaces. The write pointer is always residing on the top of the used pool.
  3. During a Minor GC event, Tenured generation is effectively ignored. References from tenured generation to young generation are considered de facto GC roots. References from young generation to Tenured generation are simply ignored during the markup phase.
  4. Contrary to common belief, all Minor GCs do trigger stop-the-world pauses, stopping the application threads. For most applications, the length of these pauses is negligible, as long as most of the objects in Eden are garbage and are never copied to the Survivor/Old spaces. If the opposite is true and most of the newborn objects are not eligible for collection, Minor GC pauses start taking considerably more time.
So with Minor GC the situation was rather clear – every Minor GC cleans the Young generation.

Major GC vs Full GC

One should notice that there are no formal definitions for these terms, neither in the JVM specification nor in the garbage collection research papers. But at first glance, building these definitions on top of what we know to be true about Minor GC cleaning the Young space should be simple:
  • Major GC is cleaning the Tenured space.
  • Full GC is cleaning the entire Heap – both Young and Tenured spaces.
Unfortunately it is a bit more complex and confusing. To start with, many Major GCs are triggered by Minor GCs, so separating the two is impossible in many cases. On the other hand, many modern garbage collectors clean the Tenured space only partially, so again, using the term "cleaning" is only partially correct.
This leads us to the point where, instead of worrying whether a GC is called Major or Full, you should focus on finding out whether the GC at hand stopped all the application threads or was able to proceed concurrently with them.
Gc algo in details : http://www.oracle.com/webfolder/technetwork/tutorials/obe/java/gc01/index.html
JVM params to control GC : http://blog.ragozin.info/2011/09/hotspot-jvm-garbage-collection-options.html
Gc analysis tools : http://www.cubrid.org/blog/dev-platform/how-to-monitor-java-garbage-collection/
Commands to print Gc dump :
  • -XX:+PrintGC
  • -XX:+PrintGCDetails
  • -XX:+PrintGCDateStamps
  • -XX:+PrintGCApplicationStoppedTime
  • -XX:+PrintPromotionFailure
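As an illustration, the flags above can be combined on the command line of any Java application (the -Xloggc path and the application jar are placeholders):

java -Xloggc:/tmp/gc.log \
     -XX:+PrintGCDetails \
     -XX:+PrintGCDateStamps \
     -XX:+PrintGCApplicationStoppedTime \
     -XX:+PrintPromotionFailure \
     -jar my-app.jar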
Tips to remember :
  • Any object created is created in the Young generation.
  • Before an object is promoted to the Tenured generation, it keeps moving between S0 and S1.
  • GC parameters exist to move objects directly to the Tenured generation (both the number of cycles and the sizes are configurable).
  • All Minor GC algorithms are stop-the-world; only in the case of Major GC can "stop the world" be avoided.
  • jconsole/jvisualvm are part of the JDK, so use them to watch the GC working in real time.
  • jstat, jmap and GC logging can be used to see the GC numbers.
  • GC pauses of milliseconds are fine; if one is seeing pauses of seconds (2+) it is time to tune the GC, as GC directly impacts application responsiveness.
  • GC is a problem of the number of live references, not of how much memory (big chunks) has been allocated; allocating larger chunks does not by itself ensure less GC pressure on your application.

PERSPECTIVE ABOUT A SINGLE NODE

A Hadoop cluster provides two main functions:
* HDFS: the Hadoop Distributed File System
* Data processing: a distributed way of processing data.

A cluster is built of multiple machines running in tandem to complete a specific task. Every node executes the same piece of "framework code". If one is able to appreciate the working of a single node, the concepts and abstractions built by distributed systems become very easy to understand.
A single node provides 4 resource  to use
DISK
RAM
CPU
NETWORK

DISK: A given file on a machine is saved on persistent storage, namely the HDD (hard disk drive). A file that looks sequential to the user (open a file saved on the hard disk using gedit) can in reality be scattered across multiple platters and sectors on the HDD (http://www.cyberciti.biz/tips/understanding-unixlinux-filesystem-superblock.html). One can correlate this with a linked-list way of storing data: the link info (metadata) is maintained by the file system, and the actual data is stored in physical blocks.

Various Linux commands can be used to figure out where a file has been physically placed on disk:

sudo hdparm --fibmap fileName
filefrag -b512 -v fileName

The file system is the one that keeps a logical binding of the sparsely stored data. In Unix, ext2/ext3 are different file systems that provide an abstraction over the underlying storage. The file system thus needs to maintain metadata in which it saves the logical linking of the file blocks.
The metadata maps the file system's logical ID (the inode number) to the HDD's physical storage ID.

In Unix the metadata of the file system can be seen using:

df -hk : lists the mounted file systems on that machine
dumpe2fs /dev/HDD_name | grep -i superblock
Let's analyse the same concept in terms of the distributed Hadoop file system (HDFS).

* Instead of a single machine, the file system spans multiple machines.
* The metadata is so huge that it requires a separate machine to hold it.
* The machine holding the metadata is known as the NameNode.
* All other nodes run a daemon process known as the DataNode. The NameNode and DataNodes work in a master-slave relationship, with the NameNode being the master.

Processing
The processing power of any machine is determined by the number of CPUs that machine has and the amount of RAM at the CPUs' disposal.
Various commands that can help figure out these resources are:

top
free -h
iostat -x -c 3
mpstat -P ALL

On a single node the data is present on the HDD for processing; a program needs to figure out where the data is saved by interacting with the file system, and then process/transform the data using the CPU.
In a distributed cluster (Hadoop) the data is spread across multiple machines, and so are the CPUs and RAM.

An efficient way of processing the data is to move the processing to where the data is located (this reduces network transfer); with this strategy one can use many more CPUs, more RAM and more HDD bandwidth to process the data (rather than fetching the data to a single machine and processing it there).
Hadoop provides the JobTracker and TaskTracker as the daemons that coordinate the distributed processing.

The JobTracker and TaskTrackers work in a master-slave relationship. Given a job, the NameNode is consulted for the nodes on which the data is available, and the JobTracker spawns processing on those nodes (on a best-effort basis) to process the data partially. The task of monitoring the individual jobs on individual nodes is taken care of by the TaskTracker.

MR debuging by taking JVM heap dumps


Taking a heap dump manually:
jmap -histo:live <pid>  (histogram)
jmap -dump:live,format=b,file=file-name.bin <pid>  (dump the JVM heap to a file on disk)
  1. Log on to the datanode where the map/reduce JVM is running and run ps -eaf | grep attempt_id to get the pid.
  2. Use sudo -u with the appropriate user to take the heap dump with the jmap command.
  3. Never use the -f option while taking the dump with jmap.


To analyse the dump, use jhat:
jhat -port <portno> heap_file_path
What to look for in the jhat analysis:
  1. Object addresses with the highest memory footprint.
  2. Objects with the highest instance count.
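As an alternative to running jmap by hand, a heap dump can also be triggered programmatically through the HotSpot diagnostic MXBean. A minimal sketch (the output path is a placeholder; this MXBean is HotSpot-specific):

import java.lang.management.ManagementFactory;
import com.sun.management.HotSpotDiagnosticMXBean;

public class HeapDumper {
    public static void main(String[] args) throws Exception {
        // HotSpot-specific MXBean, available on Oracle/OpenJDK JVMs
        HotSpotDiagnosticMXBean diagnostic = ManagementFactory.newPlatformMXBeanProxy(
                ManagementFactory.getPlatformMBeanServer(),
                "com.sun.management:type=HotSpotDiagnostic",
                HotSpotDiagnosticMXBean.class);

        // second argument "true" dumps only live objects (same as jmap's :live option)
        diagnostic.dumpHeap("/tmp/manual-heap-dump.hprof", true);
    }
}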
Taking a heap dump on OutOfMemoryError using JVM -XX options
Set the following option in the job configuration:
set mapred.child.java.opts '-Xmx512m -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/@taskid@S2sSdebug.hprof'
This option launches the map/reduce task JVM with the specified value, thus giving us a handle to control various JVM memory-related parameters.
Few things to note
  1. -Xmx512m : heap memory in MB.
  2. -XX:+HeapDumpOnOutOfMemoryError : dump the heap to disk when the JVM goes out of memory.
  3. -XX:HeapDumpPath=/tmp/@taskid@S2sSdebug.hprof : @taskid@ is replaced by the Hadoop framework with the original task id, which is unique.
One needs to log on to the data nodes; the heap dump file will be present under /tmp, named @taskid@S2sSdebug.hprof (@taskid@ replaced by the Hadoop framework with the original task id). jhat can be used to analyze the dump.
Taking a heap dump on OutOfMemoryError and collecting the dump files across datanodes into an HDFS location for further analysis:
The option above requires one to log on to the datanode on which the map/reduce task was spawned and run jmap/jhat on that machine. An MR job with hundreds of map/reduce tasks makes this process very difficult. The option mentioned below provides a mechanism to collect all heap dumps in a specified HDFS location.
Make a shell script named dump.sh:
#!/bin/sh
text=`echo $PWD | sed 's=/=\\_=g'`        (this helps in figuring out which heap dump belongs to which task)
hadoop fs -put heapdump.hprof /user/kunalg/hprof/$text.hprof
  1. Place the dump.sh script in an HDFS location using hadoop dfs -put dump.sh <hdfs location> (example: /user/kunalg/dump.sh).
  2. Create a directory on HDFS where you want to gather all the heap dumps and give it 777 permissions (example: hadoop dfs -chmod -R 777 /user/kunalg/hprof).
  3. Set the following properties in the MR job:
  • set mapred.child.java.opts '-Xmx256m -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=./heapdump.hprof -XX:OnOutOfMemoryError=./dump.sh'
  • set mapred.create.symlink 'yes'
  • set mapred.cache.files 'hdfs:///user/kunalg/dump.sh#dump.sh'
Run the MR job; an OOME in any of the datanodes will take a heap dump and place the dump file into the specified HDFS location.
One can verify sane execution of the script in the stdout log.
on Stdlogout :
java.lang.OutOfMemoryError: Java heap space
Dumping heap to ./heapdump.hprof ...
Heap dump file created [12039655 bytes in 0.081 secs]
#
# java.lang.OutOfMemoryError: Java heap space
# -XX:OnOutOfMemoryError="./dump.sh"
#   Executing /bin/sh -c "./dump.sh"...
Use the Hadoop default profiler for profiling and finding issues:

set mapred.task.profile 'true';
set mapred.task.profile.params '-agentlib:hprof=cpu=samples,heap=sites,depth=6,force=n,thread=y,verbose=n,file=%s';
set mapred.task.profile.maps '0-1';
set mapred.task.profile.reduces '0-1';

The profiler will provide details for the JVM tasks in the specified range. The dump will be available in the task logs under the profile.out logs section.

Kenning Kafka

What is Kafka ?
Kafka is a distributed, partitioned, replicated commit log service. It provides the functionality of a messaging system, but with a unique design .
For those coming from the NoSQL world (HBase, Cassandra, Aerospike), Kafka's architecture is similar to the write-ahead-log (WAL) part of each of those frameworks, minus the LSM feature.
Basics Kafka Lingo
  1. A stream of messages of a particular type is defined as a topic.
  2. A producer can publish messages to a topic.
  3. The published messages are then stored at a set of servers called brokers.
  4. A consumer can subscribe to one or more topics and consume the published messages by pulling data from the brokers.
So, at a high level, producers send messages over the network to the Kafka cluster which in turn serves them up to consumers like this:
Communication between the clients and the servers is done with a simple, high-performance, language agnostic TCP protocol. We provide a Java client for Kafka, but clients are available in many languages.
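A minimal sketch of the Java producer client (the broker address, topic name and serializer choice are placeholders, not taken from the text above):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SimpleProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // broker(s) to bootstrap from
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            // publish a message to a topic; the broker appends it to one partition's log
            producer.send(new ProducerRecord<>("my-topic", "key-1", "hello kafka"));
        }
    }
}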
The overall architecture of Kafka is shown in Figure 1. Since Kafka is distributed in nature, a Kafka cluster typically consists of multiple brokers. To balance load, a topic is divided into multiple partitions and each broker stores one or more of those partitions. Multiple producers and consumers can publish and retrieve messages at the same time.
Figure 1- Kafka Architecture
Kafka Storage – Kafka has a very simple storage layout. Each partition of a topic corresponds to a logical log. Physically, a log is implemented as a set of segment files of equal sizes. Every time a producer publishes a message to a partition, the broker simply appends the message to the last segment file. Segment file is flushed to disk after configurable number of messages has been published or after certain amount of time. Messages are exposed to consumer after it gets flushed.
Unlike traditional messaging systems, a message stored in Kafka doesn't have an explicit message id. Messages are addressed by their logical offset in the log. This avoids the overhead of maintaining auxiliary, seek-intensive random-access index structures that map message ids to the actual message locations. Message offsets are increasing but not consecutive: the id of the next message is computed by adding the length of the current message to its own logical offset.
Consumer always consumes messages from a particular partition sequentially and if the consumer acknowledge particular message offset, it implies that the consumer has consumed all prior messages. Consumer issues asynchronous pull request to the broker to have a buffer of bytes ready to consume. Each asynchronous pull request contains the offset of the message to consume. Kafka exploits the sendfile API to efficiently deliver bytes in a log segment file from a broker to a consumer.
Figure 2- Kafka Storage Architecture
Kafka Broker: Unlike other messaging systems, Kafka brokers are stateless. By stateless we mean that the consumer has to maintain how much it has consumed; the consumer maintains this by itself and the broker does not track it. Such a design is tricky and innovative in itself:
  1. It is tricky to delete messages from the broker, because the broker does not know whether the consumer has consumed a message or not. Kafka solves this problem by using a simple time-based SLA for the retention policy: a message is automatically deleted if it has been retained in the broker longer than a certain period.
  2. This design has a benefit too: a consumer can deliberately rewind back to an old offset and re-consume data. This violates the common contract of a queue, but proves to be an essential feature for many consumers (see the sketch below).
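
A minimal sketch of such a rewind with the Java consumer: the consumer is assigned a partition explicitly and then seeks back to an older offset before polling again (broker address, topic, group id and the offset value are placeholders):

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class RewindingConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "rewind-demo");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            TopicPartition partition = new TopicPartition("my-topic", 0);
            consumer.assign(Collections.singletonList(partition));

            // rewind: the consumer owns its position, so it can simply seek to an old offset
            consumer.seek(partition, 0L);

            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.offset() + " : " + record.value());
            }
        }
    }
}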


Various Architectural choices made  Keeping OS into consideration to achieve High throughput Low latency:

  • dependency on Page Cache:
    To compensate for the  slow Hard Disk Drive throughput   modern operating systems have become increasingly aggressive in their use of main memory for disk caching. A modern OS will happily divert all free memory to disk caching with little performance penalty when the memory is reclaimed. All disk reads and writes will go through this unified cache. This feature cannot easily be turned off without using direct I/O, so even if a process maintains an in-process cache of the data, this data will likely be duplicated in OS pagecache, effectively storing everything twice.Furthermore we are building on top of the JVM, and anyone who has spent any time with Java memory usage knows two things:
  1. The memory overhead of objects is very high, often doubling the size of the data stored (or worse).
  2. Java garbage collection becomes increasingly fiddly and slow as the in-heap data increases. As a result of these factors using the filesystem and relying on pagecache is superior to maintaining an in-memory cache or other structure—we at least double the available cache by having automatic access to all free memory, and likely double again by storing a compact byte structure rather than individual objects. Doing so will result in a cache of up to 28-30GB on a 32GB machine without GC penalties. Furthermore this cache will stay warm even if the service is restarted, whereas the in-process cache will need to be rebuilt in memory (which for a 10GB cache may take 10 minutes) or else it will need to start with a completely cold cache (which likely means terrible initial performance)
    .
  • Avoid redundant data copies between kernel and user space
    Modern Unix operating systems offer a highly optimized code path for transferring data out of the pagecache to a socket; in Linux this is done with the sendfile system call. To understand the impact of sendfile, it is important to understand the common data path for transferring data from a file to a socket:
    1. The operating system reads data from the disk into the pagecache in kernel space.
    2. The application reads the data from kernel space into a user-space buffer.
    3. The application writes the data back into kernel space into a socket buffer.
    4. The operating system copies the data from the socket buffer to the NIC buffer, where it is sent over the network.
    This is clearly inefficient: there are four copies and two system calls. Using sendfile, this re-copying is avoided by allowing the OS to send the data from the pagecache to the network directly, so in this optimized path only the final copy to the NIC buffer is needed. Kafka expects a common use case to be multiple consumers on a topic. Using the zero-copy optimization above, data is copied into the pagecache exactly once and reused on each consumption, instead of being stored in memory and copied out to user space every time it is read. This allows messages to be consumed at a rate that approaches the limit of the network connection.
  • Distribution of Data across Disks
    If you configure multiple data directories partitions will be assigned round-robin to data directories. Each partition will be entirely in one of the data directories. If data is not well balanced among partitions this can lead to load imbalance between disks.
  • Batching of messages on Producer/consumer to optimize network latency and throughput
    The small I/O problem happens both between the client and the server and in the server’s own persistent operations.To avoid this, our protocol is built around a “message set” abstraction that naturally groups messages together. This allows network requests to group messages together and amortize the overhead of the network roundtrip rather than sending a single message at a time. The server in turn appends chunks of messages to its log in one go, and the consumer fetches large linear chunks at a time.This simple optimization produces orders of magnitude speed up. Batching leads to larger network packets, larger sequential disk operations, contiguous memory blocks, and so on, all of which allows Kafka to turn a bursty stream of random message writes into linear writes that flow to the consumers.
  • End-to-end Batch(data) Compression. The Kafka supports this by allowing recursive message sets. A batch of messages can be clumped together compressed and sent to the server in this form. This batch of messages will be written in compressed form and will remain compressed in the log and will only be decompressed by the consumer.Kafka supports GZIP and Snappy compression protocols.
  • Producer Push And Consumer pull resulting in loosely coupled framework
    The Kafka consumer works by issuing “fetch” requests to the brokers leading the partitions it wants to consume. The consumer specifies its position in the log with each request and receives back a chunk  of log beginning at that position. The consumer thus has significant control over this position and can rewind it to re-consume data if need be.
  • Consumer Position (in Pull based world)
    Kafka topic is divided into a set of totally ordered partitions, each of which is consumed by one consumer at any given time. This means that the position of consumer in each partition is just a single integer, the offset of the next message to consume. This makes the state about what has been consumed very small, just one number for each partition. This state can be periodically checkpointed. This makes the equivalent of message acknowledgements very cheap. There is a side benefit of this decision. A consumer can deliberately rewind back to an old offset and re-consume data. This violates the common contract of a queue, but turns out to be an essential feature for many consumers. For example, if the consumer code has a bug and is discovered after some messages are consumed, the consumer can re-consume those messages once the bug is fixed.
Ways to increase the Throughput
  • batch.size – This is an upper limit of how many messages Kafka Producer will attempt to batch before sending – specified in bytes (Default is 16K bytes – so 16 messages if each message is 1K in size). Kafka may send batches before this limit is reached (so latency doesn’t change by modifying this parameter), but will always send when this limit is reached. Therefore setting this limit too low will hurt throughput without improving latency. The main reason to set this low is lack of memory – Kafka will always allocate enough memory for the entire batch size, even if latency requirements cause it to send half-empty batches.
  • linger.ms – How long the producer will wait before sending, in order to allow more messages to accumulate in the same batch. Normally the producer does not wait at all and simply sends all the messages that accumulated while the previous send was in progress, but as discussed, sometimes we are willing to wait a bit longer in order to improve overall throughput at the expense of a little higher latency. In this case tuning linger.ms to a higher value makes sense. Note that if batch.size is low and the batch is full before linger.ms passes, the batch will be sent early, so it makes sense to tune batch.size and linger.ms together (a configuration sketch follows this list).
  • Increase RAM , number of Disk, Network Bandwidth as much as possible.
  • It is not necessary to tune these settings, however those wanting to optimize performance have a few knobs that will help:
    • data=writeback: Ext4 defaults to data=ordered which puts a strong order on some writes. Kafka does not require this ordering as it does very paranoid data recovery on all unflushed log. This setting removes the ordering constraint and seems to significantly reduce latency.
    • Disabling journaling: Journaling is a tradeoff: it makes reboots faster after server crashes but it introduces a great deal of additional locking which adds variance to write performance. Those who don’t care about reboot time and want to reduce a major source of write latency spikes can turn off journaling entirely.
    • commit=num_secs: This tunes the frequency with which ext4 commits to its metadata journal. Setting this to a lower value reduces the loss of unflushed data during a crash. Setting this to a higher value will improve throughput.
    • nobh: This setting controls additional ordering guarantees when using data=writeback mode. This should be safe with Kafka as we do not depend on write ordering and improves throughput and latency.
    • delalloc: Delayed allocation means that the filesystem avoid allocating any blocks until the physical write occurs. This allows ext4 to allocate a large extent instead of smaller pages and helps ensure the data is written sequentially. This feature is great for throughput. It does seem to involve some locking in the filesystem which adds a bit of latency variance.
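A sketch of the two producer settings discussed above, continuing the hypothetical producer Properties from the earlier example (the values are illustrative, not recommendations):

// add to the producer Properties before creating the KafkaProducer
props.put("batch.size", "65536");   // allow up to 64 KB per batch instead of the 16 KB default
props.put("linger.ms", "10");       // wait up to 10 ms for more records to fill the batch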

Hbase Musing


Various Mappings : 
  • The ZooKeeper quorum has an odd number of machines.
  • There can be more than one master, of which only one is active.
  • 1 region server (RegionServer) can host multiple regions (HRegions): a 1:m mapping.
  • 1 region (HRegion) => multiple column families (Stores).
  • 1 store => 1 memstore, multiple HFiles.
  • 1 region => multiple memstores, multiple HFiles.
  • 1 RegionServer => 1 WAL; 1 RegionServer => 1 BlockCache.
All parameters in HBase are intertwined; changing one needs careful tuning of the others.
The parameters in HBase mainly fall under the following categories:
* client
* regionserver
* hregion
* hlog
The bottleneck for HBase is the associated HDFS storage, as every flush/compaction is a disk read/write.
Why the parameters need to be tuned:
* to avoid a full GC on a RegionServer, which can lead to a complete halt
* minor/major compactions (resulting from memstore flushes)
* frequent allocation of memstore and block cache may result in JVM heap fragmentation.

The topic of flushes and compactions comes up frequently when using HBase. There are somewhat obscure configuration options around this, you hear terms like “write amplification”, and you might see scary messages about blocked writes in the logs until a flush has finished. Let’s step back for a minute and explore what HBase is actually doing. The configuration parameters will then make more sense.
Unlike most traditional databases HBase stores its data in “Log Structured Merge” (LSM) Trees.
There are numerous academic treatments concerning LSM trees, so I won’t go into details here.
Basically in HBase it works something like this:
  1. Edits (Puts, etc.) are collected and sorted in memory (specifically, using a skip list). HBase calls this the "memstore".
  2. When the memstore reaches a certain size (hbase.hregion.memstore.flush.size) it is written (or flushed) to disk as a new "HFile".
  3. There is one memstore per region and column family.
  4. Upon read, HBase performs a merge sort between all of the partially sorted memstore disk images (i.e. the HFiles).
From a correctness perspective that is all that is needed. But note that HBase would need to consider every memstore image ever written for sorting. Obviously that won't work: each file needs to be seeked and read in order to find the next key in the sort. Hence eventually some of the HFiles need to be cleaned up and/or combined: compactions.
A compaction asynchronously reads two or more existing HFiles and rewrites the data into a single new HFile. The source HFiles are then deleted.
This reduces the work to be done at read time at the expense of rewriting the same data multiple times – this effect is called “write amplification“. (There are some more nuances like major and minor compaction, which files to collect, etc, but that is besides the point for this discussion)
This can be tweaked to optimize either reads or writes.
If you let HBase accumulate many HFiles without compacting them, you’ll achieve better write performance (the data is rewritten less frequently). If on the other hand you instruct HBase to compact many HFiles sooner you’ll have better read performance, but now the same data is read and rewritten more often.
HBase allows to tweak when to start compacting HFiles and what is considered the maximum limit of HFiles to ensure acceptable read performance.
Generally flushes and compaction can commence in parallel. A scenario of particular interest is when clients write to HBase faster than the IO (disk and network) can absorb, i.e. faster than compactions can reduce the number of HFiles – manifested in an ever larger number of HFiles, eventually reaching the specified limit.
When this happens the memstores can continue to buffer the incoming data, but they cannot grow indefinitely – RAM is limited.
What should HBase do in this case? What can it do?
The only option is to disallow writes, and that is exactly what HBase does.
There are various knobs to tweak flushes and compactions:
  • hbase.hregion.memstore.flush.size
    The size a single memstore is allowed to reach before it is flushed to disk.
  • hbase.hregion.memstore.block.multiplier
    A memstore is temporarily allowed to grow to the maximum size times this factor.
  • hbase.regionserver.global.memstore.lowerLimit
    JVM global limit on aggregate memstore size before some of the memstore are force-flushed (in % of the heap).
  • hbase.regionserver.global.memstore.upperLimit
    JVM memstore size limit before writes are blocked (in % of the heap)
  • hbase.hstore.compactionThreshold
    When a store (region and column family) has reached this many HFiles, HBase will start compacting HFiles.
  • hbase.hstore.blockingStoreFiles
    HBase disallows further flushes until compactions have reduced the number of HFiles at least to this value. That means that the memstores now need to buffer all writes, and clients are eventually subject to blocking if compactions cannot keep up.
  • hbase.hstore.compaction.max
    The maximum number of HFiles a single – minor – compaction will consider.
  • hbase.hregion.majorcompaction
    Time interval between timed – major – compactions. HBase will trigger a compaction at this frequency even when no changes have occurred.
  • hbase.hstore.blockingWaitTime
    Maximum time clients are blocked. After this time writes will be allowed again.
So when hbase.hstore.blockingStoreFiles HFiles are reached and the memstores are full (reaching hbase.hregion.memstore.flush.size * hbase.hregion.memstore.block.multiplier, or their aggregate size reaching hbase.regionserver.global.memstore.upperLimit), writes are blocked for hbase.hstore.blockingWaitTime milliseconds. Note that this is not a flaw of HBase but simply physics: when disks/network are too slow, at some point clients need to be slowed down.
Keep hbase.regionserver.hlog.blocksize * hbase.regionserver.maxlogs just a bit above hbase.regionserver.global.memstore.lowerLimit * HBASE_HEAPSIZE.
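A worked example of that rule of thumb, using assumed values rather than any real cluster's settings: with HBASE_HEAPSIZE = 16 GB and hbase.regionserver.global.memstore.lowerLimit = 0.4, and with hbase.regionserver.hlog.blocksize = 128 MB and hbase.regionserver.maxlogs = 52:

  aggregate memstore limit = HBASE_HEAPSIZE * lowerLimit = 16 GB * 0.4  = 6.4 GB
  total WAL capacity       = hlog.blocksize * maxlogs    = 128 MB * 52  ≈ 6.5 GB   (just above 6.4 GB)

so WAL rolling does not force premature memstore flushes.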
MATHS :
http://hadoop-hbase.blogspot.in/2013/01/hbase-region-server-memory-sizing.html
Horton works hbase :
http://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.1.5/bk_system-admin-guide/content/ch_hbase_cluster_config.html
http://hbase.apache.org/book.html#ops.stripe
Experimental: Stripe Compactions

Playing With Maven

Maven is a powerful build tool for Java software projects. Actually, you can build software projects using other languages too, but Maven is developed in Java, and is thus historically used more for Java projects. Once you understand the core concepts, it is much easier to lookup the fine detail in the Maven documentation, or search for it on the internet.

Maven Overview – Core Concepts

Maven is centered around the concept of POM files (Project Object Model). A POM file is an XML representation of project resources like source code, test code, dependencies (external JARs used) etc. The POM contains references to all of these resources. The POM file should be located in the root directory of the project it belongs to.
Here is a diagram illustrating how Maven uses the POM file, and what the POM file primarily contains:
Overview of Maven core concepts.
POM Files
When you execute a Maven command you give Maven a POM file to execute the commands on. Maven will then execute the command on the resources described in the POM.
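As a minimal sketch, a bare-bones POM only needs the project coordinates plus any dependencies (the groupId/artifactId/version values here are placeholders):

<project xmlns="http://maven.apache.org/POM/4.0.0">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.example</groupId>
    <artifactId>sample-app</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>

    <dependencies>
        <!-- external JARs the project depends on -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
    </dependencies>
</project>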
Build Life Cycles, Phases and Goals
The build process in Maven is split up into build life cycles, phases and goals. A build life cycle consists of a sequence of build phases, and each build phase consists of a sequence of goals. When you run Maven you pass it a command: the name of a build life cycle, phase or goal. If a life cycle is requested, all build phases in that life cycle are executed. If a build phase is requested, all build phases before it in the pre-defined sequence of build phases are executed too.

Super POM

All Maven POM files inherit from a super POM. If no super POM is specified, the POM file inherits from the base POM. Here is a diagram illustrating that:
Super POM and POM inheritance.

Maven Repositories

Maven repositories are directories of packaged JAR files with extra metadata. The metadata consists of POM files describing the project each packaged JAR file belongs to, including what external dependencies each packaged JAR has. It is this metadata that enables Maven to download the dependencies of your dependencies recursively, until the whole tree of dependencies is downloaded and put into your local repository.
Maven repositories are covered in more detail in the Maven Introduction to Repositories, but here is a quick overview.
Maven has three types of repository:
  • Local repository
  • Central repository
  • Remote repository
Maven searches these repositories for dependencies in the above sequence. First in the local repository, then in the central repository, and third in remote repositories if specified in the POM.
Here is a diagram illustrating the three repository types and their location:
Maven Repository Types and Location.
Build Life Cycles
Maven has 3 built-in build life cycles. These are:
  1. default
  2. clean
  3. site
Each of these build life cycles takes care of a different aspect of building a software project. Thus, each of these build life cycles are executed independently of each other. You can get Maven to execute more than one build life cycle, but they will be executed in sequence, separately from each other, as if you had executed two separate Maven commands.
The default life cycle handles everything related to compiling and packaging your project. The clean life cycle handles everything related to removing temporary files from the output directory, including generated source files, compiled classes, previous JAR files etc. The site life cycle handles everything related to generating documentation for your project. In fact, site can generate a complete website with documentation for your project.
Build Phases
Each build life cycle is divided into a sequence of build phases, and the build phases are again subdivided into goals. Thus, the total build process is a sequence of build life cycle(s), build phases and goals.
You can execute either a whole build life cycle like clean or site, a build phase like install which is part of the default build life cycle, or a build goal like dependency:copy-dependencies. Note: You cannot execute the default life cycle directly. You have to specify a build phase or goal inside the default life cycle.
When you execute a build phase, all build phases before that build phase in this standard phase sequence are executed. Thus, executing the install build phase really means executing all build phases before the install phase, and then execute the install phase after that.
Build Phase : Description
validate : Validates that the project is correct and all necessary information is available. This also makes sure the dependencies are downloaded.
compile : Compiles the source code of the project.
test : Runs the tests against the compiled source code using a suitable unit testing framework. These tests should not require the code to be packaged or deployed.
package : Packs the compiled code in its distributable format, such as a JAR.
install : Installs the package into the local repository, for use as a dependency in other projects locally.
deploy : Copies the final package to the remote repository for sharing with other developers and projects.
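For example, a typical invocation combines the clean life cycle with the install phase of the default life cycle; the phases listed above then run in order:

mvn clean install     (clean, then validate -> compile -> test -> package -> install)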
Mvn Tricks and Tips :
mvn useful options
-h, --help        Display help information
-o, --offline     Work offline
-f, --file        Force the use of an alternate POM file
-s, --settings    Alternate path for the user settings file
-e, --errors      Produce execution error messages
-X, --debug       Produce execution debug output
-q, --quiet       Quiet output – only show errors
Related to plugins:
-npu, --no-plugin-updates    Suppress the up-to-date check for any relevant registered plugins. Providing this option has the effect of pinning Maven to the plugin versions currently available in the local Maven repository. With -npu active, Maven will not consult the remote repository for newer plugin versions.
-cpu, --check-plugin-updates Force an up-to-date check for any relevant registered plugins, i.e. force Maven to check for the latest released version of a Maven plugin. Note that this will not affect your build if you are explicitly specifying versions for Maven plugins in your project's POM.
-up, --update-plugins        Synonym for -cpu.
Mvn dependency resolution:
  1. Maximum priority is given to the groupId/artifactId provided in the POM.
  2. In case multiple jars of the same groupId/artifactId are present, the jar with the latest version is chosen.
All about classpath:
mvn dependency:copy-dependencies -DincludeScope="test"     (any scope – compile, test, runtime – can be provided)
mvn dependency:build-classpath
mvn -e -X "compile/test"
mvn install -pl module1,module2          build specific modules
mvn help:effective-pom                   show the effective POM
mvn dependency:analyze                   shows dependencies which are declared but not used, and used but not declared
mvn dependency:tree                      shows the complete tree of dependencies used for building
mvn versions:display-dependency-updates  figure out which dependencies have newer version releases
All About Scope :
http://maven.apache.org/guides/introduction/introduction-to-dependency-mechanism.html#Dependency_Scope