# Aggregations

Jump to the [Code block](#code-block) for a combined complete example.

A common way to process the results of a [basic query](https://aerospike.com/docs/develop/client/go/usage/multi/queries/basic) is aggregation, where you compute a function over the entire results set.

## Setup

The following examples will use the setup below to illustrate query aggregation with a stream UDF.

```go
import (

    "fmt"

    "github.com/aerospike/aerospike-client-go/v6"

)

// Establishes a connection to the server

client, err := aerospike.NewClient("127.0.0.1", 3000)

if err != nil {

  log.Fatal(err)

}

defer client.Close()
```

The record structure:

```asciidoc
Occurred: Integer

Reported: Integer

Posted: Integer

Report: Map

{

    shape: List,

    summary: String,

    city: String,

    state: String,

    duration: String

}

Location: GeoJSON
```

## Stream UDF

When a query executes, it produces a stream of results. That stream contains records that you can iterate using the client API. However, Aerospike provides the ability to process the stream of results using a Stream UDF. Stream UDFs allow a stream to be augmented with operations that process the data flowing through the stream.

This example uses the Stream UDF `count`, from the `example.lua` module. See Manage UDFs for information on registering the UDF.

```lua
-- Aggregation function to count records

local function one(rec)

    return 1

end

local function add(a, b)

    return a + b

end

function count(stream)

    return stream : map(one) : reduce(add);

end
```

`count()` is applied to the stream of results from a query, adding to the stream the operations to perform on the results:

-   `map` — Maps a value from the stream to another value. In this example, mapping is defined as the function `one()`, which maps a record to the value 1.
-   `reduce` — Reduces the values from the stream into a single value. In this example, reduction is performed by adding two values from the stream, which happen to be 1s returned from the `map` function.

The end result is a stream that contains a single value, the sum of 1 for each record in the result set.

## Client UDF path

For client-side Stream UDF processing, you must point the client to the local location of the UDF module.

```go
// Set local directory

aerospike.SetLuaPath("/home/user/udf/")
```

## Execute the query

The following example will execute a [secondary index query](https://aerospike.com/docs/develop/client/go/usage/multi/queries/secondary), using an [index created](https://aerospike.com/docs/develop/client/go/usage/multi/queries/secondary#create-an-index) on the `occurred` bin. The returned result will be a count of all records with an `occurred` value between `20210101` and `20211231`.

```go
// Create statement

stmt := aerospike.NewStatement("sandbox", "ufodata")

// Create index filter

stmt.SetFilter(aerospike.NewRangeFilter("occurred", 20210101, 20211231))

// Execute the query

recordSet, err := client.QueryAggregate(nil, stmt, "example", "count")

// Get the results

for records := range recordSet.Results() {

    if records != nil {

        // Do something

        fmt.Printf("Count = %v", records.Record.Bins["SUCCESS"])

    }

}
```

## Code block

Expand this section for a single code block to apply a stream UDF aggregation

```go
import (

    "fmt"

    "github.com/aerospike/aerospike-client-go/v6"

)

func main() {

    // Establishes a connection to the server

    client, err := aerospike.NewClient("127.0.0.1", 3000)

    if err != nil {

      log.Fatal(err)

    }

    defer client.Close()

    // Set local directory

    aerospike.SetLuaPath("/home/user/udf/")

    // Create statement

    stmt := aerospike.NewStatement("sandbox", "ufodata")

    // Create index filter

    stmt.SetFilter(aerospike.NewRangeFilter("occurred", 20210101, 20211231))

    // Execute the query

    recordSet, err := client.QueryAggregate(nil, stmt, "example", "count")

    if err != nil {

      log.Fatal(err)

    }

    // Get the results

    for records := range recordSet.Results() {

        if records != nil {

            // Do something

            fmt.Printf("Count = %v", records.Record.Bins["SUCCESS"])

        }

    }

}
```