DBInputFormat för att överföra data från SQL till NoSQL-databas



Syftet med denna blogg är att lära sig att överföra data från SQL-databaser till HDFS, hur man överför data från SQL-databaser till NoSQL-databaser.

I den här bloggen kommer vi att undersöka funktionerna och möjligheterna för en av de viktigaste komponenterna i Hadoop-teknologin, dvs. MapReduce.

Idag antar företag Hadoop-ramverk som sitt första val för datalagring på grund av dess förmåga att hantera stora data effektivt. Men vi vet också att data är mångsidiga och finns i olika strukturer och format. För att kontrollera ett så stort antal data och dess olika format bör det finnas en mekanism för att tillgodose alla sorter och ändå ge ett effektivt och konsekvent resultat.





Den mest kraftfulla komponenten i Hadoop-ramverket är MapReduce som kan ge kontrollen över data och dess struktur bättre än dess andra motsvarigheter. Även om det kräver overhead av inlärningskurvan och programmeringskomplexiteten, om du kan hantera dessa komplexiteter kan du säkert hantera alla typer av data med Hadoop.

MapReduce-ramverket delar upp alla bearbetningsuppgifter i princip i två faser: Map och Reduce.



Förberedelse av rådata för dessa faser kräver förståelse för vissa grundläggande klasser och gränssnitt. Superklassen för denna upparbetning är Inmatningsformat.

De Inmatningsformat class är en av kärnklasserna i Hadoop MapReduce API. Denna klass ansvarar för att definiera två huvudsakliga saker:

  • Datasplittringar
  • Spela in läsare

Datadelning är ett grundläggande koncept i Hadoop MapReduce-ramverk som definierar både storleken på enskilda kartuppgifter och dess potentiella körningsserver. De Spela in läsare ansvarar för faktiska läsning av poster från inmatningsfilen och skickar dem (som nyckel / värdepar) till mapparen.



Antalet kartläggare bestäms baserat på antalet delningar. Det är InputFormats uppgift att skapa delningarna. För det mesta är splitstorlek motsvarande blockstorlek men det är inte alltid som delningar skapas baserat på HDFS-blockstorlek. Det beror helt på hur getSplits () -metoden för din InputFormat har åsidosatts.

Det finns en grundläggande skillnad mellan MR-split och HDFS-block. Ett block är en fysisk bit av data medan en split bara är en logisk bit som en mapper läser. En split innehåller inte indata, den innehåller bara en referens eller adress för data. En split har i princip två saker: En längd i byte och en uppsättning lagringsplatser, som bara är strängar.

För att förstå detta bättre, låt oss ta ett exempel: Bearbeta data som lagras i din MySQL med MR. Eftersom det inte finns något koncept för block i detta fall, är teorin: 'splits skapas alltid baserat på HDFS-blocket',misslyckas. En möjlighet är att skapa splittringar baserat på rader i din MySQL-tabell (och detta är vad DBInputFormat gör, ett inmatningsformat för att läsa data från en relationsdatabas). Vi kan ha k antal delningar som består av n rader.

Det är bara för InputFormats baserat på FileInputFormat (en InputFormat för hantering av data som är lagrade i filer) som delningarna skapas baserat på den totala storleken, i byte, av inmatningsfilerna. FileSystemets blockstorlek för inmatningsfilerna behandlas dock som en övre gräns för ingångssplittringar. Om du har en fil som är mindre än HDFS-blockstorleken får du bara en mappare för den filen. Om du vill ha något annat beteende kan du använda mapred.min.split.size. Men det beror återigen enbart på getSplits () på din InputFormat.

Vi har så många redan existerande ingångsformat tillgängliga under paketet org.apache.hadoop.mapreduce.lib.input.

CombineFileInputFormat.html

vad används mongodb till

CombineFileRecordReader.html

CombineFileRecordReaderWrapper.html

CombineFileSplit.html

CombineSequenceFileInputFormat.html

CombineTextInputFormat.html

FileInputFormat.html

FileInputFormatCounter.html

FileSplit.html

FixedLengthInputFormat.html

OgiltigtInputException.html

KeyValueLineRecordReader.html

KeyValueTextInputFormat.html

MultipleInputs.html

NLineInputFormat.html

SequenceFileAsBinaryInputFormat.html

SequenceFileAsTextInputFormat.html

SequenceFileAsTextRecordReader.html

SequenceFileInputFilter.html

SequenceFileInputFormat.html

SequenceFileRecordReader.html

TextInputFormat.html

