We are excited to be a part of AWS re:Invent 2024. Visit us at booth #1844 in Las Vegas.More info
Blog

Using AWS IAM for client authentication

Untitled
Naresh Maharaj
Consultant: Professional Services
December 20, 2023|10 min read

In this blog post, we detail how to create an Amazon Managed Streaming for Apache Kafka (Amazon MSK) resource using AWS Identity and Access Management (AWS IAM) in roles and policies to authenticate user access. In the initial step, we establish an Aerospike Database cluster and insert sample messages into the database. Subsequently, we observe in real time how these messages are streamed to Amazon MSK using Aerospike's Kafka Source Connector. Below we provide a comprehensive, step-by-step guide for users to successfully implement this process.

img 2 msk

AWS MKS Kafka

In this section, you will set up a simple three-node Kafka cluster.

  1. Visit the AWS console and select MSK service.

msk-img

2. Create a new cluster by selecting Create ClusterQuick Create.

cluster-creation

3. Select the provisioned cluster and instance type of kafka.t3.small.

cluster-properties

4. Select the EBS storage type per broker of 10 GB.

ebs-broker

NOTE: Take note of the VPC, subnets, and security group ID, as you will require these details later in the article.

The next step is the critical step where you will create the AWS IAM policy and roles. This setup ensures that the Aerospike Database authenticates using AWS IAM to write data to MSK.

5. From the AWS Console, select the AWS IAM service.

iam-policies

6. To create a new AWS IAM policy, copy the following JSON and paste it in the JSON tab. Replace region:Account-ID with your own region and AWS account ID.

specify-permissions

7. Save the policy and name it msk-tutorial-policy.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:Connect",
                "kafka-cluster:AlterCluster",
                "kafka-cluster:DescribeCluster"
            ],
            "Resource": [
                "arn:aws:kafka:region:Account-ID:cluster/MSKTutorialCluster/*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:*Topic*",
                "kafka-cluster:WriteData",
                "kafka-cluster:ReadData"
            ],
            "Resource": [
                "arn:aws:kafka:region:Account-ID:topic/MSKTutorialCluster/*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:AlterGroup",
                "kafka-cluster:DescribeGroup"
            ],
            "Resource": [
                "arn:aws:kafka:region:Account-ID:group/MSKTutorialCluster/*"
            ]
        }
    ]
}

8. Create the IAM role.

iam-roles

9. Under Common Use Cases, select EC2 and then Next.

use-case

10. Under Permissions, select the policy named msk-tutorial-policy and then Next.

msk-permissions

11. Give the role a name like msk-tutorial-role and click the Create Role button.

Kafka client machine

Next, create a client machine to install the Kafka tools necessary to access our MSK cluster.

  1. Create a new ec2 instance using type

    t2.micro
instance-type

2. Use the default AMI: Amazon Linux 2023

ami-info

The AMI may be different depending on your region

3. Create a key-pair if required. I am using an already existing key-pair.

key-pair

4. Under Advanced Options.IAM instance profile, select the IAM role created earlier.

advanced-details

5. Launch the instance!

6. Under instances launched, choose the instance you just created. Click on the ‘Security’ tab and note the security group associated with this instance. e.g., sg-0914e6271c97ae4c9 (launch-wizard-1)

7. Navigate to the VPC section and select Security Groups from the left-hand menu. Locate the security group associated with the MSK cluster, such as sg-e5f51dfb, and choose Edit Inbound Rules.

8. Create a new rule to allow all traffic from the new ec2 instance.

security-group-rule

Kafka topics

After successfully establishing your initial Kafka cluster and Kafka client machine, proceed to conduct testing. Verify the functionality by accessing the MSK cluster, creating a topic, producing and consuming sample messages, and ensuring that everything operates as anticipated.

  1. From the MSK Cluster, note the Kafka version being used. This examples uses 2.8.1.

  2. From the Kafka client machine, install Java 11+.

sudo yum -y install java-11

