Finding Cyber Security Signals in The RSS and Twitter Noise

Tommy
February 16 2021

Key Takeaways

  • It is possible to index and tag a large number of RSS, OTX and Twitter articles in seconds on limited computational power
  • Building logic around timestamps is complex
  • Structuring the resulting data in a graph is meaningful.

Introduction

Today I am sharing some details about one of the multi-year projects I am running. The project motivation is:

To stay up to date on cyber security developments within days.

I didn’t want a realtime alerting service, but an analysis tool to gather important fragments of data over time. These fragments make up the basis of my open source research. The curated information usually ends up on a channel like an NNTP feed, sometimes with added comments.

My solution was to create a common interface to ingest and search content from third-party sources. Achieving this is difficult and requires some work, but I found it feasible.

Going through some basic research I found that much of what happens on the web eventually ends up in one of the following three places (e.g. as a mention):

  1. OTX
  2. Twitter
  3. RSS

After some work I found that there were two things important to me in the first iteration:

  1. Being able to recognize the characteristics of the content
  2. Knowing the publish time of the data

The primary problem was thus to build a program that scales with a large number of feeds.

Going from there I built a prototype in Python, which I’ve now matured into a more performant Golang version. What follows from here is my experience from that work.

The tested components of the program I am currently running are:

  • Gofeed [1]
  • Badger [2]
  • Apache Janusgraph [3,4]
  • Apache Cassandra [5]
  • Go-Twitter [6]
  • Alienvault OTX API [7]
  • Araddon Dateparse [8]

[1] https://github.com/mmcdole/gofeed
[2] https://github.com/dgraph-io/badger
[3] https://janusgraph.org
[4] https://docs.janusgraph.org/basics/gremlin/
[5] https://cassandra.apache.org
[6] https://github.com/dghubble/go-twitter/twitter
[7] https://github.com/AlienVault-OTX/OTX-Go-SDK/src/otxapi
[8] https://github.com/araddon/dateparse

The Lesson of Guestimation: Not All Feeds Are Created Equal

Timestamps are perhaps among the more challenging things to interpret in a crawler and search engine. RSS is a loose standard, at least when it comes to implementation. This means that timestamps may vary: localized, invalid per the RFC standards, ambiguous, missing and so on. Much like the web otherwise. Luckily, without JavaScript.
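
To cope with this I lean on Araddon Dateparse [8] from the component list above. Here is a minimal sketch of the idea, assuming dateparse’s ParseAny function; the sample strings are illustrations, not real feed data:

// A sketch of normalising varied timestamp formats with Araddon Dateparse.
// The sample strings are illustrations, not real feed data.
package main

import (
    "fmt"

    "github.com/araddon/dateparse"
)

func main() {
    samples := []string{
        "Mon, 15 Feb 2021 20:31:06 -0700", // RFC1123Z style, common in RSS
        "2021-02-10T20:31:06Z",            // RFC3339/Atom style
        "10.02.2021 20:31",                // localized and ambiguous
    }
    for _, s := range samples {
        // ParseAny guesses the layout; ambiguous dates may still be
        // interpreted differently than the publisher intended.
        t, err := dateparse.ParseAny(s)
        if err != nil {
            fmt.Println("unparseable:", s)
            continue
        }
        fmt.Println(s, "->", t.UTC())
    }
}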

The goal is simply to recognize which timestamp is the most correct one. A feed may contain one form of timestamp, while a website may indicate another. To solve this I use and compare two levels of timestamping:

  • The feed published, updated and all items individual timestamps
  • The item and website last modified timestamps

Looking back, solving the first level of timestamping was straightforward. These timestamps are present in the feed, and for RSS the logic to build a list of timestamps looks like this:

// First we collect the timestamps of the feed itself and of
// all feed items, then estimate which one is the newest.
var feedElectedTime time.Time
ts := make(map[string]string)
ts["published"] = feed.Published
ts["updated"] = feed.Updated
i := 0
for _, item := range feed.Items {
    ts[strconv.Itoa(i)] = item.Published
    i++
    ts[strconv.Itoa(i)] = item.Updated
    i++
}
// tsGuestimate is the program's own helper (more on it in a future post).
feedElectedTime, _, err = tsGuestimate(ts, link, false)

The elected time can be compared with a previous feed checkpoint to avoid downloading all items again. Using the above logic I was also able to dramatically increase the success rate of the program, since it requires a valid timestamp. The tsGuestimate logic is something for a future post.
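
As an illustration of the checkpoint idea, here is a rough sketch of how the elected time could be persisted per feed with Badger [2]. It assumes Badger’s v3 API, and the key layout and helper names are mine for illustration, not the program’s actual schema:

// A rough sketch of a per-feed checkpoint store on top of Badger (v3 API
// assumed). Key layout and helper names are illustrative.
package main

import (
    "fmt"
    "log"
    "time"

    badger "github.com/dgraph-io/badger/v3"
)

// loadCheckpoint returns the previously elected time for a feed, or the
// zero time if the feed has not been seen before.
func loadCheckpoint(db *badger.DB, feedURL string) (time.Time, error) {
    var ts time.Time
    err := db.View(func(txn *badger.Txn) error {
        item, err := txn.Get([]byte("checkpoint:" + feedURL))
        if err == badger.ErrKeyNotFound {
            return nil // first time we see this feed
        }
        if err != nil {
            return err
        }
        return item.Value(func(val []byte) error {
            return ts.UnmarshalText(val)
        })
    })
    return ts, err
}

// saveCheckpoint stores the newly elected time for a feed.
func saveCheckpoint(db *badger.DB, feedURL string, elected time.Time) error {
    return db.Update(func(txn *badger.Txn) error {
        val, err := elected.MarshalText()
        if err != nil {
            return err
        }
        return txn.Set([]byte("checkpoint:"+feedURL), val)
    })
}

func main() {
    db, err := badger.Open(badger.DefaultOptions("/tmp/feed-checkpoints"))
    if err != nil {
        log.Fatal(err)
    }
    defer db.Close()

    feed := "https://www.reddit.com/r/netsec/.rss"
    if err := saveCheckpoint(db, feed, time.Now()); err != nil {
        log.Fatal(err)
    }
    prev, err := loadCheckpoint(db, feed)
    if err != nil {
        log.Fatal(err)
    }
    fmt.Println("previous elected time:", prev)
}

If feedElectedTime is not after the stored checkpoint, the items of that feed can be skipped for the run.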

Further, the item/website timestamps require a similar method, but in addition I found it advantageous to do an HTTP HEAD request to the destination URL and combine the result with the timestamps available from the feed. The central and important aspect here is to abort retrieval if an item already exists in the database, which dramatically reduces the processing in each run.
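
A minimal sketch of the HEAD request part, using only the Go standard library; returning the zero time as a “no usable header” signal is my own convention here, not necessarily how the program does it:

// A sketch of fetching a Last-Modified candidate with an HTTP HEAD request.
package main

import (
    "fmt"
    "net/http"
    "time"
)

// lastModified issues a HEAD request against the item URL and returns the
// Last-Modified header as an extra timestamp candidate. A zero time means
// the header was absent or unparseable, so the feed timestamps must decide.
func lastModified(client *http.Client, url string) time.Time {
    req, err := http.NewRequest(http.MethodHead, url, nil)
    if err != nil {
        return time.Time{}
    }
    resp, err := client.Do(req)
    if err != nil {
        return time.Time{}
    }
    defer resp.Body.Close()

    if t, err := http.ParseTime(resp.Header.Get("Last-Modified")); err == nil {
        return t
    }
    return time.Time{}
}

func main() {
    client := &http.Client{Timeout: 10 * time.Second}
    fmt.Println(lastModified(client, "https://www.reddit.com/r/netsec/.rss"))
}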

False timestamps are a problem. I noticed that some websites publish feeds with dynamic timestamps, meaning that when you retrieve the feed it reports the timestamp of right now. This obviously creates resource-intensive operations, since the whole feed is then at risk of being re-indexed on every run.
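
One mitigation, hinted at by the sha256 field in the query output further down, is to key items on a content hash instead of trusting the advertised timestamp. A rough sketch follows; exactly which fields go into the hash is an assumption:

// A rough sketch of content-hash deduplication; which fields are hashed
// is an assumption.
package main

import (
    "crypto/sha256"
    "encoding/hex"
    "fmt"
)

// itemKey derives a stable identifier from the item content itself, so a
// feed that re-stamps every item with "now" does not force a re-index.
func itemKey(link, title, body string) string {
    h := sha256.Sum256([]byte(link + "\n" + title + "\n" + body))
    return hex.EncodeToString(h[:])
}

func main() {
    fmt.Println(itemKey("https://example.com/post", "Example title", "Example body"))
}

If the key already exists in the database the item is skipped, regardless of what the feed claims about its publish time.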

Noise Reduction: Recognizing Content Characteristics

Retrieving content is possible in several ways. For recognizing the content I opted for regular expressions, and I have had success and good coverage with them. This is also one of the benefits of curating articles: experience with questions such as “why did I miss this article?” evolves into a new iteration of the program input.

For instance, to stay on top of targeted cyber operations, I found that frequently used phrases in articles were “targeted attack” and “spear phishing”. So based on that I deployed the following keyword search (a regular expression), which is applied to every new item ingested:

"targeted":"(?i)targeted\\satt|spear\\sp",

So a new article containing “targeted attack” in the body or title is tagged with the hotword “targeted”. Another hotword could be “breach”.
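
As a minimal sketch, the hotword table can be applied to each ingested item roughly like this; the function and variable names are illustrative, not the program’s actual ones:

// A sketch of regex-based hotword tagging; names are illustrative.
package main

import (
    "fmt"
    "regexp"
)

// hotwords maps a tag to the regular expression that triggers it.
var hotwords = map[string]*regexp.Regexp{
    "targeted": regexp.MustCompile(`(?i)targeted\satt|spear\sp`),
    "breach":   regexp.MustCompile(`(?i)breach`),
}

// tag returns every hotword whose pattern matches the title or the body.
func tag(title, body string) []string {
    var tags []string
    for name, re := range hotwords {
        if re.MatchString(title) || re.MatchString(body) {
            tags = append(tags, name)
        }
    }
    return tags
}

func main() {
    fmt.Println(tag("Spear phishing wave hits industrial vendors", "..."))
    // Prints: [targeted]
}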

Perhaps not surprisingly, this data can be modelled in a graph as follows.

Tweet ─> URL in tweet ┌─> Targeted
                      └─> Breach

A Practical Example

Traversing a news graph, we can go from the hotword “targeted”, to all items and articles for the past days linked to the hotword.

I use Gremlin for querying. An example is shown below (some details omitted):

keyw="targeted"
_date="2021-02-10"
g.V().hasLabel('hotword').has('title',keyw).as("origin_hw").
  in().in().hasLabel('article:m').has('timestamp',gte(_date)).
  order().by('timestamp',asc).as('article').
  select("origin_hw","article").by(values('title','timestamp'))

The procedure above summarized:

  1. Find the node with the keyword “targeted”
  2. Find all articles (for instance a tweet) that are two steps out from the keyword (since these may be linked via a content node)
  3. Get title and timestamp from hotword and tweet

Using a match, which incidentally was not a tweet but an article from an RSS feed, we find the following:

==>{origin_hw=targeted, article=WINDOWS KERNEL ZERO-DAY EXPLOIT (CVE-2021-1732) IS USED BY BITTER APT IN TARGETED ATTACK}

Retrieving the article with Gremlin, we can determine the source:

gremlin> g.V().has('title','WINDOWS KERNEL ZERO-DAY EXPLOIT (CVE-2021-1732) IS USED BY BITTER APT IN TARGETED ATTACK').valueMap()

==>{link=[https://www.reddit.com/r/netsec/.rss],
title=[WINDOWS KERNEL ZERO-DAY EXPLOIT (CVE-2021-1732) IS USED BY BITTER APT IN TARGETED ATTACK], 
src=[Reddit - NetSec], 
src_type=[rss],
sha256=[8a285ce1b6d157f83d9469c06b6accaa514c794042ae7243056292d4ea245daf],
added=[2021-02-12 10:42:16.640587 +0100 CET],
timestamp=[2021-02-10 20:31:06 +0000 +0000], 
version=[1]}

==>{link=[http://www.reddit.com/r/Malware/.rss], 
title=[WINDOWS KERNEL ZERO-DAY EXPLOIT (CVE-2021-1732) IS USED BY BITTER APT IN TARGETED ATTACK], 
src=[Reddit - Malware], 
src_type=[rss],
sha256=[69737b754a7d9605d11aecff730ca3fc244c319f35174a7b37dd0d1846a823b7],
added=[2021-02-12 10:41:48.510538 +0100 CET],
timestamp=[2021-02-10 20:35:11 +0000 +0000],
version=[1]}

In this instance the source was two Reddit posts, which triggered the keyword in question along with others about a targeted incident in China. Additionally, this triggered a zero-day hotword.

Summary

Through this post I have shown some key parts of how to build a feed aggregator that can scale to thousands of feeds on a single computer, with update times in seconds.

I have also given a brief view of how JanusGraph and similar systems can be used to model such data in a way that makes it possible to search, find and eventually stay up to date on information relevant to cyber security.

Once in place, such a system may save hours per day, since the data is normalised and searchable in one place.

Tags: #feed #twitter #otx #rss #aggregation #search #graph #gremlin
Read with Gemini

Graphs at Scale, Titan-Rexster: Gephi Visualization Options

Tommy
February 16 2015

Following up on my post yesterday, I have also been looking at graphs the other way - from a scalable database to a manageable graph involving e.g. just one segment.

There are currently two ways to do this:

1) exporting the graph, and 2) streaming the graph to and from the graph database. The first option is obviously the simpler one, but doesn’t always cover our needs. The latter is often needed when multiple analysts work on the same graph.

Option 1: Exporting the Graph

To achieve the first you can use the GraphML save function of Gremlin.

conf = new BaseConfiguration();
conf.setProperty("storage.backend","hbase");
conf.setProperty("storage.hostname","sandbox.hortonworks.com");
conf.setProperty("storage.port","2181");
g = TitanFactory.open(conf);
g.saveGraphML('test.graphml')

This graph can again be opened in tools such as Gephi.

You can also use the Gephi database API plugin for Rexster. There’s a Blueprints repo [1] which extends that. Here is a short how-to for getting going with the Gephi development environment, from the plugin’s wiki pages [2]:

  1. Get plugins from [3], and [4]
  2. Open Gephi, go to Tools > Plugins > Downloaded > "Add Plugins..."
  3. Press install and follow the guidance, at the end you should restart Gephi
  4. Go to File > Import Database
  5. Add the Rexster configuration to /etc/graph/rexster.xml (if issues arise when importing the database, look at [5])

rexster.xml should look like this:

<graph>
    <graph-name>RexterGraph</graph-name>
    <graph-type>com.tinkerpop.rexster.config.RexsterGraphGraphConfiguration</graph-type>
    <graph-buffer-size>100</graph-buffer-size>
    <graph-location>http://192.168.109.128:8182/graphs/titan</graph-location>
</graph>

You should be left with something like this for instance in Gephi:

A Rexster graph import to Gephi, from a Titan database. The graph consists of a variety of segments, such as articles from an article system and imported Maltego graphs.

Here’s the cluster on the right, by the way. There are some interesting patterns inside it, so I suspect it’s from a Maltego graph.

Option 2: The Gephi Streaming API

For the other option I found the Gephi graph streaming API [6]. I currently find it a little limited, in that it can only provide collaboration between two Gephi instances using a Jetty web server. It’s pretty cool, but doesn’t offer the integration I am looking for. I’ll get back to this later.

[1] https://github.com/datablend/gephi-blueprints-plugin
[2] https://github.com/datablend/gephi-blueprints-plugin/wiki
[3] https://github.com/downloads/datablend/gephi-blueprints-plugin/org-gephi-lib-blueprints.nbm
[4] https://github.com/downloads/datablend/gephi-blueprints-plugin/org-gephi-blueprints-plugin.nbm
[5] https://github.com/datablend/gephi-blueprints-plugin/issues/1
[6] https://marketplace.gephi.org/plugin/graph-streaming/

Tags: #graph #rexster #gephi #scaling #visualization
Read with Gemini

A Graph Experiment with Threats and Incidents

Tommy
February 16 2015

I currently maintain this threat database, and up until now I’ve generated the graph data for d3 using queries, and a lot of logic, in a MySQL database. That is going to change pretty soon. You might also remember when we did Social Network Analysis and Object Attribution with Maltego 3 [1].

In my quest to understand the Apache Hadoop ecosystem I suddenly had a brutal meeting with Java (Eclipse, huh..). I also discovered that there is a world of libraries and applications previously unknown to me. One of them is the über-awesome Neo4j, which is a graph database originally built for Java - but guess what: it’s got a REST API as well. As usual you don’t have to write the Python code yourself, someone already wrote it for you. Note that it only does Python 2 for now [2,3].

The coolest thing about Neo4j is Cypher [4]: Cypher is a “graph query language”, as they put it themselves. With Cypher you can express what you look for in an entirely different way than you would in a relational database, and it’s actually easy.

And: you of course need the database running as well. If you use a Debian system like me, you’re in luck, since they have an experimental version out there [5].

Enough talk, here is a simple example of how you could go about scripting threat intelligence relations in order to connect groups to incidents. The goal would be to find peripherally connected groups.

from GraphConn.Connect import Graph
g = Graph()

# create groups
g.cGroup("ThreatA")
g.cGroup("ThreatB")
g.cGroup("ThreatC")

# create incidents
g.cIncident("IncA")
g.cIncident("IncB")
g.cIncident("IncC")

# relate groups in some way to each other through incidents
g.link("ThreatA","IncA")
g.link("ThreatA","IncB")
g.link("ThreatB","IncC")
g.link("ThreatC","IncA")
g.link("ThreatB","IncB")

# find all threats related to Threat A through incidents
print g.fRelated("ThreatA")

You might find this simple, but if you’ve ever tried to do it in SQL you know why you’ll need it. Also, remember that this scales indefinitely to other entity types as well.

Here’s the class used to generate the graph, for reference (feel free to copy it, produce something cool and post it back in the comment field):

from neo4jrestclient.client import GraphDatabase
from neo4jrestclient.query import Q

class Graph:
    def __init__(self):
        self.gdb = GraphDatabase("http://localhost:7474/db/data/")
        self.nodes = []

    def cGroup(self, name):
        """Create a threat group node."""
        n = self.gdb.nodes.create(name=name, type='Group')
        self.nodes.append(n)

    def cIncident(self, name):
        """Create an incident node."""
        n = self.gdb.nodes.create(name=name, type='Incident')
        self.nodes.append(n)

    def link(self, n1, n2):
        """Look up two nodes by name and relate them with an 'Executed' relationship."""
        try:
            n1 = self.gdb.nodes.filter(Q("name", iexact=n1))[0]
            n2 = self.gdb.nodes.filter(Q("name", iexact=n2))[0]
            return n1.relationships.create("Executed", n2)
        except:
            # Lookup failed, e.g. no node with that name exists
            return False

    def fRelated(self, query):
        """Traverse outwards from the named node and collect the nodes reached,
        extending the result in place so their neighbours are included too."""
        n = self.gdb.nodes.filter(Q("name", iexact=query))[0]
        r = n.traverse()
        for n2 in r:
            for e in n2.traverse():
                r.append(e)
        return list(r)

I really hope you enjoy this as much as I do right now. The Facebook Graph Search for the rest of us.

[1] gopher://secdiary.com/0/post/sna-oa-maltego/index.txt
[2] https://pypi.python.org/pypi/neo4jrestclient/
[3] https://neo4j-rest-client.readthedocs.org/en/latest/elements.html
[4] http://www.neo4j.org/learn/cypher
[5] http://debian.neo4j.org/

Tags: #python #graph #neo4j #applied
Read with Gemini

This blog is powered by cl-yag and Tufte CSS!