Standard är TextInputFormat.

På samma sätt har vi så många utdataformat som läser data från reducerare och lagrar dem i HDFS:

FileOutputCommitter.html

FileOutputFormat.html

FileOutputFormatCounter.html

FilterOutputFormat.html

LazyOutputFormat.html

MapFileOutputFormat.html

MultipleOutputs.html

NullOutputFormat.html

PartialFileOutputCommitter.html

PartialOutputCommitter.html

SequenceFileAsBinaryOutputFormat.html

SequenceFileOutputFormat.html

TextOutputFormat.html

Standard är TextOutputFormat.

När du hade läst den här bloggen hade du lärt dig:

  • Hur man skriver ett kartreduceringsprogram
  • Om olika typer av InputFormats som finns i Mapreduce
  • Vad är behovet av InputFormats
  • Hur man skriver anpassade InputFormats
  • Hur man överför data från SQL-databaser till HDFS
  • Hur man överför data från SQL-databaser (här MySQL) till NoSQL-databaser (här Hbase)
  • Hur man överför data från en SQL-databas till en annan tabell i SQL-databaser (Det kanske inte är så viktigt om vi gör detta i samma SQL-databas. Det är dock inget fel i att ha kunskap om detsamma. Man vet aldrig hur det kan komma i bruk)

Nödvändig förutsättning:

  • Hadoop förinstallerat
  • SQL förinstallerat
  • Hbase förinstallerat
  • Java grundläggande förståelse
  • MapReduce kunskap
  • Hadoop ram grundläggande kunskap

Låt oss förstå problemförklaringen som vi ska lösa här:

Vi har en anställdstabell i MySQL DB i vår relationsdatabas Edureka. Nu enligt företagets krav måste vi flytta all tillgänglig data i relationell DB till Hadoop-filsystem, dvs HDFS, NoSQL DB, känd som Hbase.

Vi har många alternativ att göra denna uppgift:

  • Sqoop
  • Flume
  • MapReduce

Nu vill du inte installera och konfigurera något annat verktyg för den här åtgärden. Du har bara ett alternativ som är Hadoops bearbetningsram MapReduce. MapReduce-ramverk ger dig full kontroll över data medan du överför. Du kan manipulera kolumnerna och placera dem direkt på någon av de två målplatserna.

Notera:

  • Vi måste ladda ner och placera MySQL-kontakten i Hadoop-klassvägen för att hämta tabeller från MySQL-tabellen. För att göra detta ladda ner anslutningen com.mysql.jdbc_5.1.5.jar och behåll den under katalogen Hadoop_home / share / Hadoop / MaPreduce / lib.
cp Nedladdningar / com.mysql.jdbc_5.1.5.jar $ HADOOP_HOME / dela / hadoop / mapreduce / lib /
  • Lägg också alla Hbase-burkar under Hadoop-klassvägen för att göra ditt MR-program åtkomst till Hbase. För att göra detta, kör följande kommando :
cp $ HBASE_HOME / lib / * $ HADOOP_HOME / dela / hadoop / mapreduce / lib /

Programvaruversionerna som jag har använt vid utförandet av denna uppgift är:

  • Hadooop-2.3.0
  • HBase 0,98,9-Hadoop2
  • Eclipse Moon

För att undvika programmet i kompatibilitetsproblem föreskriver jag mina läsare att köra kommandot med liknande miljö.

Anpassad DBInputWritable:

