RDD med Spark: Byggstenen för Apache Spark



Den här bloggen om RDD med Spark ger dig en detaljerad och omfattande kunskap om RDD, som är den grundläggande enheten för Spark och hur användbar den är.

, Själva ordet räcker för att skapa en gnista i varje Hadoop-ingenjörs sinne. TILL n i minnet bearbetningsverktyg vilket är blixtsnabbt i klusterbearbetning. Jämfört med MapReduce gör datadelningen i minnet RDD 10-100x snabbare än nätverks- och diskdelning och allt detta är möjligt på grund av RDD (Resilient Distribuerade datamängder). De viktigaste punkterna vi fokuserar idag i denna RDD med hjälp av Spark-artikeln är:

Behöver du RDD?

Varför behöver vi RDD? -RDD med Spark





Världen utvecklas med och Datavetenskap på grund av framsteget i . Algoritmer baserat på Regression , , och som körs på Distribuerad Iterativ beräkning attion mode som inkluderar återanvändning och delning av data mellan flera datorenheter.

Det traditionella tekniker behövde en stabil mellanliggande och distribuerad lagring som HDFS innefattande repetitiva beräkningar med datareplikeringar och dataserialisering, vilket gjorde processen mycket långsammare. Att hitta en lösning var aldrig lätt.



Det är här RDD (Resilient Distributed Datasets) kommer till helheten.

RDD s är enkla att använda och enkla att skapa eftersom data importeras från datakällor och släpps in i RDD. Vidare tillämpas operationerna för att bearbeta dem. De är en distribuerad minnessamling med behörigheter som Skrivskyddad och viktigast av allt är de Feltolerant .



Om någon datapartition av RDD är förlorat kan den regenereras genom att använda samma omvandling operation på den förlorade partitionen i härstamning , snarare än att bearbeta all data från grunden. Denna typ av tillvägagångssätt i realtidsscenarier kan göra mirakel i situationer med dataförlust eller när ett system är nere.

Vad är RDD?

RDD eller ( Resilient Distribuerad datamängd ) är en grundläggande datastruktur i Spark. Termen Uthållig definierar förmågan som genererar data automatiskt eller data rullar tillbaka till ursprungligt tillstånd när en oväntad katastrof inträffar med en sannolikhet för dataförlust.

kock vs ansible vs marionett

Uppgifterna skrivna i RDD är partitionerad och lagras i flera körbara noder . Om en exekverande nod misslyckas i körtiden, då får det omedelbart tillbaka upp från nästa körbara nod . Det är därför RDD betraktas som en avancerad typ av datastrukturer jämfört med andra traditionella datastrukturer. RDD kan lagra strukturerad, ostrukturerad och halvstrukturerad data.

Låt oss gå vidare med vår RDD med hjälp av Spark-bloggen och lära oss om de unika funktionerna i RDD som ger den en fördel jämfört med andra typer av datastrukturer.

Funktioner i RDD

  • I minne (BAGGE) Beräkningar : Begreppet In-Memory-beräkning tar databearbetningen till ett snabbare och effektivt steg där det totala prestanda av systemet är uppgraderad.
  • L hans utvärdering : Termen Lazy utvärdering säger omvandlingar tillämpas på data i RDD, men utdata genereras inte. Istället är de tillämpade transformationerna loggad.
  • Uthållighet : De resulterande RDD: erna är alltid återanvändbar.
  • Grovkornig verksamhet : Användaren kan tillämpa omvandlingar till alla element i datamängder genom Karta, filtrera eller Grupp av operationer.
  • Feltolerant : Om det går förlorat på data kan systemet göra det rulla tillbaka till dess ursprungligt tillstånd genom att använda den loggade omvandlingar .
  • Oföränderlighet : Data som definierats, hämtats eller skapats kan inte göras ändrats när den är inloggad i systemet. Om du behöver komma åt och ändra befintlig RDD måste du skapa en ny RDD genom att använda en uppsättning Omvandling fungerar på den aktuella eller föregående RDD.
  • Partitionering : Det är avgörande enhet av parallellism i Spark RDD. Som standard är antalet skapade partitioner baserat på din datakälla. Du kan till och med bestämma antalet partitioner du vill göra med anpassad partition funktioner.

