
Event Processing Pipeline with AWS/CBR

In my previous blog post, we covered how to extract raw network connections from real-time process executions using the Carbon Black APIs. After extracting each network connection, we enriched the network data with GeoIP information to aid in detecting suspicious network connections. The downside to this method is that you need to specify the processes you wish to analyze. This means you may miss events if you fail to specify the correct process; you also miss out on key metrics that, by themselves, may point to something suspicious. Let’s take a look at how we can use some AWS resources and the Carbon Black Event Forwarder to handle all network connections. For reference, the event schema can be found at the following link:

While the content below is far from perfect (prototyping an idea), it shows how we can use AWS resources to handle any JSON object containing an IP address or domain name.

Our base ingress.event.netconn event from Carbon Black Event Forwarder should look like the following:
Next, let's create a simple AWS setup to handle a stream of network events, originating from the event forwarder:
  1. Ingress.event.netconn is forwarded to s3 from the event forwarder.
  2. S3 bucket has an event notification set up to send a message to SQS any time a new object is created in this bucket.
  3. Once the new message is available on the SQS queue, a Lambda function will be executed.
  4. Lambda function will read the message off the SQS queue. This message tells Lambda the location of the ingress.event.netconn JSON in s3. (CBR forwards the messages in groups, so one file should have many netconn JSON items inside.)
  5. Lambda function will read each JSON item and perform a GeoIP lookup on the “remote_ip” field using an API lookup (assuming it's not in the RFC1918 range). The GeoIP results are permanently appended to the JSON object.
  6. The JSON is saved to a database such as Mongo/ElasticSearch/DynamoDB
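The steps above can be sketched roughly as follows. This is a minimal sketch, not the code behind this post: it assumes each forwarded file holds one JSON event per line, and `fetch_object`, `geoip_lookup` and `save` are hypothetical callables standing in for the S3 read, the GeoIP API and the database write (in a real Lambda you would bind them with something like `functools.partial`, since Lambda only passes `event` and `context`).

```python
import ipaddress
import json
from urllib.parse import unquote_plus


def s3_locations(sqs_event):
    """Yield (bucket, key) for each S3 object referenced in an SQS-triggered event."""
    for record in sqs_event.get("Records", []):
        # The SQS message body is the S3 event notification, serialized as JSON.
        notification = json.loads(record["body"])
        for item in notification.get("Records", []):
            bucket = item["s3"]["bucket"]["name"]
            # Object keys arrive URL-encoded in S3 event notifications.
            key = unquote_plus(item["s3"]["object"]["key"])
            yield bucket, key


def is_public_ip(value):
    """True only for globally routable addresses (skips RFC1918, loopback, etc.)."""
    try:
        return ipaddress.ip_address(value).is_global
    except ValueError:
        return False


def enrich_file(body, geoip_lookup):
    """Parse one forwarded file (assumed: one JSON event per line) and append geoip."""
    events = []
    for line in body.splitlines():
        if not line.strip():
            continue
        event = json.loads(line)
        remote_ip = event.get("remote_ip")
        if remote_ip and is_public_ip(remote_ip):
            event["geoip"] = geoip_lookup(remote_ip)
        events.append(event)
    return events


def handler(event, context, fetch_object, geoip_lookup, save):
    """Lambda-style entry point; the three callables are injected (hypothetical)."""
    for bucket, key in s3_locations(event):
        for enriched in enrich_file(fetch_object(bucket, key), geoip_lookup):
            save(enriched)
```

Injecting the S3, GeoIP and database calls keeps the parsing and RFC1918 filtering testable without any AWS access.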
If we review our processed JSON, it will now have a new key called “geoip”, as outlined below:
This is a very simple event processing pipeline where network events are forwarded to s3 and flow through this tiny pipeline to eventually have GeoIP data appended to each JSON object. This simple setup should allow you to process significantly more network connections than our previous script. But why stop there? With this pipeline, we could process ANY JSON document that has an IP address and append GeoIP information to it. We could also add more API lookups to help enrich the data further.
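To illustrate the "ANY JSON document" idea, here is a rough sketch that walks an arbitrary JSON structure, finds every string value that parses as an IP address, and attaches lookup results for the public ones. The function names and the shape of the resulting "geoip" key (a map from field path to lookup result) are my own assumptions for illustration.

```python
import ipaddress


def find_ips(node, path=""):
    """Recursively yield (path, ip_string) for every string value that parses as an IP."""
    if isinstance(node, dict):
        for key, value in node.items():
            yield from find_ips(value, f"{path}.{key}" if path else key)
    elif isinstance(node, list):
        for index, value in enumerate(node):
            yield from find_ips(value, f"{path}[{index}]")
    elif isinstance(node, str):
        try:
            ipaddress.ip_address(node)
        except ValueError:
            return
        yield path, node


def enrich_document(doc, lookup):
    """Attach lookup results under 'geoip', keyed by where each public IP was found."""
    results = {
        path: lookup(ip)
        for path, ip in find_ips(doc)
        if ipaddress.ip_address(ip).is_global
    }
    if results:
        doc["geoip"] = results
    return doc
```

Because `lookup` is just a callable, the same walker could drive any other per-IP API lookup.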

Additional Data Enrichment

Let’s say you’re collecting network events for 10 customers. A few additional metrics we created to assist in analysis at scale and on an ongoing basis are:
  • ip_domain_observations
  • ip_domain_reputation
  • intel
  • netconn hashes

ip_domain_observations is a collection of metrics for the IP address or domain categorized in three categories:
These metrics are incremental counters for each ip address over a given span of time (maybe 3-6 months or more). Some example questions you may ask within this dataset could be:
  • How many times have we observed this ip address or domain on the host “WIN-OTEMNUTBS23”?
  • How many times have we observed this ip address or domain at customer01?
  • How many times have we observed this ip address or domain at our financial customers?

We can get more complex by adding in port/protocol/direction into the mix or even track DNS resolutions if you so desire. For this example, we’re keeping it simple. You can see an example of this output below.
Line 29: Observations to date on the host “WIN-OTEMNUTBS23”
Line 30: Observations to date for the customer “CUST01”
Line 31: Observations to date for all customers or a subset of customers (ex: Financial or Energy sectors)
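As a rough sketch of how these incremental counters could be kept, the snippet below uses an in-memory `Counter` as a stand-in for a database table. The scope names ("host", "customer", "sector") and the function name are assumptions for illustration, and the time window mentioned above is not modeled.

```python
from collections import Counter

# Stand-in for a database table: (indicator, scope, name) -> observation count.
observations = Counter()


def record_observation(indicator, host, customer, sector):
    """Increment the host, customer and sector counters for one IP or domain."""
    scopes = {"host": host, "customer": customer, "sector": sector}
    for scope, name in scopes.items():
        observations[(indicator, scope, name)] += 1
    # Return the counts to date so they can be appended to the event JSON.
    return {scope: observations[(indicator, scope, name)] for scope, name in scopes.items()}
```

In a real database the increment would need to be atomic (an upsert with an increment operator, for example), since many Lambda invocations may update the same counter at once.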

ip_domain_reputation is a more focused dataset. While the data is still split into the three categories above, the reputation tracks any alerts/investigations you have performed that include either the IP address or domain. In our example, let’s say we have two result statuses (reported and false positive) for any alert or investigation performed on the IP address or domain:
Line 43: Internal reputation for the IP address on the host “WIN-OTEMNUTBS23”.
Line 47: Internal reputation for the IP address for the customer “CUST01”.
Line 43: Internal reputation for the IP address for all customers or subset of customers (Financial or Energy sectors).
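A minimal sketch of recording these outcomes, assuming the two statuses named above and the same three scopes; the dictionary here stands in for a database, and all names are hypothetical.

```python
from collections import defaultdict

STATUSES = ("reported", "false_positive")

# Stand-in for a database table: (indicator, scope, name) -> per-status counts.
reputation = defaultdict(lambda: dict.fromkeys(STATUSES, 0))


def record_verdict(indicator, status, host, customer, sector):
    """Record the outcome of one alert or investigation at all three scopes."""
    if status not in STATUSES:
        raise ValueError(f"unknown status: {status}")
    for scope, name in (("host", host), ("customer", customer), ("sector", sector)):
        reputation[(indicator, scope, name)][status] += 1


def reputation_for(indicator, scope, name):
    """Return a copy of the counts, or all zeros if nothing was ever recorded."""
    counts = reputation.get((indicator, scope, name))
    return dict(counts) if counts else dict.fromkeys(STATUSES, 0)
```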

Intel comes in all shapes and sizes, so for this data, let's just assume you have three different vendors that provide intel on the IP addresses or domains. Your resulting JSON after lookup may look like the below:
Netconn hashes is a collection of sha1 hashes comprised of various parts of the JSON document itself. For this event type ingress.event.netconn, we could create three different hashes by concatenating the following pieces of information together:
  • HASH1: SHA1(remote_ip + port + protocol)
  • HASH2: SHA1(remote_ip + direction)
  • HASH3: SHA1(remote_ip + port + protocol + domain)
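The three hashes above could be computed along these lines. Only "remote_ip" is a field name taken from this post; "remote_port", "proto", "direction" and "domain" are assumed names, so adjust them to your event schema. Note one deliberate departure from plain concatenation: the parts are joined with a "|" separator so that, for example, "1.1.1.1" + "80" and "1.1.1.18" + "0" cannot collide.

```python
import hashlib


def sha1_of(*parts):
    """SHA1 hex digest of the parts, joined with '|' to keep field boundaries."""
    joined = "|".join(str(part) for part in parts)
    return hashlib.sha1(joined.encode("utf-8")).hexdigest()


def netconn_hashes(event):
    """Build the three netconn hashes for one event (field names are assumptions)."""
    ip = event["remote_ip"]
    port = event.get("remote_port")
    proto = event.get("proto")
    direction = event.get("direction")
    domain = event.get("domain", "")
    return {
        "hash1": sha1_of(ip, port, proto),
        "hash2": sha1_of(ip, direction),
        "hash3": sha1_of(ip, port, proto, domain),
    }
```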

Once generated, you can quickly query the database for each hash and increment its value anytime that hash is observed. In the end, your JSON may look like the below:
To add further context, we could also factor the three categories mentioned above into the hash. This is just a fun example of how you can mux data together to find anomalies. I suspect hashing various parts of a network event isn't the most efficient approach, but it’s an interesting way to view the data. While off topic, a great example of using a hash to summarize a subset of data would be imphash:

To enable this workflow, we can create an API gateway that exposes all these data sources to Lambda, as outlined below:
Notice we have an SQS queue and Lambda function for each API endpoint so we can modularize the pipeline in the future. To better understand how the pipeline looks from an AWS perspective, check out the image below:

While I’m sure there are many other ways to build this out, the flow of data through this prototype pipeline is as follows:

  1. Ingress.event.netconn is sent from the event forwarder to your “netconns” s3 bucket. (group of events inside a single file)
  2. After successful transfer, an S3 object is created.
  3. Since an event notification is set up on the s3 bucket, a message about this Ingress.event.netconn object is sent to your SQS queue. 
  4. Since the SQS queue is configured as an event source for a Lambda function, the function will be started.
  5. The newly executed Lambda function will read the message from the SQS queue. This message will tell the Lambda function where the netconn object is stored in s3 for processing.
  6. Lambda function will retrieve the referenced JSON object from s3, perform any formatting or pre-lookups then place another message on the “geoip” SQS queue to begin the pipeline. The JSON document will traverse each queue and associated Lambda function. Each Lambda function is assigned a specific API call to use and will append its output to the JSON. 
  7. After all Lambda functions have processed the JSON (a single netconn event), the newly enriched JSON document is saved to the database.
    • If the JSON document contains specific fields/tags at the end of processing, it could also be routed to an “alerts” queue for analyst review or further analysis.
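To make the queue-per-stage flow above concrete, here is a small local simulation. In-memory deques stand in for the SQS queues and plain callables stand in for each stage's Lambda function and API call. The post only states that "geoip" comes first; the order of the remaining stages and the `is_alert` rule are assumptions.

```python
from collections import deque

# "geoip" starts the pipeline; the order of the other stages is an assumption.
STAGES = ["geoip", "observations", "netconn_hashes", "reputation", "intel"]


def run_pipeline(event, enrichers, is_alert=lambda doc: False):
    """Pass one netconn event through every stage, then save and optionally alert."""
    queues = {name: deque() for name in STAGES}
    database, alerts = [], []
    queues[STAGES[0]].append(event)
    for position, name in enumerate(STAGES):
        while queues[name]:
            doc = queues[name].popleft()
            # Each stage appends its own output under its own key.
            doc[name] = enrichers[name](doc)
            if position + 1 < len(STAGES):
                queues[STAGES[position + 1]].append(doc)
            else:
                database.append(doc)
                if is_alert(doc):
                    alerts.append(doc)
    return database, alerts
```

Since each stage only touches its own key, stages can be added, removed or reordered without changing the others.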

Let's take a look at an example final output after processing:
We can see that our newly processed network event now has 5 new keys:
  • Geoip
  • Observations
  • Netconn_hashes
  • Reputation
  • Intel

Once you get a grasp on how to enrich the event data, you can extend this capability further by forwarding other raw endpoint events such as:
  • Ingress.event.filemod
    • Use case: If an attacker creates a staging directory in the ProgramData directory called “Python6”, you could identify any file mods made inside this “Python6” staging directory and kick off file acquisitions to collect these files in real time.
  • Ingress.event.regmod
    • Use case: If malware is identified creating a specific registry key, you could kick off a command to dump the contents of the new registry key to review the data.
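For the filemod use case above, the filtering step might look like the sketch below. It only decides which events fall inside the staging directory; kicking off the file acquisition is left out. The "path" field name and the drive letter are assumptions.

```python
from pathlib import PureWindowsPath

# Hypothetical staging directory from the example above; the drive is assumed.
STAGING_DIR = PureWindowsPath(r"C:\ProgramData\Python6")


def in_staging_dir(path, staging_dir=STAGING_DIR):
    """True if path is inside staging_dir (Windows paths compare case-insensitively)."""
    return staging_dir in PureWindowsPath(path).parents


def staged_filemods(events, staging_dir=STAGING_DIR):
    """Keep only filemod events whose 'path' (assumed field) is under staging_dir."""
    return [e for e in events if in_staging_dir(e.get("path", ""), staging_dir)]
```

Comparing path components rather than using a string prefix avoids false matches on sibling directories such as "Python60".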

Let's not stop there! We can also blend in a few other server events as well to perform additional enrichment such as:
  • binaryinfo.observed
    • Use case: As stated in the CBR documentation, “This event happens when a new binary is observed for the first time anywhere in the environment”. When a new binary is observed, you could automatically download the binary from the CBR instance and perform some additional actions:
      • Toss the sample in a sandbox of your choice, appending key results to the JSON
      • Run the sample through Xori
      • Perform OSINT intel lookups on hashes/imphash and/or other attributes of the binary such as PE section names, unique strings, compile time or observed locations on disk.

While I hand-picked only a subset of event types, the event forwarder supports many other event types outlined here: Prior to enabling all types, I would recommend you turn on one event type at a time to assess load and ensure each is generating value for your team.

Carbon Black also provides its own message bus. If you wish to learn more about this, you can follow the link below:

While the content and workflow provided in this post are far from perfect (prototyping is key), I hope the concepts outlined in this post were helpful and parts of it are useful to your team. Happy Hunting!

Special thanks to Mike Scutt (@OMGAPT), Jason Garman and the CB team for all the help.

