Package com.gengoai.stream.spark
Class SparkStreamingContext
java.lang.Object
    com.gengoai.stream.StreamingContext
        com.gengoai.stream.spark.SparkStreamingContext
All Implemented Interfaces: Serializable, AutoCloseable
public final class SparkStreamingContext extends StreamingContext
Represents a distributed streaming context backed by Spark's RDD classes.
Author: David B. Bracewell
See Also: Serialized Form

Field Summary
static org.apache.spark.api.java.JavaSparkContext context
static SparkStreamingContext INSTANCE
    The singleton instance of the context.
static String SPARK_APPNAME
    The config property name containing the Spark application name.
static String SPARK_MASTER
    The config property name specifying the Spark master address.
static org.apache.spark.sql.SparkSession sparkSession

Constructor Summary
SparkStreamingContext()

Method Summary
<T> org.apache.spark.broadcast.Broadcast<T> broadcast(T object)
    Broadcasts an object using Spark's broadcast functionality.
void close()
static SparkStreamingContext contextOf(com.gengoai.stream.spark.SparkDoubleStream stream)
    Gets the streaming context of the given Spark stream.
static SparkStreamingContext contextOf(SparkPairStream<?,?> stream)
    Gets the streaming context of the given Spark stream.
static SparkStreamingContext contextOf(SparkStream<?> stream)
    Gets the streaming context of the given Spark stream.
<E> MCounterAccumulator<E> counterAccumulator(String name)
    Creates a new Counter accumulator.
MDoubleAccumulator doubleAccumulator(double initialValue, String name)
    Creates a new double accumulator with the given initial value.
com.gengoai.stream.spark.SparkDoubleStream doubleStream(DoubleStream doubleStream)
    Creates an MDoubleStream from a Java DoubleStream.
<T> SparkStream<T> empty()
    Creates a new empty stream.
org.apache.spark.broadcast.Broadcast<Config> getConfigBroadcast()
    Gets the broadcast version of the Config object.
boolean isDistributed()
    Checks whether the context is distributed.
<E> MAccumulator<E,List<E>> listAccumulator(String name)
    Creates a list accumulator.
MLongAccumulator longAccumulator(long initialValue, String name)
    Creates a new long accumulator with the given initial value.
<K,V> MMapAccumulator<K,V> mapAccumulator(String name)
    Creates a new map accumulator.
<K1,K2> MMultiCounterAccumulator<K1,K2> multiCounterAccumulator(String name)
    Creates a new MultiCounter accumulator.
<K,V> SparkPairStream<K,V> pairStream(Collection<Map.Entry<? extends K,? extends V>> tuples)
    Creates a new pair stream from the given collection of entries.
<K,V> SparkPairStream<K,V> pairStream(Map<? extends K,? extends V> map)
    Creates a new pair stream from the given map.
SparkStream<Integer> range(int startInclusive, int endExclusive)
    Creates a range-based integer stream starting at startInclusive and ending before endExclusive.
<E> MAccumulator<E,Set<E>> setAccumulator(String name)
    Creates a set accumulator.
org.apache.spark.api.java.JavaSparkContext sparkContext()
    Gets the wrapped Spark context.
org.apache.spark.sql.SparkSession sparkSession()
MStatisticsAccumulator statisticsAccumulator(String name)
    Creates a new statistics accumulator.
<T> SparkStream<T> stream(Iterable<? extends T> iterable)
    Creates a new MStream from the given iterable.
<T> SparkStream<T> stream(Stream<T> stream)
    Creates a new MStream from a Java Stream.
SparkStream<String> textFile(Resource location)
    Creates a new MStream where each element is a line of the resources found (recursively) at the given location.
MStream<String> textFile(Resource location, boolean wholeFile)
    Creates a new MStream where each element is either the entire content of a resource (wholeFile = true) or a single line of a resource (wholeFile = false). Resources are gathered recursively from the given location.
SparkStream<String> textFile(String location)
    Creates a new MStream where each element is a line of the resources found (recursively) at the given location.
void updateConfig()
    Updates the config object used by this context (important in distributed environments).
Methods inherited from class com.gengoai.stream.StreamingContext
counterAccumulator, distributed, doubleAccumulator, doubleAccumulator, doubleStream, emptyDouble, emptyPair, get, get, listAccumulator, local, longAccumulator, longAccumulator, mapAccumulator, multiCounterAccumulator, pairStream, setAccumulator, statisticsAccumulator, stream, stream, textFile
Field Detail
INSTANCE
public static final SparkStreamingContext INSTANCE
The singleton instance of the context.

SPARK_APPNAME
public static final String SPARK_APPNAME
The config property name containing the Spark application name.
See Also: Constant Field Values

SPARK_MASTER
public static final String SPARK_MASTER
The config property name specifying the Spark master address.
See Also: Constant Field Values

context
public static volatile org.apache.spark.api.java.JavaSparkContext context

sparkSession
public static volatile org.apache.spark.sql.SparkSession sparkSession
Method Detail
contextOf
public static SparkStreamingContext contextOf(SparkStream<?> stream)
Gets the streaming context of the given Spark stream.
Parameters:
  stream - the stream whose context is wanted
Returns:
  the Spark streaming context

contextOf
public static SparkStreamingContext contextOf(com.gengoai.stream.spark.SparkDoubleStream stream)
Gets the streaming context of the given Spark stream.
Parameters:
  stream - the stream whose context is wanted
Returns:
  the Spark streaming context

contextOf
public static SparkStreamingContext contextOf(SparkPairStream<?,?> stream)
Gets the streaming context of the given Spark stream.
Parameters:
  stream - the stream whose context is wanted
Returns:
  the Spark streaming context

broadcast
public <T> org.apache.spark.broadcast.Broadcast<T> broadcast(T object)
Broadcasts an object using Spark's broadcast functionality.
Type Parameters:
  T - the type of the object being broadcast
Parameters:
  object - the object to broadcast
Returns:
  the broadcast wrapper around the object

close
public void close()

counterAccumulator
public <E> MCounterAccumulator<E> counterAccumulator(String name)
Description copied from class: StreamingContext
Creates a new Counter accumulator.
Specified by: counterAccumulator in class StreamingContext
Type Parameters:
  E - the component type of the counter
Parameters:
  name - the name of the accumulator
Returns:
  the counter accumulator
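
A counter accumulator lets tasks running on executors add counts that the driver later reads as one merged result. The JDK-only sketch below imitates that pattern with plain maps. Each simulated partition builds its own partial counts, and the driver merges the partials by summing the counts per key. It does not use MCounterAccumulator. The class name and the choice of double counts are assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical local stand-in for a counter accumulator: per-partition partial
// counts are merged by summing the count of each key.
final class LocalCounterAccumulator<E> {
    private final Map<E, Double> counts = new HashMap<>();

    // Called "on a task": increments the count of one item.
    void add(E item) {
        counts.merge(item, 1.0, Double::sum);
    }

    // Called "on the driver": folds another partial result into this one.
    void merge(LocalCounterAccumulator<E> other) {
        other.counts.forEach((k, v) -> counts.merge(k, v, Double::sum));
    }

    double count(E item) {
        return counts.getOrDefault(item, 0.0);
    }

    // Simulates each partition counting independently, then merging on the driver.
    static <E> LocalCounterAccumulator<E> countPartitions(List<List<E>> partitions) {
        LocalCounterAccumulator<E> total = new LocalCounterAccumulator<>();
        for (List<E> partition : partitions) {
            LocalCounterAccumulator<E> partial = new LocalCounterAccumulator<>();
            partition.forEach(partial::add);
            total.merge(partial);
        }
        return total;
    }
}
```

For example, partitions ["a", "b", "a"] and ["a", "c"] give a merged count of 3 for "a".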

doubleAccumulator
public MDoubleAccumulator doubleAccumulator(double initialValue, String name)
Description copied from class: StreamingContext
Creates a new double accumulator with the given initial value.
Specified by: doubleAccumulator in class StreamingContext
Parameters:
  initialValue - the initial value of the accumulator
  name - the name of the accumulator
Returns:
  the double accumulator

doubleStream
public com.gengoai.stream.spark.SparkDoubleStream doubleStream(DoubleStream doubleStream)
Description copied from class: StreamingContext
Creates an MDoubleStream from a Java DoubleStream.
Specified by: doubleStream in class StreamingContext
Parameters:
  doubleStream - the double stream to wrap or consume
Returns:
  the MDoubleStream

empty
public <T> SparkStream<T> empty()
Description copied from class: StreamingContext
Creates a new empty stream.
Specified by: empty in class StreamingContext
Type Parameters:
  T - the component type of the stream
Returns:
  the empty MStream

getConfigBroadcast
public org.apache.spark.broadcast.Broadcast<Config> getConfigBroadcast()
Gets the broadcast version of the Config object.
Returns:
  the config broadcast

isDistributed
public boolean isDistributed()
Description copied from class: StreamingContext
Checks whether the context is distributed.
Overrides: isDistributed in class StreamingContext
Returns:
  true if distributed, false otherwise

listAccumulator
public <E> MAccumulator<E,List<E>> listAccumulator(String name)
Description copied from class: StreamingContext
Creates a list accumulator.
Specified by: listAccumulator in class StreamingContext
Type Parameters:
  E - the component type of the list
Parameters:
  name - the name of the accumulator
Returns:
  the list accumulator

longAccumulator
public MLongAccumulator longAccumulator(long initialValue, String name)
Description copied from class: StreamingContext
Creates a new long accumulator with the given initial value.
Specified by: longAccumulator in class StreamingContext
Parameters:
  initialValue - the initial value of the accumulator
  name - the name of the accumulator
Returns:
  the long accumulator
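
An accumulator is written to from many tasks and read once the work is finished. Within a single JVM, java.util.concurrent.atomic.LongAdder follows the same add-then-read pattern across threads, so the sketch below uses it as a local analog of longAccumulator(initialValue, name). Two parts of the sketch are assumptions and do not come from the library: adding the initial value once at construction, and the class name. It does not use MLongAccumulator.

```java
import java.util.concurrent.atomic.LongAdder;
import java.util.stream.IntStream;

// Hypothetical local analog of a long accumulator, built on LongAdder.
final class LocalLongAccumulator {
    private final String name;
    private final LongAdder adder = new LongAdder();

    LocalLongAccumulator(long initialValue, String name) {
        this.name = name;
        adder.add(initialValue); // assumption: the initial value is counted once
    }

    // Safe to call from many threads at once.
    void add(long value) {
        adder.add(value);
    }

    // Reads the current total; meant to be called after all adds are done.
    long value() {
        return adder.sum();
    }

    String name() {
        return name;
    }

    // Adds 1..n from a parallel stream, standing in for tasks on many executors.
    static LocalLongAccumulator sumInParallel(long initialValue, int n) {
        LocalLongAccumulator acc = new LocalLongAccumulator(initialValue, "sum");
        IntStream.rangeClosed(1, n).parallel().forEach(i -> acc.add(i));
        return acc;
    }
}
```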

mapAccumulator
public <K,V> MMapAccumulator<K,V> mapAccumulator(String name)
Description copied from class: StreamingContext
Creates a new map accumulator.
Specified by: mapAccumulator in class StreamingContext
Type Parameters:
  K - the key type
  V - the value type
Parameters:
  name - the name of the accumulator
Returns:
  the map accumulator

multiCounterAccumulator
public <K1,K2> MMultiCounterAccumulator<K1,K2> multiCounterAccumulator(String name)
Description copied from class: StreamingContext
Creates a new MultiCounter accumulator.
Specified by: multiCounterAccumulator in class StreamingContext
Type Parameters:
  K1 - the first key type
  K2 - the second key type
Parameters:
  name - the name of the accumulator
Returns:
  the MultiCounter accumulator
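
A MultiCounter keys each count by a pair (K1, K2), so it behaves like a two-level table of counts. The JDK-only sketch below stores that table as nested maps. Partial tables from different partitions are merged by summing each (K1, K2) cell. The nested-map layout and all names are illustrative assumptions; they are not the internals of MMultiCounterAccumulator.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical two-level counter: counts indexed by (first, second) key pairs.
final class LocalMultiCounter<K1, K2> {
    private final Map<K1, Map<K2, Double>> table = new HashMap<>();

    // Increments the count for the (k1, k2) pair by the given amount.
    void increment(K1 k1, K2 k2, double amount) {
        table.computeIfAbsent(k1, k -> new HashMap<>()).merge(k2, amount, Double::sum);
    }

    // Folds another partial table into this one, summing matching cells.
    void merge(LocalMultiCounter<K1, K2> other) {
        other.table.forEach((k1, row) -> row.forEach((k2, v) -> increment(k1, k2, v)));
    }

    // Returns 0.0 for pairs that were never incremented.
    double get(K1 k1, K2 k2) {
        return table.getOrDefault(k1, Map.of()).getOrDefault(k2, 0.0);
    }
}
```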

pairStream
public <K,V> SparkPairStream<K,V> pairStream(Map<? extends K,? extends V> map)
Description copied from class: StreamingContext
Creates a new pair stream from the given map.
Specified by: pairStream in class StreamingContext
Type Parameters:
  K - the key type
  V - the value type
Parameters:
  map - the map to stream
Returns:
  the pair stream

pairStream
public <K,V> SparkPairStream<K,V> pairStream(Collection<Map.Entry<? extends K,? extends V>> tuples)
Description copied from class: StreamingContext
Creates a new pair stream from the given collection of entries.
Specified by: pairStream in class StreamingContext
Type Parameters:
  K - the key type
  V - the value type
Parameters:
  tuples - the collection of entries used to create the pair stream
Returns:
  the pair stream
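
The declared parameter type Collection<Map.Entry<? extends K,? extends V>> is stricter than it looks. Java generics are invariant, so a List<Map.Entry<String,Integer>> is not accepted for any choice of K and V. The caller has to declare the collection with the same wildcard element type. The sketch below shows this. acceptEntries is a hypothetical stand-in that copies only the declared parameter type and counts the entries; it does not create a SparkPairStream.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;

final class EntryArguments {
    private EntryArguments() {}

    // Hypothetical stand-in with the same parameter type as pairStream(Collection<...>);
    // it just counts the entries instead of building a stream.
    static <K, V> int acceptEntries(Collection<Map.Entry<? extends K, ? extends V>> tuples) {
        return tuples.size();
    }

    // Copies a map's entries into the wildcard-typed list the signature requires.
    static <K, V> List<Map.Entry<? extends K, ? extends V>> entriesOf(Map<K, V> map) {
        return new ArrayList<>(map.entrySet());
    }

    static int demo() {
        // Does NOT compile: List<Map.Entry<String, Integer>> is not a
        // Collection<Map.Entry<? extends K, ? extends V>> for any K and V.
        // List<Map.Entry<String, Integer>> plain = List.of(Map.entry("a", 1));
        // acceptEntries(plain);

        // Compiles: the element type carries the same wildcards as the parameter.
        List<Map.Entry<? extends String, ? extends Integer>> entries = new ArrayList<>();
        entries.add(Map.entry("a", 1));
        entries.add(Map.entry("b", 2));
        return acceptEntries(entries);
    }
}
```

If the data is already in a Map, the pairStream(Map) overload avoids this conversion.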

range
public SparkStream<Integer> range(int startInclusive, int endExclusive)
Description copied from class: StreamingContext
Creates a range-based integer stream starting at startInclusive and ending before endExclusive.
Specified by: range in class StreamingContext
Parameters:
  startInclusive - the first number in the range (inclusive)
  endExclusive - the end of the range (exclusive)
Returns:
  the integer stream
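
The range is half-open. startInclusive is included and endExclusive is not, which is the same convention as java.util.stream.IntStream.range. The JDK-only sketch below builds that range locally to show the bounds. It is not the Spark-backed implementation, and the class and method names are hypothetical.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical helper illustrating the half-open bounds of range(startInclusive, endExclusive).
final class HalfOpenRange {
    private HalfOpenRange() {}

    // Returns [startInclusive, endExclusive); empty when endExclusive <= startInclusive.
    static List<Integer> rangeList(int startInclusive, int endExclusive) {
        return IntStream.range(startInclusive, endExclusive)
                .boxed()
                .collect(Collectors.toList());
    }
}
```

For example, rangeList(0, 3) yields [0, 1, 2]. The range therefore holds endExclusive - startInclusive elements.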

setAccumulator
public <E> MAccumulator<E,Set<E>> setAccumulator(String name)
Description copied from class: StreamingContext
Creates a set accumulator.
Specified by: setAccumulator in class StreamingContext
Type Parameters:
  E - the component type of the set
Parameters:
  name - the name of the accumulator
Returns:
  the set accumulator
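
listAccumulator and setAccumulator differ in the collection that holds the merged value: List<E> versus Set<E>. Under ordinary List and Set semantics, a list keeps every added element, including duplicates, while a set keeps each distinct element once. The sketch below merges two simulated partitions both ways using plain JDK collections. It illustrates only those collection semantics and does not use MAccumulator. It makes no claim about the element order the library preserves. The names are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helpers contrasting list-style and set-style merging of partial results.
final class CollectionMerge {
    private CollectionMerge() {}

    // List semantics: every element from every partition is kept, duplicates included.
    static <E> List<E> mergeAsList(List<List<E>> partitions) {
        List<E> merged = new ArrayList<>();
        partitions.forEach(merged::addAll);
        return merged;
    }

    // Set semantics: each distinct element is kept once.
    static <E> Set<E> mergeAsSet(List<List<E>> partitions) {
        Set<E> merged = new HashSet<>();
        partitions.forEach(merged::addAll);
        return merged;
    }
}
```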

sparkContext
public org.apache.spark.api.java.JavaSparkContext sparkContext()
Gets the wrapped Spark context.
Returns:
  the Java Spark context

sparkSession
public org.apache.spark.sql.SparkSession sparkSession()

statisticsAccumulator
public MStatisticsAccumulator statisticsAccumulator(String name)
Description copied from class: StreamingContext
Creates a new statistics accumulator.
Specified by: statisticsAccumulator in class StreamingContext
Parameters:
  name - the name of the accumulator
Returns:
  the statistics accumulator
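
A statistics accumulator presumably gathers summary statistics (count, sum, minimum, maximum, mean) from values added on many tasks. The JDK's DoubleSummaryStatistics supports the same merge step through its combine method. The sketch below computes statistics for each simulated partition and then combines them, as a driver would. It is a local analog only. Which statistics MStatisticsAccumulator actually exposes is not stated here, and the class name is hypothetical.

```java
import java.util.DoubleSummaryStatistics;
import java.util.List;

// Hypothetical local analog of a statistics accumulator using DoubleSummaryStatistics.
final class LocalStatistics {
    private LocalStatistics() {}

    // Each array plays the role of one partition; partial statistics are combined.
    static DoubleSummaryStatistics combinePartitions(List<double[]> partitions) {
        DoubleSummaryStatistics total = new DoubleSummaryStatistics();
        for (double[] partition : partitions) {
            DoubleSummaryStatistics partial = new DoubleSummaryStatistics();
            for (double value : partition) {
                partial.accept(value);
            }
            total.combine(partial);
        }
        return total;
    }
}
```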

stream
public <T> SparkStream<T> stream(Stream<T> stream)
Description copied from class: StreamingContext
Creates a new MStream from a Java Stream.
Specified by: stream in class StreamingContext
Type Parameters:
  T - the component type of the stream
Parameters:
  stream - the Java stream to wrap or consume
Returns:
  the new MStream

stream
public <T> SparkStream<T> stream(Iterable<? extends T> iterable)
Description copied from class: StreamingContext
Creates a new MStream from the given iterable.
Specified by: stream in class StreamingContext
Type Parameters:
  T - the component type of the stream
Parameters:
  iterable - the iterable to wrap or consume
Returns:
  the new MStream
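
Spark's JavaSparkContext.parallelize accepts a java.util.List. An arbitrary Iterable or Java Stream therefore has to be copied into a list on the driver before it can be distributed. Whether stream(Iterable) and stream(Stream) do exactly this is an assumption. The JDK-only sketch below shows only that materialization step, and the class name is hypothetical. Because the whole input is copied into driver memory, this approach suits only inputs that fit there.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helpers that copy an Iterable or a Stream into a List,
// the input type JavaSparkContext.parallelize expects.
final class Materialize {
    private Materialize() {}

    static <T> List<T> fromIterable(Iterable<? extends T> iterable) {
        List<T> out = new ArrayList<>();
        iterable.forEach(out::add);
        return out;
    }

    static <T> List<T> fromStream(Stream<T> stream) {
        // Consumes the stream; it cannot be reused afterwards.
        return stream.collect(Collectors.toList());
    }
}
```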

textFile
public SparkStream<String> textFile(String location)
Description copied from class: StreamingContext
Creates a new MStream where each element is a line of the resources found (recursively) at the given location.
Specified by: textFile in class StreamingContext
Parameters:
  location - the location to read
Returns:
  a new MStream backed by the lines of the files at the given location

textFile
public SparkStream<String> textFile(Resource location)
Description copied from class: StreamingContext
Creates a new MStream where each element is a line of the resources found (recursively) at the given location.
Specified by: textFile in class StreamingContext
Parameters:
  location - the location to read
Returns:
  a new MStream backed by the lines of the files at the given location

updateConfig
public void updateConfig()
Description copied from class: StreamingContext
Updates the config object used by this context (important in distributed environments).
Overrides: updateConfig in class StreamingContext

textFile
public MStream<String> textFile(Resource location, boolean wholeFile)
Description copied from class: StreamingContext
Creates a new MStream where each element is either the entire content of a resource (wholeFile = true) or a single line of a resource (wholeFile = false). Resources are gathered recursively from the given location.
Specified by: textFile in class StreamingContext
Parameters:
  location - the location to read
  wholeFile - true to emit each resource's entire content as one element; false to emit one element per line
Returns:
  the new MStream
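
The wholeFile flag chooses between two output shapes: one element per line, or one element per resource containing its entire content. The sketch below reproduces both shapes on the local file system with java.nio.file, walking the location recursively. It assumes the location is a local directory. It does not use Resource, Spark, or the library's own reader, and the class name is hypothetical. The demo method leaves its files in the system temp directory.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical local reader mirroring the two modes of textFile(location, wholeFile).
final class LocalTextFiles {
    private LocalTextFiles() {}

    static List<String> read(Path location, boolean wholeFile) throws IOException {
        List<Path> files;
        // Gather regular files recursively, sorted so the output order is deterministic.
        try (Stream<Path> walk = Files.walk(location)) {
            files = walk.filter(Files::isRegularFile).sorted().collect(Collectors.toList());
        }
        List<String> out = new ArrayList<>();
        for (Path file : files) {
            if (wholeFile) {
                // One element per file: its entire content.
                out.add(new String(Files.readAllBytes(file), StandardCharsets.UTF_8));
            } else {
                // One element per line of the file.
                out.addAll(Files.readAllLines(file, StandardCharsets.UTF_8));
            }
        }
        return out;
    }

    // Writes a small nested directory to a temp location and reads it in both modes.
    static List<List<String>> demo() {
        try {
            Path root = Files.createTempDirectory("textfile-demo");
            Path nested = Files.createDirectories(root.resolve("sub"));
            Files.write(root.resolve("a.txt"), List.of("alpha", "beta"), StandardCharsets.UTF_8);
            Files.write(nested.resolve("b.txt"), List.of("gamma"), StandardCharsets.UTF_8);
            return List.of(read(root, false), read(root, true));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

In line mode the demo yields three elements ("alpha", "beta", "gamma"). In whole-file mode it yields two, one per file.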