Skapande av RDD med Spark

RDD kan skapas i tre sätt:

  1. Läser data från parallelliserade samlingar
val PCRDD = spark.sparkContext.parallelize (Array ('Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat'), 2) val resultRDD = PCRDD.collect () resultRDD.collect ( ) .foreach (println)
  1. Ansöker omvandling på tidigare RDD
valord = spark.sparkContext.parallelize (Seq ('Spark', 'är', 'a', 'mycket', 'kraftfull', 'språk')) val wordpair = words.map (w = (w.charAt ( 0), w)) wordpair.collect (). Foreach (println)
  1. Läser data från extern lagring eller filvägar som HDFS eller HBase
val Sparkfile = spark.read.textFile ('/ user / edureka_566977 / spark / spark.txt.') Sparkfile.collect ()

Åtgärder utförda på RDD:

Det finns huvudsakligen två typer av operationer som utförs på RDD, nämligen:

  • Transformationer
  • Åtgärder

Transformationer : De operationer vi ansöker om RDD till filter, åtkomst och ändra data i överordnad RDD för att generera en på varandra följande RDD kallas omvandling . Den nya RDD returnerar en pekare till den tidigare RDD som säkerställer beroendet mellan dem.

Transformationer är Lata utvärderingar, med andra ord, de operationer som tillämpas på RDD som du arbetar loggas men inte avrättade. Systemet ger ett resultat eller undantag efter att ha utlöst Handling .

Vi kan dela upp transformationer i två typer enligt nedan:

  • Smala transformationer
  • Breda omvandlingar

Smala transformationer Vi tillämpar smala omvandlingar till a enda partition för den överordnade RDD för att generera en ny RDD eftersom data som krävs för att behandla RDD är tillgängliga på en enda partition av förälder ASD . Exemplen för smala transformationer är:

  • Karta()
  • filtrera()
  • flatMap ()
  • dela()
  • mapPartitions()

Breda omvandlingar: Vi tillämpar den breda omvandlingen på flera partitioner för att generera en ny RDD. Data som krävs för att bearbeta RDD finns på flera partitioner i förälder ASD . Exemplen för breda transformationer är:

  • minskaBy ()
  • union()

Åtgärder : Åtgärder instruerar Apache Spark att ansöka beräkning och skicka resultatet eller ett undantag tillbaka till förarens RDD. Få av åtgärderna inkluderar:

  • samla()
  • räkna()
  • ta()
  • först()

Låt oss praktiskt taget använda operationerna på RDD:

IPL (Indian Premier League) är en cricket-turnering med den högsta nivån. Så kan vi idag ta hand om IPL-datauppsättningen och köra vår RDD med Spark.

  • För det första, låt oss ladda ner en CSV-matchningsdata för IPL. Efter att ha laddat ner det börjar det se ut som en EXCEL-fil med rader och kolumner.

I nästa steg avfyrar vi gnistan och laddar matches.csv-filen från dess plats, i mitt fall mincsvfilplats är “/User/edureka_566977/test/matches.csv”

Låt oss nu börja med Omvandling del först:

  • Karta():

Vi använder Kartomvandling för att tillämpa en specifik transformationsoperation på alla element i en RDD. Här skapar vi en RDD med namnet CKfile där vi lagrar vårcsvfil. Vi ska skapa en annan RDD som kallas stater till lagra stadsdetaljer .

spark2-shell val CKfile = sc.textFile ('/ user / edureka_566977 / test / matches.csv') CKfile.collect.foreach (println) val states = CKfile.map (_. split (',') (2)) states.collect (). foreach (println)

  • filtrera():