paketet com.inputFormat.copy import java.io.DataInput import java.io.DataOutput import java.io.IOException import java.sql.ResultSet import java.sql.PreparedStatement import java.sql.SQLException import org.apache.hadoop.io .Writable import org.apache.hadoop.mapreduce.lib.db.DBWritable public class DBInputWritable implement Writable, DBWritable {private int id private Strängnamn, avdelning public void readFields (DataInput in) kastar IOException {} public void readFields (ResultSet rs) kastar SQLException // Resultatuppsättning representerar data som returneras från en SQL-sats {id = rs.getInt (1) namn = rs.getString (2) dept = rs.getString (3)} public void write (DataOutput out) kastar IOException { } public void write (PreparedStatement ps) kastar SQLException {ps.setInt (1, id) ps.setString (2, name) ps.setString (3, dept)} public int getId () {return id} public String getName () {return name} public String getDept () {return dept}}

Anpassad DBOutput Skrivbar:

paketet com.inputFormat.copy import java.io.DataInput import java.io.DataOutput import java.io.IOException import java.sql.ResultSet import java.sql.PreparedStatement import java.sql.SQLException import org.apache.hadoop.io .Writable import org.apache.hadoop.mapreduce.lib.db.DBWritable public class DBOutputWritable implement Writable, DBWritable {private String name private int id private String dept public DBOutputWritable (String name, int id, String dept) {this.name = namnge this.id = id this.dept = dept} public void readFields (DataInput in) kastar IOException {} public void readFields (ResultSet rs) kastar SQLException {} public void write (DataOutput out) kastar IOException {} public void write (PreparedStatement ps) kastar SQLException {ps.setString (1, namn) ps.setInt (2, id) ps.setString (3, avdelning)}}

Ingångstabell:

skapa databas edureka
skapa tabell emp (empid int inte null, namn varchar (30), avdelning varchar (20), primär nyckel (empid))
infoga i emp-värden (1, 'abhay', 'utveckling'), (2, 'brundesh', 'test')
välj * från emp

Fall 1: Överföring från MySQL till HDFS

paketet com.inputFormat.copy importerar java.net.URI importerar org.apache.hadoop.conf.Configuration importerar org.apache.hadoop.fs.FileSystem importerar org.apache.hadoop.fs.Path importerar org.apache.hadoop.mapreduce .Job importerar org.apache.hadoop.mapreduce.lib.db.DBConfiguration importerar org.apache.hadoop.mapreduce.lib.db.DBInputFormat importerar org.apache.hadoop.mapreduce.lib.output.FileOutputFormat importerar org.apache.hadoop .io.Text importerar org.apache.hadoop.io.IntWritable public class MainDbtohdfs {public static void main (String [] args) throw Exception {Configuration conf = new Configuration () DBConfiguration.configureDB (conf, 'com.mysql.jdbc .Driver ', // förarklass' jdbc: mysql: // localhost: 3306 / edureka ', // db url' root ', // användarnamn' root ') // lösenord Jobbjobb = nytt jobb (conf) jobb .setJarByClass (MainDbtohdfs.class) jobb.setMapperClass (Map.class) jobb.setMapOutputKeyClass (Text.class) jobb.setMapOutputValueClass (IntWritable.class) jobb.setInputFormatClass (DBInputFormat.class) FileOput. ny sökväg (args [0])) DBInputFormat.setInput (jobb, DBInputWritable.class, 'emp', // ingångstabellnamn null, null, ny sträng [] {'empid', 'name', 'dept'} / / tabellkolumner) Sökväg p = ny sökväg (args [0]) FileSystem fs = FileSystem.get (ny URI (args [0]), conf) fs.delete (p) System.exit (job.waitForCompletion (true)? 0: 1)}}

Denna kod ger oss möjlighet att förbereda eller konfigurera ingångsformat för att komma åt vår SQL-källkälla. Parametern inkluderar förarklassen, URL: n har adressen till SQL-databasen, dess användarnamn och lösenord.

DBConfiguration.configureDB (conf, 'com.mysql.jdbc.Driver', // förarklass 'jdbc: mysql: // localhost: 3306 / edureka', // db url 'root', // användarnamn 'root') //Lösenord

Denna kod ger oss möjlighet att skicka informationen om tabellerna i databasen och ställa in den i jobbobjektet. Parametrarna inkluderar naturligtvis jobbinstansen, den anpassade skrivbara klassen som måste implementera DBWritable-gränssnittet, källtabellens namn, villkor om något annat är null, alla sorteringsparametrar som annars är noll, listan över tabellkolumner respektive.

DBInputFormat.setInput (jobb, DBInputWritable.class, 'emp', // ingångstabellnamn null, null, ny sträng [] {'empid', 'name', 'dept'} // tabellkolumner)

Mapper

paketet com.inputFormat.copy importera java.io.IOException importera org.apache.hadoop.mapreduce.Mapper importera org.apache.hadoop.io.LongWritable importera org.apache.hadoop.io.Text importera org.apache.hadoop.io .IntWritable public class Map utvidgar Mapper {.
skyddad ogiltig karta (LongWritable-tangent, DBInputWritable-värde, Context-ctx) {försök {String name = value.getName () IntWritable id = new IntWritable (value.getId ()) String dept = value.getDept ()
ctx.write (ny text (namn + '' + id + '' + avdelning), id)
} fånga (IOException e) {e.printStackTrace ()} fånga (InterruptedException e) {e.printStackTrace ()}}}

Reducer: Identitetsreducerare används

Kommando att köra:

hadoop burk dbhdfs.jar com.inputFormat.copy.MainDbtohdfs / dbtohdfs

Utgång: MySQL-tabell överförd till HDFS

hadoop dfs -ls / dbtohdfs / *

Fall 2: Överföring från en tabell i MySQL till en annan i MySQL

skapa output-tabell i MySQL

skapa tabell medarbetare1 (namn varchar (20), id int, avdelning varchar (20))

paketet com.inputFormat.copy importerar org.apache.hadoop.conf.Configuration importerar org.apache.hadoop.mapreduce.Job importerar org.apache.hadoop.mapreduce.lib.db.DBConfiguration importerar org.apache.hadoop.mapreduce.lib .db.DBInputFormat importerar org.apache.hadoop.mapreduce.lib.db.DBOutputFormat importerar org.apache.hadoop.io.Text importerar org.apache.hadoop.io.IntWritable importerar org.apache.hadoop.io.NullWritable public class Mainonetable_to_other_table {public static void main (String [] args) throw Exception {Configuration conf = new Configuration () DBConfiguration.configureDB (conf, 'com.mysql.jdbc.Driver', // driver class 'jdbc: mysql: // localhost : 3306 / edureka ', // db url' root ', // användarnamn' root ') // lösenord Jobjobb = nytt jobb (conf) job.setJarByClass (Mainonetable_to_other_table.class) jobb.setMapperClass (Map.class) jobb .setReducerClass (Reduce.class) job.setMapOutputKeyClass (Text.class) job.setMapOutputValueClass (IntWritable.class) job.setOutputKeyClass (DBOutputWritable.class) job.setOutputValueClass (Nul lWritable.class) job.setInputFormatClass (DBInputFormat.class) job.setOutputFormatClass (DBOutputFormat.class) DBInputFormat.setInput (job, DBInputWritable.class, 'emp', // input tabellnamn null, null, new String [] {'empid ',' name ',' dept '} // tabellkolumner) DBOutputFormat.setOutput (jobb,' medarbetare1 ', // namn på utmatningstabell ny String [] {' name ',' id ',' dept '} // tabell kolumner) System.exit (job.waitForCompletion (true)? 0: 1)}}

Denna kod ger oss möjlighet att konfigurera namnet på utmatningstabellen i SQL DB. Parametrarna är jobbinstans, utmatningstabellnamn respektive utdatakolonnnamn.

DBOutputFormat.setOutput (jobb, 'medarbetare1', // namn på utgångstabell ny sträng [] {'namn', 'id', 'avdelning'} // tabellkolumner)

Mapper: Samma som fall 1

Reducer:

paketet com.inputFormat.copy importerar java.io.IOException importerar org.apache.hadoop.mapreduce.Reducer importerar org.apache.hadoop.io.Text importerar org.apache.hadoop.io.IntWritable importerar org.apache.hadoop.io .NullWritable public class Reduce extends Reducer {skyddad ogiltig reducering (Textnyckel, Iterable värden, Context ctx) {int sum = 0 Strängrad [] = key.toString (). Split ('') prova {ctx.write (ny DBOutputWritable (rad [0] .toString (), Integer.parseInt (rad [1] .toString ()), rad [2] .toString ()), NullWritable.get ())} fångst (IOException e) {e.printStackTrace ()} fånga (InterruptedException e) {e.printStackTrace ()}}}

Kommando att köra:

hadoop burk dbhdfs.jar com.inputFormat.copy.Mainonetable_to_other_table

Output: överförda data från EMP-tabellen i MySQL till en annan tabellanställd1 i MySQL

Fall 3: Överföring från tabell i MySQL till NoSQL (Hbase) tabell

Skapa Hbase-tabell för att passa utdata från SQL-tabellen:

skapa 'anställd', 'officiell_info'

Förarklass:

paket Dbtohbase import org.apache.hadoop.conf.Configuration import org.apache.hadoop.mapreduce.Job import org.apache.hadoop.mapreduce.lib.db.DBConfiguration import org.apache.hadoop.mapreduce.lib.db.DBInputFormat importera org.apache.hadoop.hbase.mapreduce.TableOutputFormat importera org.apache.hadoop.hbase.HBaseConfiguration importera org.apache.hadoop.hbase.client.HTable importera org.apache.hadoop.hbase.client.HTableInterface importera org.apache .hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil import org.apache.hadoop.io.Text public class MainDbToHbase {public static void main (String [] args) kastar undantag {Configuration conf = HBaseConfiguration.create () HTableInterface mytable = new HTable (conf, 'emp') DBConfiguration.configureDB (conf, 'com.mysql.jdbc.Driver', // driver class 'jdbc: mysql: // localhost: 3306 / edureka' , // db url 'root', // användarnamn 'root') // lösenord Jobjobb = nytt jobb (conf, 'dbtohbase') job.setJarByClass (MainDbToHbase.class) jobb.s etMapperClass (Map.class) job.setMapOutputKeyClass (ImmutableBytesWritable.class) job.setMapOutputValueClass (Text.class) TableMapReduceUtil.initTableReducerJob ('medarbetare', Reducera.klass, jobb) job.setInputFormatClass.CormFormatClass. klass) DBInputFormat.setInput (jobb, DBInputWritable.class, 'emp', // ingångstabellnamn null, null, ny sträng [] {'empid', 'name', 'dept'} // tabellkolumner) System.exit (job.waitForCompletion (true)? 0: 1)}}

Denna kod ger dig möjlighet att konfigurera utgångsnyckelklassen som i hbase är ImmutableBytesWritable

job.setMapOutputKeyClass (ImmutableBytesWritable.class) job.setMapOutputValueClass (Text.class)

Här passerar vi namnet på hbase-tabellen och reduceraren för att agera på bordet.

TableMapReduceUtil.initTableReducerJob ('anställd', Reducera.klass, jobb)

Mapper:

paket Dbtohbase import java.io.IOException import org.apache.hadoop.mapreduce.Mapper import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.util.Bytes import org.apache.hadoop.io .LongWritable import org.apache.hadoop.io.Text import org.apache.hadoop.io.IntWritable public class Map extensions Mapper {private IntWritable one = new IntWritable (1) protected void map (LongWritable id, DBInputWritable value, Context context) {försök {String line = value.getName () String cd = value.getId () + '' String dept = value.getDept () context.write (new ImmutableBytesWritable (Bytes.toBytes (cd)), new Text (line + ' '+ avd.))} fångst (IOException e) {e.printStackTrace ()} fångst (InterruptedException e) {e.printStackTrace ()}}}

I denna kod tar vi värden från getters i DBinputwritable-klassen och skickar dem sedan in
ImmutableBytes Skrivbar så att de når reduceringsenheten i bytewriatble-form som Hbase förstår.

String line = value.getName () String cd = value.getId () + '' String dept = value.getDept () context.write (new ImmutableBytesWritable (Bytes.toBytes (cd)), new Text (line + '' + dept ))

Reducer:

paket Dbtohbase import java.io.IOException import org.apache.hadoop.hbase.client.Put import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.mapreduce.TableReducer import org.apache.hadoop .hbase.util.Bytes importerar org.apache.hadoop.io.Text public class Minska utökar TableReducer {public void reduce (ImmutableBytesWritable key, Iterable values, Context context) kastar IOException, InterruptedException {String [] orsak = null // Loop värden för (Text val: värden) {orsak = val.toString (). split ('')} // Lägg till HBase Put put = ny Put (key.get ()) put.add (Bytes.toBytes ('official_info') ), Bytes.toBytes ('namn'), Bytes.toBytes (orsak [0])) put.add (Bytes.toBytes ('official_info'), Bytes.toBytes ('department'), Bytes.toBytes (orsak [1 ])) context.write (key, put)}}

Denna kod ger oss möjlighet att bestämma den exakta raden och kolumnen där vi skulle lagra värden från reduceringsenheten. Här lagrar vi varje empid i separat rad eftersom vi gjorde empid som radnyckel vilket skulle vara unikt. I varje rad lagrar vi den officiella informationen för de anställda under kolumnfamiljen 'official_info' under kolumnerna 'namn' respektive 'avdelning'.

Put put = new Put (key.get ()) put.add (Bytes.toBytes ('official_info'), Bytes.toBytes ('name'), Bytes.toBytes (orsak [0])) put.add (Bytes. toBytes ('official_info'), Bytes.toBytes ('department'), Bytes.toBytes (orsak [1])) context.write (key, put)

Överförda data i Hbase:

skanna anställd

Som vi ser har vi lyckats slutföra uppgiften att migrera vår affärsdata från en relationell SQL DB till en NoSQL DB.

I nästa blogg lär vi oss att skriva och köra koder för andra in- och utdataformat.

Fortsätt lägga upp dina kommentarer, frågor eller feedback. Jag skulle gärna höra från dig.

Har du en fråga till oss? Vänligen nämna det i kommentarfältet så återkommer vi till dig.

Relaterade inlägg: