Kumulativ Stateful Transformation In Apache Spark Streaming



Det här blogginlägget diskuterar stateful transformations in Spark Streaming. Lär dig allt om kumulativ spårning och kompetens för en Hadoop Spark-karriär.

Bidrag från Prithviraj Bose

I min tidigare blogg har jag diskuterat stateful transformations med hjälp av fönsterbegreppet Apache Spark Streaming. Du kan läsa den här .





I det här inlägget ska jag diskutera kumulativa stateful operationer i Apache Spark Streaming. Om du är nybörjare i Spark Streaming rekommenderar jag starkt att du läser min tidigare blogg för att förstå hur windowsing fungerar.

Typer av Stateful Transformation i Spark Streaming (Fortsättning ...)

> Kumulativ spårning

Vi hade använt reduceraByKeyAndWindow (...) API för att spåra nycklarnas tillstånd, men fönsterfönster innebär begränsningar för vissa användningsfall. Vad händer om vi vill samla tangenternas tillstånd i stället för att begränsa det till ett tidsfönster? I så fall skulle vi behöva använda updateStateByKey (…) BRAND.



Detta API introducerades i Spark 1.3.0 och har varit mycket populärt. Detta API har dock en del prestandakostnader, dess prestanda försämras när storleken på staterna ökar med tiden. Jag har skrivit ett exempel för att visa användningen av detta API. Du hittar koden här .

Spark 1.6.0 introducerade ett nytt API mapWithState (…) som löser prestandakostnaderna från updateStateByKey (…) . I den här bloggen ska jag diskutera detta specifika API med hjälp av ett exempelprogram som jag har skrivit. Du hittar koden här .

Innan jag dyker in i en kodgenomgång, låt oss spara några ord på kontrollpunkten. För varje stateful transformation är kontrollpunkt obligatorisk. Kontrollpunkt är en mekanism för att återställa tangenternas tillstånd om förarprogrammet misslyckas. När föraren startar om återställs nycklarnas tillstånd från kontrollpunktsfilerna. Kontrollpunktsplatser är vanligtvis HDFS eller Amazon S3 eller någon pålitlig lagring. När du testar koden kan man också lagra i det lokala filsystemet.



I exempelprogrammet lyssnar vi på sockettextström på värd = localhost och port = 9999. Den tokeniserar den inkommande strömmen till (ord, antal förekomster) och spårar ordräkningen med hjälp av 1.6.0 API mapWithState (…) . Dessutom tas nycklar utan uppdateringar bort med StateSpec.timeout API. Vi kontrollerar i HDFS och kontrollpunktfrekvensen är var 20: e sekund.

Låt oss först skapa en Spark Streaming-session,

Spark-streaming-session

Vi skapar en checkpointDir i HDFS och anropa sedan objektmetoden getOrCreate (...) . De getOrCreate API kontrollerar checkpointDir för att se om det finns några tidigare tillstånd att återställa, om det existerar, återskapar det Spark Streaming-sessionen och uppdaterar tillstånden för nycklarna från de data som är lagrade i filerna innan de går vidare med nya data. Annars skapas en ny Spark Streaming-session.

De getOrCreate tar kontrollpunktens katalognamn och en funktion (som vi har namngett createFunc ) vars signatur ska vara () => StreamingContext .

Låt oss undersöka koden inuti createFunc .

Rad # 2: Vi skapar ett strömmande sammanhang med jobbnamn till “TestMapWithStateJob” och batchintervall = 5 sekunder.

Rad # 5: Ställ in kontrollpunktskatalogen.

Rad # 8: Ställ in tillståndsspecifikationen med klassen org.apache.streaming.StateSpec objekt. Vi ställer först in funktionen som ska spåra tillståndet, sedan ställer vi in ​​antalet partitioner för de resulterande DStreams som ska genereras under efterföljande transformationer. Slutligen ställer vi in ​​timeout (till 30 sekunder) där om någon uppdatering för en nyckel inte tas emot på 30 sekunder kommer nyckeltillståndet att tas bort.

hur man initierar en klass i python

Linje 12 #: Ställ in sockelströmmen, platta in inkommande batchdata, skapa ett nyckel-värdepar, ring mapWithState , ställ in kontrollpunktsintervallet till 20-talet och slutligen skriva ut resultaten.

Spark-ramverket kallar th e createFunc för varje tangent med föregående värde och nuvarande tillstånd. Vi beräknar summan och uppdaterar tillståndet med den kumulativa summan och slutligen returnerar vi summan för nyckeln.

Github Källor -> TestMapStateWithKey.scala , TestUpdateStateByKey.scala

Har du en fråga till oss? Vänligen nämna det i kommentarfältet så återkommer vi till dig.

Relaterade inlägg:

Kom igång med Apache Spark & ​​Scala

Stateful Transformations with Windowing in Spark Streaming