# Developing Stream UDFs

With Stream UDFs (also called _aggregations_) in Aerospike, you can perform a sequence of operations on a set of records.

Before starting to write Stream UDFs, be familiar with the following concepts.

-   For a high-level overview of aggregations, see the [Aggregations Feature Guide](https://aerospike.com/docs/database/learn/architecture/aggregation)
-   Creating secondary index and running queries on them: [Query Guide](https://aerospike.com/docs/develop/learn/queries).
-   Creating UDF and Registering the UDF: [UDF Developer Guide](https://aerospike.com/docs/database/advanced/udf) and [Managing UDFs](https://aerospike.com/docs/database/advanced/udf/managing).
-   API reference for LUA stream-ops module: [API Stream](https://aerospike.com/docs/database/advanced/udf/api/types/stream).

Stream UDFs have the following types of operations. You have to write one or more of these functions depending on your application’s requirements.

-   filter — retain data in the stream that satisfies a predicate.
-   map — transform a piece of data into.
-   aggregate — aggregates a stream of data into a single aggregate value.
-   reduce — reduce produces a value by combining two like values.

## Basics

The main difference between a Record UDF and a Stream UDF is that the Record UDF returns a value, whereas the Stream UDF returns a stream definition. The stream definition contains a series of functions that should be applied on the data that is fed to the stream.

**The stream-UDF is read-only**.

If an attempt is made to update a record by any function from [Aerospike lua module](https://aerospike.com/docs/database/advanced/udf/api/modules/aerospike) in the stream definition, the function will fail. A stream UDF is like the pipe mechanism in Linux where the output of one command can be fed into the next command in a chain, as in the example below:

```plaintext
cat peoplelist.txt | grep "USA" | grep "California" | wc -l
```

However, in case of stream UDF, the user has to indicate the type of function because each type of operation is treated differently by the stream sub-system. Each function has an input stream and an output stream. Data is fed to the function from the input stream and the result will be put on its output stream. The output stream of a function becomes an input stream to the next function in the chain. A typical stream UDF looks as below:

```lua
function my_stream_udf(s)

    local m = map()

    return s : filter(my_filter_fn) : map(my_map_fn): aggregate(m, my_aggregate_fn): reduce(my_reduce_fn)

end
```

It is legal to specify multiple functions of any type in any order as long as they can work together. For example, the example below is a legal stream.

```lua
function my_complex_stream_udf(s)

    return s : filter(my_filter1) : map(my_map1) : filter(my_filter2) : map(my_map2) : map(my_map3) : reduce(my_reduce)

end
```

The stream UDFs execute in two phases.

-   Cluster-side : First the stream UDF is executed on all the nodes of the cluster. The result from each node (after applying first reduce) is sent to the application which triggered the UDF.
-   Client-side: When the nodes send their result to the application, the client layer will do the final aggregation. Execution at client-side will start from first reduce function and send the result to application.

 ![Stream UDF](https://aerospike.com/docs/_astro/stream.BYtb6yoH_Zi5EPe.png)

The UDF file should be present both on the cluster nodes as well as the client nodes as the final phase of reduction happens on the client side.

Although the presentation of Stream UDFs often includes the standard `filter() | map() | reduce()` arrangement, in fact any combination of these functions can be used.

## The Filter Operation

The **filter operation** filters values from the stream. The filter operation accepts a single argument, the filter function. For example, return s : filter(my\_filter1).

The **filter function** accepts the current value from the stream and should return true or false, where true indicates the value should continue down the stream.

A typical filter function is shown below. A query feed records in to the stream. The filter operation applies the filter function to each record in the stream. In the example, the filter function passes records containing “males” older than “18” years down the stream.

```lua
local function my_filter_fn(rec)

    if rec['age'] > 18 and rec['sex'] == 'male' then

        return true

    else

        return false

    end

end
```

## The Map Operation

The **map operation** will transform values in the stream. The map operation accepts a single argument, the map function. For example, return s : map(my\_map1).

The **map function** accepts the current value from the stream, and should return a new value to replace the current value in the stream. The type of the return value must be one of those supported by the database: integer, string, list, and map.

A typical map function looks as below:

```lua
local function my_map_fn(rec)

    local result = map()

    result['name'] = rec['name']

    result['city'] = rec['city']

    return result

end
```

## The Aggregate Operation

Aggregate operation aggregates a stream of data into a single aggregate value. The **aggregate operation** accepts two arguments and returns one value. The first argument is the aggregated value where the results of the aggregation are stored. The second argument is an aggregation function which will aggregate the values coming from the input stream and store in the aggregated value (the first argument).

The **aggregation function** takes two arguments, the aggregate value and the next value from the input stream. It should return a single value whose type must be one of those supported by the database: integer, string, list, and map. The aggregate value and the return value should be of same type. Each call to the aggregate function should populate the aggregate value with data contained in the next value from the input stream.

A typical aggregate function looks as below. In the example, it is trying to do a group-by kind of operation where it is trying to find the number of citizens in each city. You will notice reduce is called after aggregate, which will perform a final reduction on the data

```lua
local function my_aggregation_fn(aggregate, nextitem)

    -- If the count for the city does not exist, initialize it with 1

    -- Else increment the existing counter for that city

    aggregate[nextitem['city']] = (aggregate[nextitem['city']] or 0) + 1

    return aggregate

end
```

On close observation one can realize that the aggregate function is different from filter and map type of functions. Both filter and map takes one element as from its input stream and returns one element to its output stream. Whereas, the aggregate function will take a bunch of elements from its input stream (across multiple invocations) but will return only one element to its output stream. So, putting two aggregate functions in a row is not of much benefit because the output of the first aggregate function will only emit single element which can be consumed by the next aggregate function.

The accumulated aggregate value can grow quite large. It is possible, for example, to simulate the SQL Select DISTINCT function by accumulating values in a map and then dumping the map at the end.

## The Reduce Operation

The **reduce operation** reduces values in the stream to a single value. The reduce operation accepts a single argument, the reduce function.

The **reduce function** accepts two values from the stream and returns a single value to be fed back into the function as the first argument. The reduce function should be commutative. The two arguments of the reduce function and the return value should be of the same type. The type of the return value must be one of those supported by the database: integer, string, list, and map.

One main characteristic of reduce function is that it executes both on the server nodes as well as the client side (in application instance). Each node first runs the data stream through the functions defined in the stream definition. The end result of this is sent to the application instance. The application gets results from all the nodes in the cluster. The client layer in it does the final reduce using the reduce function specified in the stream. So, the reduce function should be able to aggregate the intermediate aggregated values (coming form the cluster nodes). If there is no reduce function, the client layer simply passes all the data coming from the nodes to the application.

```lua
local function my_merge_fn(val1, val2)

    return val1 + val2

end

local function my_reduce_fn(global_agg_value, next_agg_value)

    -- map.merge is a library function will call the my_merge_fn for each key and returns a new map.

    -- my_merge_fn will be passed the values of the key in two maps as arguments.

    -- It stores the result of merge function against the key in the result map

    return map.merge(global_agg_value, next_agg_value, my_merge_fn)

end
```

### Data Types

In case of stream UDFs, there will be a series of functions and the output of one function goes as input of the next function. So, there should be an agreement between a function and the next function in the chain. The data type returned by a function should be of a type that can be consumed by the next function in the chain. The developer has to take care of this. If this agreement is broken, the stream UDF execution may fail due to exception.

The data that is fed to the first function in the stream will be of type ‘record’. It should be capable of consuming it. To see how to consume a ‘record’ refer to the [UDF guide](https://aerospike.com/docs/database/advanced/udf).

More on the data types of functions in section below.

## Extra Arguments Using Closures

The examples above uses hard-coded values for filtering and transformation. It will not be practical to write a whole lot of filter/map/reduce function for each specific use case, for example, one filter function for `age > 18` and one more for `age > 25`. One will want to write a generic function and use it in different ways by passing arguments to it. To pass arguments to these filter/map/reduce type of functions we should use the concept of a closure. In short, closure is a function which will capture the environment in which it is created.

Example:

```lua
local function my_age_filter(minimum_age)

    return function(rec)

        if rec['age'] > minimum_age

            return true

        else

            return false

    end

end

function my_stream_udf(stream, minimum_age)

    local myfilter = my_age_filter(minimum_age)

    return stream : filter(myfilter) : aggregate(0, myreducer)

end
```

The arguments can be passed when invoking the stream UDF as detailed in the [UDF guide](https://aerospike.com/docs/database/advanced/udf).

## Arguments

A Stream UDF should have one or more arguments.

The first argument must always be a Lua: stream\_ops Module, which defines the operations to be invoked on the data in a stream.

Each subsequent argument is defined by the UDF and must be one of the types supported by the database: integer, string, list and map.

## Return Values

The return value must be the Lua: stream\_ops Module, optionally augmented with operations.

## Additional Examples

For more examples, see [Statistics](https://aerospike.com/docs/database/advanced/udf/modules/stream/examples/statistics), [Word count](https://aerospike.com/docs/database/advanced/udf/modules/stream/examples/word-count), and [String starts with](https://aerospike.com/docs/database/advanced/udf/modules/stream/examples/starts-with) examples.