Filtertransformation, själva namnet beskriver dess användning. Vi använder denna omvandlingsåtgärd för att filtrera bort selektiva data från en given datainsamling. Vi ansöker filterdrift här för att få register över årets IPL-matchningar 2017 och förvara den i RDD.

val fil = CKfile.filter (line => line.contains ('2017')) fil.collect (). foreach (println)

  • flatMap ():

Vi tillämpar flatMap är en transformationsoperation till var och en av elementen i en RDD för att skapa en newRDD. Det liknar Map-transformation. här gäller viFlatkartatill spotta ut matcherna i Hyderabad stad och lagra data ifilRDDRDD.

val filRDD = fil.flatMap (line => line.split ('Hyderabad')). samla ()

  • dela():

Varje data vi skriver in i en RDD är uppdelad i ett visst antal partitioner. Vi använder denna omvandling för att hitta antal partitioner uppgifterna delas faktiskt upp i.

val fil = CKfile.filter (line => line.contains ('2017')) fil.partitions.size

  • mapPartitions():

Vi betraktar MapPatitions som ett alternativ till Map () ochför varje() tillsammans. Vi använder mapPartitions här för att hitta antal rader vi har i vår fil RDD.

val fil = CKfile.filter (line => line.contains ('2016')) fil.mapPartitions (idx => Array (idx.size) .iterator) .collect

  • minskaBy ():

Vi använderMinska av() på Nyckel-värdepar . Vi använde denna omvandling på vårcsvfil för att hitta spelaren med matchens högsta man .

val ManOfTheMatch = CKfile.map (_. split (',') (13)) val MOTMcount = ManOfTheMatch.map (WINcount => (WINcount, 1)) val ManOTH = MOTMcount.reduceByKey ((x, y) => x + y) .karta (tup => (tup._2, tup._1)) sortByKey (false) ManOTH.take (10) .foreach (println)

  • union():

Namnet förklarar allt, Vi använder facklig transformation är att klubb två RDD tillsammans . Här skapar vi två RDD: er fil och fil2. fil RDD innehåller posterna för 2017 IPL-matchningar och fil2 RDD innehåller 2016 IPL-matchningsrekord.

val fil = CKfile.filter (line => line.contains ('2017')) val fil2 = CKfile.filter (line => line.contains ('2016')) val uninRDD = fil.union (fil2)

Låt oss börja med Handling del där vi visar faktisk produktion:

  • samla():

Samla är den åtgärd som vi brukar göra visa innehållet i RDD.

val CKfile = sc.textFile ('/ user / edureka_566977 / test / matches.csv') CKfile.collect.foreach (println)

  • räkna():

Räknaär en åtgärd som vi använder för att räkna antal poster närvarande i RDD.Härvi använder den här åtgärden för att räkna det totala antalet poster i vår matches.csv-fil.

val CKfile = sc.textFile ('/ user / edureka_566977 / test / matches.csv') CKfile.count ()

  • ta():

Take är en åtgärd som liknar insamling men den enda skillnaden är att den kan skriva ut någon selektivt antal rader enligt användarförfrågan. Här tillämpar vi följande kod för att skriva ut topp tio ledande rapporter.

val statecountm = Scount.reduceByKey ((x, y) => x + y) .map (tup => (tup._2, tup._1)) sortByKey (false) statecountm.collect (). foreach (println) statecountm. ta (10) .foreach (println)

  • först():

First () är en åtgärdsoperation som liknar collect () och take ()denanvänds för att skriva ut den översta rapporten s utdata Här använder vi den första () operationen för att hitta maximalt antal matcher som spelas i en viss stad och vi får Mumbai som utgång.

val CKfile = sc.textFile ('/ user / edureka_566977 / test / matches.csv') val states = CKfile.map (_. split (',') (2)) val Scount = states.map (Scount => ( Scount, 1)) scala & gt val statecount = Scount.reduceByKey ((x, y) => x + y) .collect.foreach (println) Scount.reduceByKey ((x, y) => x + y) .collect.foreach (println) val statecountm = Scount.reduceByKey ((x, y) => x + y) .map (tup => (tup._2, tup._1)) sortByKey (false) statecountm.first ()

För att göra vår process till vår inlärning av RDD med Spark, ännu mer intressant, har jag kommit med ett intressant användningsfall.

RDD med Spark: Pokemon Use Case

  • För det första, Låt oss ladda ner en Pokemon.csv-fil och ladda den till gnistskalet som vi gjorde för Matches.csv-filen.
val PokemonDataRDD1 = sc.textFile ('/ user / edureka_566977 / PokemonFile / PokemonData.csv') PokemonDataRDD1.collect (). foreach (println)

Pokemons finns faktiskt i en stor variation, låt oss hitta några varianter.

  • Tar bort schema från filen Pokemon.csv

Vi kanske inte behöver Schema av filen Pokemon.csv. Därför tar vi bort det.

val Head = PokemonDataRDD1.first () val NoHeader = PokemonDataRDD1.filter (line =>! line.equals (Head))

  • Hitta antalet partitioner vår pokemon.csv distribueras i.
println ('No.ofpartitions =' + NoHeader.partitions.size)

  • Vatten Pokémon

Hitta antal vattenpokémon

val WaterRDD = PokemonDataRDD1.filter (line => line.contains ('Water')) WaterRDD.collect (). foreach (println)

  • Fire Pokémon

Hitta antal Fire Pokémon

val FireRDD = PokemonDataRDD1.filter (line => line.contains ('Fire')) FireRDD.collect (). foreach (println)

  • Vi kan också upptäcka befolkning av en annan typ av pokemon som använder räknarfunktionen
WaterRDD.count () FireRDD.count ()

  • Eftersom jag gillar spelet defensiv strategi låt oss hitta pokemon med maximalt försvar.
val defenceList = NoHeader.map {x => x.split (',')}. karta {x => (x (6) .toDouble)} println ('Highest_Defence:' + defenceList.max ())

  • Vi vet det maximala värde för försvar styrka men vi vet inte vilken pokemon det är. så, låt oss hitta vilken som är det pokemon.
val defWithPokemonName = NoHeader.map {x => x.split (',')}. karta {x => (x (6) .toDouble, x (1))} val MaxDefencePokemon = defWithPokemonName.groupByKey.takeOrdered (1) (Beställer [Double] .reverse.on (_._ 1)) MaxDefencePokemon.foreach (println)

  • Låt oss nu reda ut pokemon med minst försvar
val minDefencePokemon = defenceList.distinct.sortBy (x => x.toDouble, true, 1) minDefencePokemon.take (5) .foreach (println)

  • Låt oss nu se Pokémon med en mindre defensiv strategi.
val PokemonDataRDD2 = sc.textFile ('/ user / edureka_566977 / PokemonFile / PokemonData.csv') val Head2 = PokemonDataRDD2.first () val NoHeader2 = PokemonDataRDD2.filter (line =>! line.equals (Head)) val defWithPeader .karta {x => x.split (',')}. karta {x => (x (6) .toDouble, x (1))} val MinDefencePokemon2 = defWithPokemonName2.groupByKey.takeOrdered (1) (Beställning [Dubbel ] .on (_._ 1)) MinDefencePokemon2.foreach (println)

Så med detta kommer vi till ett slut på denna RDD med hjälp av Spark-artikeln. Jag hoppas att vi fick lite ljus över din kunskap om RDD, deras funktioner och de olika typer av operationer som kan utföras på dem.

Denna artikel baseras på är utformad för att förbereda dig för Cloudera Hadoop och Spark Developer Certification Exam (CCA175). Du får en fördjupad kunskap om Apache Spark och Spark Ecosystem, som inkluderar Spark RDD, Spark SQL, Spark MLlib och Spark Streaming. Du får omfattande kunskaper om Scala-programmeringsspråk, HDFS, Sqoop, Flume, Spark GraphX ​​och Messaging System som Kafka.