3. Download Apache Kafka using wget, then extract the archive using tar.

wget https://archive.apache.org/dist/kafka/2.8.1/kafka_2.12-2.8.1.tgz
tar -xzf kafka_2.12-2.8.1.tgz

4. To use IAM, you will need the MSK IAM Auth jar file. Download the jar to the Kafka libs folder you just extracted.

cd kafka_2.12-2.8.1/libs/
wget https://github.com/aws/aws-msk-iam-auth/releases/download/v1.1.1/aws-msk-iam-auth-1.1.1-all.jar
cd ../bin/
5. Create a file called client.properties to use when authenticating to MSK. It will define the SASL mechanism to use and reference the Java class file that will handle your IAM callbacks.
cat <<EOF> client.properties
security.protocol=SASL_SSL
sasl.mechanism=AWS_MSK_IAM
sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
EOF

Creating topics

  1. Go to the AWS Console and view the MSK Cluster Client Information. There will be three endpoints to choose from, but you only require one.

Example choose:

B-2.msktutorialcluster.450050.c11.kafka.us-east-1.amazonaws.com:9098
bootstrap-server

From the kafka/bin folder, run the command to create a topic. Let's call it aerospike-airforce-1.

export BootstrapServerString="b-2.msktutorialcluster.450050.c11.kafka.us-east-1.amazonaws.com:9098"
./kafka-topics.sh --create --bootstrap-server $BootstrapServerString --command-config client.properties --replication-factor 3 --partitions 1 --topic aerospike-airforce-1

Listing topics

To list the topics, use the following command. Notice our latest topic, called aerospike-airforce-1, just showed up.

./kafka-topics.sh --bootstrap-server $BootstrapServerString --command-config client.properties --list
MSKTutorialTopic
__amazon_msk_canary
__consumer_offsets
aerospike
aerospike-airforce-1

Producer and consumer

I agree that this is more of a Kafka-101 rather than a straightforward Hello-World scenario. Nonetheless, it is essential to test our configuration by sending and receiving messages from the designated Kafka topic before proceeding further.

Produce some messages by opening a new window and running the following Kafka producer command. Type three or four messages, hitting the 'Return' key after each message.

./kafka-console-producer.sh --broker-list $BootstrapServerString --producer.config client.properties --topic aerospike-airforce-1
>Instrument Check
>Pre flight checks confirmed
>Ready for takeoff
>Full throttle, flaps

You're now ready to start a client consumer application. Open a new window and run the consumer. You should now see the same messages you published earlier.

./kafka-console-consumer.sh --bootstrap-server $BootstrapServerString --consumer.config client.properties --topic aerospike-airforce-1 --from-beginning
Instrument Check
Pre flight checks confirmed
Ready for takeoff
Full throttle, flaps

Database source

Let's review your achievements thus far. You've established a 3-node Kafka cluster in AWS utilizing MSK, incorporating IAM roles and permissions. Additionally, you have successfully created topics and demonstrated the production and consumption of messages using the IAM credentials established during the setup.

The next phase of your journey involves installing the Aerospike Database, inserting messages, and configuring a simple XDR component. XDR is a Cross Datacenter Replication tool and is crucial for transmitting data from the Aerospike Database to the Aerospike Kafka Source Connector allowing us to subsequently forward messages to Amazon MSK.

Create the Aerospike Database

  1. Start by creating a new ec2 instance. For this demo, you can use Linux Centos 8

Rocky 8 AMI: ami-043ceee68871e0bb5 ( us-east-1 )

img 3 msk

2. Select the instance type as t2.medium.

img 4 msk

3. Add the extra volume for the Aerospike data storage layer. EBS volume is all that is required for now.

img 5 msk

4. Launch the instance and connect to the host using ssh. If you have an Aerospike license feature file, upload it to the instance.

Install the Aerospike Database server

  1. Run the following to install the Aerospike Database Server.

export VER="6.1.0.2"
sudo yum install java python3 openssl-devel wget git gcc maven bind-utils sysstat nc -y
wget -O aerospike-tools.tgz 'https://www.aerospike.com/download/tools/latest/artifact/el8'
tar -xvf aerospike-tools.tgz
cd aerospike-tools_*
sudo ./dep-check
sudo ./asinstall
wget -O aerospike.tgz https://enterprise.aerospike.com/enterprise/download/server/$VER/artifact/el8
tar -xvf aerospike.tgz
cd aerospike-server-enterprise-$VER-el8
sudo ./asinstall
sudo mkdir -p /var/log/aerospike/
sudo systemctl enable aerospike

2. Confirm the storage disk for Aerospike.

lsblk
NAME    MAJ:MIN RM SIZE RO TYPE MOUNTPOINT
xvda    202:0    0  10G  0 disk
└─xvda1 202:1    0  10G  0 part /
xvdb    202:16   0  10G  0 disk   <<----------------- This one!

3. When its data is available, replace the Aerospike configuration file under /etc/aerospike/aerospike.conf with the configuration file listed below, also replacing the following lines:

  1. Under

    heartbeat.address

    add in your internal 172.x.x.x address

  2. For

    xdr.dc.node-address-port

    enter the {kafka-client-machine-address}:8080

Aerospike Database configuration file for use with systemd

service {
  # paxos-single-replica-limit 1 # Number of nodes where the replica count is automatically
  proto-fd-max 15000
  service-threads 10
  feature-key-file /etc/aerospike/features.conf
  node-id A1
  cluster-name CLA
}
logging {
  file /var/log/aerospike/aerospike.log {
    context any info
  }
}
# public and private addresses
network {
  service {
    address any
    port 3000
  }
  heartbeat {
    mode mesh
    address 172.31.94.201
    port 3002 # Heartbeat port for this node.
    interval 150 # controls how often to send a heartbeat packet
    timeout 10 # number of intervals after which a node is considered to be missing
  }
  fabric {
    port 3001
  }
  info {
    port 3003
  }
}
namespace test {
  replication-factor 2
  memory-size 40G
  default-ttl 0
  index-type shmem
  high-water-disk-pct 50
  high-water-memory-pct 60
  stop-writes-pct 90
  nsup-period 0
  storage-engine device {
    device /dev/xvdb
    data-in-memory false
    write-block-size 128K
    min-avail-pct 5
  }
}
xdr {
  # Change notification XDR block that round-robins between two connector nodes
  dc aerospike-kafka-source {
    connector true
    node-address-port 172.31.58.190 8080
    namespace test {
    }
  }
}

Start the Aerospike service

  1. Copy the license feature file to the aerospike configuration directory.

sudo cp features.conf /etc/aerospike/

2. Start the Aerospike server and check the logs to ensure there are no errors.

sudo systemctl start aerospike
sudo systemctl status aerospike

Aerospike Kafka Source Connector

The seamless flow of data from Aerospike Database Enterprise Edition to Apache Kafka hinges on the utilization of the Aerospike Kafka source (outbound) connector. This connector subscribes to change notifications. Upon receiving these notifications, the connector converts them into messages, which are dispatched to Kafka topics. Going back to the ec2 instance you created earlier with our Kafka client configured, go ahead and install the Aerospike Kafka Source Connector. This is your outbound connector to send data from the Aerospike to MSK.

sudo yum install java #( install 11+ JDK )
wget https://enterprise.aerospike.com/artifacts/enterprise/aerospike-kafka-outbound/5.0.1/aerospike-kafka-outbound-5.0.1-1.noarch.rpm
sudo rpm -i aerospike-kafka-outbound-5.0.0-1.noarch.rpm

Configure the outbound connector

The terms “outbound” and “source connector” are used interchangeably in this article.

  1. Locate the following file on the Kafka client box: /etc/aerospike-kafka-outbound/aerospike-kafka-outbound.yml.

  2. Replace the broker address for one of the node addresses in the MSK Kafka cluster producer-props.bootstrap.servers.

  3. Then add the following contents to the file with the changes that have been outlined.

