Package com.gengoai.stream
Class StreamingContext
- java.lang.Object
- com.gengoai.stream.StreamingContext
- All Implemented Interfaces:
Serializable, AutoCloseable
- Direct Known Subclasses:
LocalStreamingContext, SparkStreamingContext
public abstract class StreamingContext extends Object implements Serializable, AutoCloseable
Provides methods for creating MStreams and MAccumulators within a given context, i.e. local Java or distributed Spark.
- Author:
- David B. Bracewell
- See Also:
- Serialized Form
-
Constructor Summary
- StreamingContext()
-
Method Summary
- <E> MCounterAccumulator<E> counterAccumulator(): Creates a new Counter accumulator.
- abstract <E> MCounterAccumulator<E> counterAccumulator(String name): Creates a new Counter accumulator.
- static SparkStreamingContext distributed(): Gets the distributed streaming context.
- MDoubleAccumulator doubleAccumulator(): Creates a new double accumulator with an initial value of 0.
- MDoubleAccumulator doubleAccumulator(double initialValue): Creates a new double accumulator with the given initial value.
- abstract MDoubleAccumulator doubleAccumulator(double initialValue, String name): Creates a new double accumulator with the given initial value.
- MDoubleStream doubleStream(double... values): Creates an MDoubleStream from a variable list of doubles.
- abstract MDoubleStream doubleStream(DoubleStream doubleStream): Creates an MDoubleStream from a Java DoubleStream.
- abstract <T> MStream<T> empty(): Creates a new empty stream.
- MDoubleStream emptyDouble(): Creates an empty MDoubleStream.
- <K,V> MPairStream<K,V> emptyPair(): Creates an empty MPairStream.
- static StreamingContext get(): Gets the public streaming context as defined by configuration.
- static StreamingContext get(boolean distributed): Gets a streaming context.
- boolean isDistributed(): Checks whether this context is distributed.
- <E> MAccumulator<E,List<E>> listAccumulator(): Creates a list accumulator.
- abstract <E> MAccumulator<E,List<E>> listAccumulator(String name): Creates a list accumulator.
- static LocalStreamingContext local(): Gets the local streaming context.
- MLongAccumulator longAccumulator(): Creates a new long accumulator with an initial value of 0.
- MLongAccumulator longAccumulator(long initialValue): Creates a new long accumulator with the given initial value.
- abstract MLongAccumulator longAccumulator(long initialValue, String name): Creates a new long accumulator with the given initial value.
- <K,V> MMapAccumulator<K,V> mapAccumulator(): Creates a new map accumulator.
- abstract <K,V> MMapAccumulator<K,V> mapAccumulator(String name): Creates a new map accumulator.
- <K1,K2> MMultiCounterAccumulator<K1,K2> multiCounterAccumulator(): Creates a new MultiCounter accumulator.
- abstract <K1,K2> MMultiCounterAccumulator<K1,K2> multiCounterAccumulator(String name): Creates a new MultiCounter accumulator.
- <K,V> MPairStream<K,V> pairStream(Tuple2<? extends K,? extends V>... tuples): Creates a new pair stream from the given array of tuples.
- abstract <K,V> MPairStream<K,V> pairStream(Collection<Map.Entry<? extends K,? extends V>> tuples): Creates a new pair stream from the given collection of entries.
- abstract <K,V> MPairStream<K,V> pairStream(Map<? extends K,? extends V> map): Creates a new pair stream from the given map.
- abstract MStream<Integer> range(int startInclusive, int endExclusive): Creates a range-based integer stream starting at startInclusive and ending before endExclusive.
- <E> MAccumulator<E,Set<E>> setAccumulator(): Creates a set accumulator.
- abstract <E> MAccumulator<E,Set<E>> setAccumulator(String name): Creates a set accumulator.
- MStatisticsAccumulator statisticsAccumulator(): Creates a new statistics accumulator.
- abstract MStatisticsAccumulator statisticsAccumulator(String name): Creates a new statistics accumulator.
- abstract <T> MStream<T> stream(Iterable<? extends T> iterable): Creates a new MStream from the given iterable.
- <T> MStream<T> stream(Iterator<? extends T> iterator): Creates a new MStream from the given iterator.
- abstract <T> MStream<T> stream(Stream<T> stream): Creates a new MStream from a Java Stream.
- <T> MStream<T> stream(T... items): Creates a stream wrapping the given items.
- abstract MStream<String> textFile(@NonNull Resource location): Creates a new MStream where each element is a line in the resources (recursive) at the given location.
- abstract MStream<String> textFile(@NonNull Resource location, boolean wholeFile): Creates a new MStream where each element is either the entire content of a resource (wholeFile = true) or a single line of a resource (wholeFile = false); resources are gathered recursively from the given location.
- MStream<String> textFile(@NonNull Resource location, @NonNull String pattern): Creates a new MStream where each element is a line in the resources (recursive) at the given location, reading only files that match the given pattern.
- abstract MStream<String> textFile(String location): Creates a new MStream where each element is a line in the resources (recursive) at the given location.
- void updateConfig(): Updates the config object used by this stream (important for distributed environments).
-
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
-
Methods inherited from interface java.lang.AutoCloseable
close
-
Method Detail
-
distributed
public static SparkStreamingContext distributed()
Gets the distributed streaming context (requires Spark jars to be on the classpath).
- Returns:
- the distributed streaming context
-
get
public static StreamingContext get(boolean distributed)
Gets a streaming context.
- Parameters:
distributed - whether the stream should be distributed (true) or local (false)
- Returns:
- the streaming context
-
get
public static StreamingContext get()
Gets the public streaming context as defined by configuration: if the config property streams.distributed is set to true, the public context will be distributed; otherwise it will be local.
- Returns:
- the public streaming context
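The selection rule described for get() and get(boolean) can be sketched with JDK classes only. This is an illustration under assumptions, not the library's implementation: ContextSelectionSketch, its nested Context interface, and the use of java.util.Properties are hypothetical stand-ins for the real contexts and the real configuration mechanism. Only the property name streams.distributed is taken from the description above.

```java
import java.util.Properties;

final class ContextSelectionSketch {
    // Hypothetical stand-in for a streaming context; not the library's class.
    interface Context {
        boolean isDistributed();
    }

    static final Context LOCAL = () -> false;
    static final Context DISTRIBUTED = () -> true;

    // Mirrors get(boolean distributed): true selects the distributed context.
    static Context get(boolean distributed) {
        return distributed ? DISTRIBUTED : LOCAL;
    }

    // Mirrors get(): consults "streams.distributed" and falls back to local.
    // Using Properties here is an assumption made for this sketch.
    static Context get(Properties config) {
        String flag = config.getProperty("streams.distributed", "false");
        return get(Boolean.parseBoolean(flag));
    }

    public static void main(String[] args) {
        Properties config = new Properties();
        System.out.println(get(config).isDistributed()); // false

        config.setProperty("streams.distributed", "true");
        System.out.println(get(config).isDistributed()); // true
    }
}
```

With the actual library, the same decision is made by calling StreamingContext.get() and then checking isDistributed() on the result.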
-
local
public static LocalStreamingContext local()
Gets the local streaming context.
- Returns:
- the local streaming context
-
counterAccumulator
public <E> MCounterAccumulator<E> counterAccumulator()
Creates a new Counter accumulator.
- Type Parameters:
E - the component type of the counter
- Returns:
- the counter accumulator
-
counterAccumulator
public abstract <E> MCounterAccumulator<E> counterAccumulator(String name)
Creates a new Counter accumulator.
- Type Parameters:
E - the component type of the counter
- Parameters:
name - the name of the accumulator
- Returns:
- the counter accumulator
-
doubleAccumulator
public MDoubleAccumulator doubleAccumulator()
Creates a new double accumulator with an initial value of 0.
- Returns:
- the double accumulator
-
doubleAccumulator
public MDoubleAccumulator doubleAccumulator(double initialValue)
Creates a new double accumulator with the given initial value.
- Parameters:
initialValue - the initial value of the accumulator
- Returns:
- the double accumulator
-
doubleAccumulator
public abstract MDoubleAccumulator doubleAccumulator(double initialValue, String name)
Creates a new double accumulator with the given initial value.
- Parameters:
initialValue - the initial value of the accumulator
name - the name of the accumulator
- Returns:
- the double accumulator
-
doubleStream
public abstract MDoubleStream doubleStream(DoubleStream doubleStream)
Creates an MDoubleStream from a Java DoubleStream.
- Parameters:
doubleStream - the double stream to wrap / consume
- Returns:
- the MDoubleStream
-
doubleStream
public MDoubleStream doubleStream(double... values)
Creates an MDoubleStream from a variable list of doubles.
- Parameters:
values - the values making up the double stream
- Returns:
- the MDoubleStream
-
empty
public abstract <T> MStream<T> empty()
Creates a new empty stream.
- Type Parameters:
T - the component type of the stream
- Returns:
- the empty MStream
-
emptyDouble
public MDoubleStream emptyDouble()
Creates an empty MDoubleStream.
- Returns:
- the empty double stream
-
emptyPair
public <K,V> MPairStream<K,V> emptyPair()
Creates an empty MPairStream.
- Type Parameters:
K - the key type parameter
V - the value type parameter
- Returns:
- the empty pair stream
-
isDistributed
public boolean isDistributed()
Checks whether this context is distributed.
- Returns:
- true if distributed, false if not
-
listAccumulator
public <E> MAccumulator<E,List<E>> listAccumulator()
Creates a list accumulator.
- Type Parameters:
E - the component type of the list
- Returns:
- the list accumulator
-
listAccumulator
public abstract <E> MAccumulator<E,List<E>> listAccumulator(String name)
Creates a list accumulator.
- Type Parameters:
E - the component type of the list
- Parameters:
name - the name of the accumulator
- Returns:
- the list accumulator
-
longAccumulator
public MLongAccumulator longAccumulator(long initialValue)
Creates a new long accumulator with the given initial value.
- Parameters:
initialValue - the initial value of the accumulator
- Returns:
- the long accumulator
-
longAccumulator
public MLongAccumulator longAccumulator()
Creates a new long accumulator with an initial value of 0.
- Returns:
- the long accumulator
-
longAccumulator
public abstract MLongAccumulator longAccumulator(long initialValue, String name)
Creates a new long accumulator with the given initial value.
- Parameters:
initialValue - the initial value of the accumulator
name - the name of the accumulator
- Returns:
- the long accumulator
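As a rough illustration of what a named long accumulator with an initial value does, the sketch below uses only the JDK. LongAccumulatorSketch and its add, value, and name methods are hypothetical and do not reflect MLongAccumulator's actual API; the point is only that many workers add to one shared total that starts at the given initial value.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.LongStream;

// Hypothetical, JDK-only stand-in for a named long accumulator.
final class LongAccumulatorSketch {
    private final String name;
    private final AtomicLong total;

    LongAccumulatorSketch(long initialValue, String name) {
        this.name = name;
        this.total = new AtomicLong(initialValue);
    }

    // Thread-safe addition, so parallel workers can share one instance.
    void add(long amount) {
        total.addAndGet(amount);
    }

    long value() {
        return total.get();
    }

    String name() {
        return name;
    }

    public static void main(String[] args) {
        LongAccumulatorSketch evens = new LongAccumulatorSketch(0L, "evens");

        // Count the even numbers in 1..100 from a parallel stream.
        LongStream.rangeClosed(1, 100)
                .parallel()
                .filter(n -> n % 2 == 0)
                .forEach(n -> evens.add(1));

        System.out.println(evens.name() + " = " + evens.value()); // evens = 50
    }
}
```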
-
mapAccumulator
public <K,V> MMapAccumulator<K,V> mapAccumulator()
Creates a new map accumulator.
- Type Parameters:
K - the key type parameter
V - the value type parameter
- Returns:
- the map accumulator
-
mapAccumulator
public abstract <K,V> MMapAccumulator<K,V> mapAccumulator(String name)
Creates a new map accumulator.
- Type Parameters:
K - the key type parameter
V - the value type parameter
- Parameters:
name - the name of the accumulator
- Returns:
- the map accumulator
-
multiCounterAccumulator
public <K1,K2> MMultiCounterAccumulator<K1,K2> multiCounterAccumulator()
Creates a new MultiCounter accumulator.
- Type Parameters:
K1 - the first key type parameter
K2 - the second key type parameter
- Returns:
- the MultiCounter accumulator
-
multiCounterAccumulator
public abstract <K1,K2> MMultiCounterAccumulator<K1,K2> multiCounterAccumulator(String name)
Creates a new MultiCounter accumulator.
- Type Parameters:
K1 - the first key type parameter
K2 - the second key type parameter
- Parameters:
name - the name of the accumulator
- Returns:
- the MultiCounter accumulator
-
pairStream
public abstract <K,V> MPairStream<K,V> pairStream(Map<? extends K,? extends V> map)
Creates a new pair stream from the given map.
- Type Parameters:
K - the key type parameter
V - the value type parameter
- Parameters:
map - the map to stream
- Returns:
- the pair stream
-
pairStream
public abstract <K,V> MPairStream<K,V> pairStream(Collection<Map.Entry<? extends K,? extends V>> tuples)
Creates a new pair stream from the given collection of entries.
- Type Parameters:
K - the key type parameter
V - the value type parameter
- Parameters:
tuples - the collection of entries to use to create the pair stream
- Returns:
- the pair stream
-
pairStream
@SafeVarargs public final <K,V> MPairStream<K,V> pairStream(Tuple2<? extends K,? extends V>... tuples)
Creates a new pair stream from the given array of tuples.
- Type Parameters:
K - the key type parameter
V - the value type parameter
- Parameters:
tuples - the array of tuples to use to create the pair stream
- Returns:
- the pair stream
-
range
public abstract MStream<Integer> range(int startInclusive, int endExclusive)
Creates a range-based integer stream starting at startInclusive and ending before endExclusive.
- Parameters:
startInclusive - the starting number in the range (inclusive)
endExclusive - the ending number in the range (exclusive)
- Returns:
- the integer stream
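The half-open bounds can be illustrated with the JDK's IntStream.range, which follows the same inclusive-start, exclusive-end convention. RangeSketch is a hypothetical helper for this illustration; it returns a List<Integer> rather than an MStream<Integer> and makes no claim about how range is implemented in either context.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

final class RangeSketch {
    // Half-open range [startInclusive, endExclusive), boxed to Integer.
    static List<Integer> range(int startInclusive, int endExclusive) {
        return IntStream.range(startInclusive, endExclusive)
                .boxed()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(range(0, 5)); // [0, 1, 2, 3, 4]
        System.out.println(range(3, 3)); // [] (start equals end, so nothing is produced)
    }
}
```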
-
setAccumulator
public <E> MAccumulator<E,Set<E>> setAccumulator()
Creates a set accumulator.
- Type Parameters:
E - the component type of the set
- Returns:
- the set accumulator
-
setAccumulator
public abstract <E> MAccumulator<E,Set<E>> setAccumulator(String name)
Creates a set accumulator.
- Type Parameters:
E - the component type of the set
- Parameters:
name - the name of the accumulator
- Returns:
- the set accumulator
-
statisticsAccumulator
public MStatisticsAccumulator statisticsAccumulator()
Creates a new statistics accumulator.
- Returns:
- the statistics accumulator
-
statisticsAccumulator
public abstract MStatisticsAccumulator statisticsAccumulator(String name)
Creates a new statistics accumulator.
- Parameters:
name - the name of the accumulator
- Returns:
- the statistics accumulator
-
stream
@SafeVarargs public final <T> MStream<T> stream(T... items)
Creates a stream wrapping the given items.
- Type Parameters:
T - the component type parameter of the stream
- Parameters:
items - the items to stream
- Returns:
- the stream
-
stream
public abstract <T> MStream<T> stream(Stream<T> stream)
Creates a new MStream from a Java Stream.
- Type Parameters:
T - the component type parameter of the stream
- Parameters:
stream - the Java stream to wrap / consume
- Returns:
- the new MStream
-
stream
public abstract <T> MStream<T> stream(Iterable<? extends T> iterable)
Creates a new MStream from the given iterable.
- Type Parameters:
T - the component type parameter of the stream
- Parameters:
iterable - the iterable to wrap / consume
- Returns:
- the new MStream
-
stream
public <T> MStream<T> stream(Iterator<? extends T> iterator)
Creates a new MStream from the given iterator.
- Type Parameters:
T - the component type parameter of the stream
- Parameters:
iterator - the iterator to wrap / consume
- Returns:
- the new MStream
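One common way to turn an Iterator into a stream on the JVM is to go through a Spliterator. The sketch below shows that technique with JDK classes only. IteratorStreamSketch is a hypothetical helper that returns a java.util.stream.Stream, and whether the library wraps iterators this way internally is not stated in this documentation.

```java
import java.util.Iterator;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

final class IteratorStreamSketch {
    // Wraps the iterator lazily; elements are pulled only when the stream runs.
    static <T> Stream<T> stream(Iterator<? extends T> iterator) {
        Spliterator<T> spliterator =
                Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED);
        return StreamSupport.stream(spliterator, false); // false = sequential
    }

    public static void main(String[] args) {
        Iterator<String> words = List.of("a", "b", "c").iterator();
        List<String> collected = stream(words).collect(Collectors.toList());

        System.out.println(collected);       // [a, b, c]
        System.out.println(words.hasNext()); // false (the iterator was consumed)
    }
}
```

Because an iterator can be traversed only once, a stream built this way is consumed along with it, which matches the "wrap / consume" wording used for the parameter.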
-
textFile
public abstract MStream<String> textFile(String location)
Creates a new MStream where each element is a line in the resources (recursive) at the given location.
- Parameters:
location - the location to read
- Returns:
- the new MStream backed by the lines of the files in the given location.
-
textFile
public abstract MStream<String> textFile(@NonNull Resource location)
Creates a new MStream where each element is a line in the resources (recursive) at the given location.
- Parameters:
location - the location to read
- Returns:
- the new MStream backed by the lines of the files in the given location.
-
textFile
public abstract MStream<String> textFile(@NonNull Resource location, boolean wholeFile)
Creates a new MStream where each element is either the entire content of a resource (wholeFile = true) or a single line of a resource (wholeFile = false). Resources are gathered recursively from the given location.
- Parameters:
location - the location to read
wholeFile - true to make each element the entire content of a resource, false to make each element a single line
- Returns:
- the new MStream
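The difference between the two wholeFile modes can be shown with java.nio.file alone. The sketch below is an approximation under assumptions: TextFileSketch, its textFile and sampleDir methods, and the use of Path in place of Resource are hypothetical, and files are sorted here only to make the output order predictable.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class TextFileSketch {
    // Walks the location recursively and yields one element per file
    // (wholeFile = true) or one element per line (wholeFile = false).
    static List<String> textFile(Path location, boolean wholeFile) {
        try (Stream<Path> paths = Files.walk(location)) {
            return paths.filter(Files::isRegularFile)
                    .sorted()
                    .flatMap(file -> read(file, wholeFile))
                    .collect(Collectors.toList());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    private static Stream<String> read(Path file, boolean wholeFile) {
        try {
            return wholeFile
                    ? Stream.of(Files.readString(file))
                    : Files.readAllLines(file).stream();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Builds a small temporary directory tree to run the sketch against.
    static Path sampleDir() {
        try {
            Path dir = Files.createTempDirectory("textfile-sketch");
            Files.writeString(dir.resolve("a.txt"), "one\ntwo\n");
            Path sub = Files.createDirectory(dir.resolve("sub"));
            Files.writeString(sub.resolve("b.txt"), "three\n");
            return dir;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Path dir = sampleDir();
        System.out.println(textFile(dir, false));       // [one, two, three]
        System.out.println(textFile(dir, true).size()); // 2
    }
}
```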
-
textFile
public MStream<String> textFile(@NonNull Resource location, @NonNull String pattern)
Creates a new MStream where each element is a line in the resources (recursive) at the given location, reading only files that match the given pattern.
- Parameters:
location - the location to read
pattern - the pattern that files must match to be read
- Returns:
- the new MStream
-
updateConfig
public void updateConfig()
Updates the config object used by this stream (important for distributed environments).