# Change the configuration for your use case.
#
# Refer to https://www.aerospike.com/docs/connectors/enterprise/kafka/outbound/configuration/index.html
# for details.
# The connector's listening ports, TLS, and network interface.
service:
  port: 8080
# Format of the Kafka destination message.
format:
  mode: flat-json
  metadata-key: metadata
# Aerospike record routing to a Kafka destination.
routing:
  mode: static
  destination: aerospike
# Kafka producer initialization properties.
producer-props:
  bootstrap.servers:
    - b-3.msktutorialcluster.450050.c11.kafka.us-east-1.amazonaws.com:9098
  ssl.truststore.location: /etc/aerospike-kafka-outbound/kafka.client.truststore.jks
  ssl.truststore.password: changeit
  security.protocol: SASL_SSL
  sasl.mechanism: AWS_MSK_IAM
  sasl.jaas.config: software.amazon.msk.auth.iam.IAMLoginModule required awsProfileName=default;
  sasl.client.callback.handler.class: software.amazon.msk.auth.iam.IAMClientCallbackHandler
# The logging properties.
logging:
  file: /var/log/aerospike-kafka-outbound/aerospike-kafka-outbound.log
  enable-console-logging: true
  levels:
    root: debug
    record-parser: debug
    server: debug
    com.aerospike.connect: debug
  ticker-interval: 3600

4. Create the CA certificate trust store for use in the Kafka Outbound Connector config. You can see the SSL trust store location referenced in the file above as ssl.truststore.location

sudo cp /usr/lib/jvm/java-11-amazon-corretto/lib/security/cacerts /etc/aerospike-kafka-outbound/kafka.client.truststore.jks
sudo chmod 755 /etc/aerospike-kafka-outbound/kafka.client.truststore.jks

5. Finally, make the AWS IAM Kafka Auth Jar file available to the Aerospike Outbound Kafka Connector. This is the same jar file that you downloaded and added to the kafka/libs folder.

sudo cp kafka_2.12-2.8.1/libs/aws-msk-iam-auth-1.1.1-all.jar /opt/aerospike-kafka-outbound/lib/aws-msk-iam-auth-1.1.1-all.jar

6. Start the service.

sudo systemctl enable aerospike-kafka-outbound
sudo systemctl start aerospike-kafka-outbound

Send data from Aerospike to Kafka

  1. Open a separate window so you can list all messages on the Aerospike Kafka topic. Start by adding one of the private endpoint bootstrap servers as an environment variable for ease of use.

export BootstrapServerString="b-3.msktutorialcluster.450050.c11.kafka.us-east-1.amazonaws.com:9098"

2. Run the consumer client as follows:

./kafka-console-consumer.sh --bootstrap-server $BootstrapServerString --consumer.config client.properties --topic aerospike --from-beginning

3. In a new window, start AQL, the Aerospike command line client which connects to your Aerospike Database.

aql -U auser -P a-secret-pwd

4. Insert some data

insert into test (pk, a) values(400, "Your winning lottery ticket awaits you")

5. Check to see if the message appears in the Kafka consumer window

{"metadata":{"namespace":"test","userKey":400,"digest":"W7eGav2hKfOU00xx7mnOPYa2uCo=","msg":"write","gen":1,"lut":1681488437767,"exp":0},"a":"Your winning lottery ticket awaits you"}

Conclusion

You've just discovered how straightforward it is to transmit data from Aerospike to AWS MSK Kafka while ensuring client authentication through AWS IAM permissions! From establishing an Aerospike Database from scratch to configuring the AWS MSK Kafka cluster and employing the Aerospike Outbound Kafka Connector, you've effortlessly constructed a real-time streaming data pipeline. Congratulations on this accomplishment!

Share your experience! Your feedback is important to us. Join our Aerospike community!