Introducing a middleware

Middlewares are, essentially, all the pieces of software used as a medium: a communication channel between different, decoupled components of an architecture. Message queues, message brokers and buses are all effectively middlewares.

They're often used in microservices architectures: think of asynchronous communication between services, where you just want to schedule some kind of job without waiting for an immediate response; RabbitMQ or SQS are common choices in these scenarios.
In our case, we're going to add a really simple message queue interface to decouple the crawling logic of the web crawler; besides separating responsibilities, this comes with the benefit of making the entire application simpler to test.

Producer and consumer

The best-known and simplest abstraction is the producer-consumer pattern, where two actors are involved:

  • The producer, which generates traffic
  • The consumer, which consumes that traffic

It's a simplification of the pub-sub pattern, but it can easily be extended to behave just like it, with multiple subscribers consuming the same source.
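
As a rough sketch of that extension (the fanOut helper and its names are hypothetical, not part of the crawler), a single source can be broadcast to multiple subscribers with plain channels:

// fanOut broadcasts every message read from source to all the subscriber
// channels, closing them once the source is exhausted. A slow subscriber
// blocks the others, which is acceptable for a sketch but not much more.
func fanOut(source <-chan []byte, subscribers ...chan<- []byte) {
    for msg := range source {
        for _, sub := range subscribers {
            sub <- msg
        }
    }
    for _, sub := range subscribers {
        close(sub)
    }
}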

The bigger the interface, the weaker the abstraction
Rob Pike

Go is conceptually a minimalist-oriented language, or at least that's how I mostly perceive it, so the best practice is to keep interfaces as small as possible. It really makes sense: every interface defines an enforcement, a contract that must be fulfilled, and the less you have to implement to be compliant, the better.

We've seen how Go's preferred approach to abstraction is not to abstract at all until needed; the good news is that, given the inherent pragmatism of the language, we're allowed to break the rules sometimes, according to common sense.
In this case, we're reasonably sure that our application is going to need a simple communication channel that enables two operations:

  • Production
  • Consumption

But for the sake of readability, and to make future additions easier, we'll stick to Rob Pike's quote and define a few small, generic communication interfaces:

messaging/queue.go

// Package messaging contains middleware for communication with decoupled
// services; it could host RabbitMQ drivers as well as Kafka or Redis ones
package messaging

// Producer defines a producer behavior, exposing a single `Produce` method
// meant to enqueue an array of bytes
type Producer interface {
    Produce([]byte) error
}

// Consumer defines a consumer behavior, exposing a single `Consume` method
// meant to connect to a queue and block while consuming incoming arrays of
// bytes, forwarding them into a channel
type Consumer interface {
    Consume(chan<- []byte) error
}

// ProducerConsumer defines the behavior of a simple message queue; it's
// expected to provide both a `Produce` and a `Consume` method
type ProducerConsumer interface {
    Producer
    Consumer
}

// ProducerConsumerCloser defines the behavior of a simple message queue
// that requires some kind of external connection to be managed
type ProducerConsumerCloser interface {
    ProducerConsumer
    Close()
}

Channels used as function arguments can also be annotated with the direction they're meant to be used in, be it send-only, receive-only or both.
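
A quick illustration of the two restricted directions and how the compiler enforces them:

package main

import "fmt"

// send can only write into out: receiving from it wouldn't compile
func send(out chan<- int) { out <- 42 }

// receive can only read from in: sending into it wouldn't compile
func receive(in <-chan int) int { return <-in }

func main() {
    ch := make(chan int, 1) // a bidirectional channel satisfies both
    send(ch)
    fmt.Println(receive(ch)) // prints 42
}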

Producer and Consumer are meant to work with the most generic type besides interface{}: []byte is the usual exchange format obtained by encoding data, and whether that encoding is binary or JSON it's always possible to get a byte-level representation of it.
The consumer's exported method Consume(chan<- []byte) error accepts a send-only channel, the very channel we're going to use to obtain our items; the method is in fact designed as a blocking call, while another goroutine receives on the other end of the channel.

From now on these are our gates and pipes for communication: it'll be possible to create multiple different structs, RabbitMQ- or Redis-backed for example, to pass bits around from the crawling logic to other clients.
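
To give a taste of what one of those structs might look like, here's a minimal sketch of a RabbitMQ-backed ProducerConsumerCloser built on the github.com/streadway/amqp client; the queue name, the lack of error recovery and the overall wiring are simplifying assumptions of mine, not code from the project:

package messaging

import "github.com/streadway/amqp"

// RabbitQueue is a sketch of a RabbitMQ-backed ProducerConsumerCloser
type RabbitQueue struct {
    conn    *amqp.Connection
    channel *amqp.Channel
    name    string
}

// NewRabbitQueue dials the broker and declares a non-durable queue
func NewRabbitQueue(uri, name string) (*RabbitQueue, error) {
    conn, err := amqp.Dial(uri)
    if err != nil {
        return nil, err
    }
    ch, err := conn.Channel()
    if err != nil {
        return nil, err
    }
    if _, err := ch.QueueDeclare(name, false, false, false, false, nil); err != nil {
        return nil, err
    }
    return &RabbitQueue{conn: conn, channel: ch, name: name}, nil
}

// Produce publishes the payload on the default exchange using the queue
// name as routing key
func (q *RabbitQueue) Produce(data []byte) error {
    return q.channel.Publish("", q.name, false, false, amqp.Publishing{
        ContentType: "application/json",
        Body:        data,
    })
}

// Consume blocks forwarding every delivered body into events until the
// AMQP channel is closed
func (q *RabbitQueue) Consume(events chan<- []byte) error {
    deliveries, err := q.channel.Consume(q.name, "", true, false, false, false, nil)
    if err != nil {
        return err
    }
    for d := range deliveries {
        events <- d.Body
    }
    return nil
}

// Close tears down the AMQP channel and the underlying connection
func (q *RabbitQueue) Close() {
    q.channel.Close()
    q.conn.Close()
}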

And that's exactly what we're going to do to make the crawler testable: directly inside our test files we'll introduce a foolproof struct encapsulating a simple channel as its communication backend:

crawler_test.go

// Package crawler contains the crawling logic and utilities to scrape
// remote resources
package crawler

import (
    "encoding/json"
    "io/ioutil"
    "log"
    "net/http"
    "net/http/httptest"
    "os"
    "reflect"
    "sync"
    "testing"
    "time"
)

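// testQueue is an in-memory, channel-backed queue satisfying the
// ProducerConsumerCloser interface, used to capture crawler output in tests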
type testQueue struct {
    bus chan []byte
}

func (t testQueue) Produce(data []byte) error {
    t.bus <- data
    return nil
}

func (t testQueue) Consume(events chan<- []byte) error {
    for event := range t.bus {
        events <- event
    }
    return nil
}

func (t testQueue) Close() {
    close(t.bus)
}

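// consumeEvents drains the test queue, decoding every JSON payload into a
// ParsedResult, and returns the collected results once the queue is closed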
func consumeEvents(queue *testQueue) []ParsedResult {
    wg := sync.WaitGroup{}
    events := make(chan []byte)
    results := []ParsedResult{}
    wg.Add(1)
    go func() {
        defer wg.Done()
        for e := range events {
            var res ParsedResult
            if err := json.Unmarshal(e, &res); err == nil {
                results = append(results, res)
            }
        }
    }()
    _ = queue.Consume(events)
    close(events)
    wg.Wait()
    return results
}

func serverMockWithoutRobotsTxt() *httptest.Server {
    handler := http.NewServeMux()
    handler.HandleFunc("/foo", resourceMock(
        `<head>
            <link rel="canonical" href="https://example-page.com/sample-page/" />
         </head>
         <body>
            <img src="/baz.png">
            <img src="/stonk">
            <a href="foo/bar/baz">
        </body>`,
    ))
    handler.HandleFunc("/foo/bar/baz", resourceMock(
        `<head>
            <link rel="canonical" href="https://example-page.com/sample-page/" />
            <link rel="canonical" href="/foo/bar/test" />
         </head>
         <body>
            <img src="/baz.png">
            <img src="/stonk">
        </body>`,
    ))
    handler.HandleFunc("/foo/bar/test", resourceMock(
        `<head>
            <link rel="canonical" href="https://example-page.com/sample-page/" />
         </head>
         <body>
            <img src="/stonk">
        </body>`,
    ))
    server := httptest.NewServer(handler)
    return server
}

func resourceMock(content string) http.HandlerFunc {
    return func(w http.ResponseWriter, r *http.Request) {
        _, _ = w.Write([]byte(content))
    }
}

func TestMain(m *testing.M) {
    log.SetOutput(ioutil.Discard)
    os.Exit(m.Run())
}

func withMaxDepth(depth int) CrawlerOpt {
    return func(s *CrawlerSettings) {
        s.MaxDepth = depth
    }
}

func withCrawlingTimeout(timeout time.Duration) CrawlerOpt {
    return func(s *CrawlerSettings) {
        s.CrawlingTimeout = timeout
    }
}

func TestCrawlPages(t *testing.T) {
    server := serverMockWithoutRobotsTxt()
    defer server.Close()
    testbus := testQueue{make(chan []byte)}
    results := make(chan []ParsedResult)
    go func() { results <- consumeEvents(&testbus) }()
    crawler := New("test-agent", &testbus, withCrawlingTimeout(100*time.Millisecond))
    crawler.Crawl(server.URL + "/foo")
    testbus.Close()
    res := <-results
    close(results)
    expected := []ParsedResult{
        {
            server.URL + "/foo",
            []string{"https://example-page.com/sample-page/", server.URL + "/foo/bar/baz"},
        },
        {
            server.URL + "/foo/bar/baz",
            []string{server.URL + "/foo/bar/test"},
        },
    }
    if !reflect.DeepEqual(res, expected) {
        t.Errorf("Crawler#Crawl failed: expected %v got %v", expected, res)
    }
}

func TestCrawlPagesRespectingMaxDepth(t *testing.T) {
    server := serverMockWithoutRobotsTxt()
    defer server.Close()
    testbus := testQueue{make(chan []byte)}
    results := make(chan []ParsedResult)
    go func() { results <- consumeEvents(&testbus) }()
    crawler := New("test-agent", &testbus, withCrawlingTimeout(100*time.Millisecond), withMaxDepth(3))
    crawler.Crawl(server.URL + "/foo")
    testbus.Close()
    res := <-results
    expected := []ParsedResult{
        {
            server.URL + "/foo",
            []string{"https://example-page.com/sample-page/", server.URL + "/foo/bar/baz"},
        },
        {
            server.URL + "/foo/bar/baz",
            []string{server.URL + "/foo/bar/test"},
        },
    }
    if !reflect.DeepEqual(res, expected) {
        t.Errorf("Crawler#Crawl failed: expected %v got %v", expected, res)
    }
}

Note: the only function mocking resources server-side is called serverMockWithoutRobotsTxt(); this is because right now we're not considering the existence of a robots.txt file at the root of each domain. In the next chapter we'll handle that set of rules as well, discussing crawling politeness.

In order to make these tests pass we need to adapt the crawlPage method to use a Producer implementation to send out every crawled URL, so let's open the crawler.go file, update the WebCrawler struct and its constructor, and add the forwarding code to the main crawlPage loop:

type WebCrawler struct {
    // logger is a private logger instance
    logger *log.Logger
+    // queue is a simple message queue to forward crawling results to other
+    // components of the architecture, decoupling business logic from processing,
+    // storage or presentation layers
+    queue messaging.Producer
    // linkFetcher is a LinkFetcher object, must expose Fetch and FetchLinks methods
    linkFetcher LinkFetcher
    // settings is a pointer to `CrawlerSettings` containing some crawler
    // specifications
    settings *CrawlerSettings
}

The constructor is updated as well:

-func New(userAgent string, opts ...CrawlerOpt) *WebCrawler {
+func New(userAgent string, queue messaging.Producer, opts ...CrawlerOpt) *WebCrawler {
    ...
    crawler := &WebCrawler{
        logger:   log.New(os.Stderr, "crawler: ", log.LstdFlags),
+        queue:    queue,
        linkFetcher: fetcher.New(userAgent, settings.Parser, settings.FetchTimeout),
        settings: settings,
    }
    return crawler
}

Finally, the crawlPage private method: after every link has been extracted we want it to go into the queue through the Producer interface; as for the format, we simply encode the results into JSON using the standard library:

crawler.go

func (c *WebCrawler) crawlPage(rootURL *url.URL, wg *sync.WaitGroup, ctx context.Context) {
    ...
    for !stop {
        ...
        go func(link *url.URL, stopSentinel bool, w *sync.WaitGroup) {
            ...
            // No errors occurred: unless we hit the stop sentinel or no links
            // were scraped, enqueue all the scraped links to the link queue
            if stopSentinel || foundLinks == nil || len(foundLinks) == 0 {
                return
            }
            atomic.AddInt32(&linkCounter, int32(len(foundLinks)))
+            // Send results from fetch process to the processing queue
+            c.enqueueResults(link, foundLinks)
            // Enqueue found links for the next cycle
            linksCh <- foundLinks
        }(link, stop, &fetchWg)
    }
    fetchWg.Wait()
}

+// enqueueResults forwards the fetched links through the Producer queue so
+// they can be processed downstream (in this case, printed to stdout)
+func (c *WebCrawler) enqueueResults(link *url.URL, foundLinks []*url.URL) {
+    foundLinksStr := []string{}
+    for _, l := range foundLinks {
+        foundLinksStr = append(foundLinksStr, l.String())
+    }
+    payload, _ := json.Marshal(ParsedResult{link.String(), foundLinksStr})
+    if err := c.queue.Produce(payload); err != nil {
+        c.logger.Println("Unable to communicate with message queue:", err)
+    }
+}
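
On the other end of the queue the consuming side stays completely unaware of the crawling logic; as a hypothetical example (not part of the project, it only shows the decoupling), a printer client draining a messaging.Consumer could look like this:

// Hypothetical consuming client living in its own package
package client

import (
    "fmt"

    "webcrawler/messaging"
)

// printResults drains the queue, printing every JSON payload to stdout,
// and returns once the producing side has been closed
func printResults(queue messaging.Consumer) error {
    events := make(chan []byte)
    done := make(chan error, 1)
    go func() {
        // Consume blocks until the underlying source is closed
        done <- queue.Consume(events)
        close(events)
    }()
    for e := range events {
        fmt.Println(string(e))
    }
    return <-done
}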

And we're good to go: running go test should now be all green:

go test -v ./...
=== RUN   TestCrawlPages
--- PASS: TestCrawlPages (1.20s)
=== RUN   TestCrawlPagesRespectingMaxDepth
--- PASS: TestCrawlPagesRespectingMaxDepth (1.07s)
PASS
ok      webcrawler    3.495s
=== RUN   TestStdHttpFetcherFetch
--- PASS: TestStdHttpFetcherFetch (0.00s)
=== RUN   TestStdHttpFetcherFetchLinks
--- PASS: TestStdHttpFetcherFetchLinks (0.00s)
=== RUN   TestGoqueryParsePage
--- PASS: TestGoqueryParsePage (0.00s)
PASS
ok      webcrawler/fetcher    (cached)
?       webcrawler/messaging    [no test files]

In the next chapter we're going to explore the concept of politeness, i.e. how a good web crawler should behave while visiting a domain, and the robots.txt rules that are most of the time placed at the root of a site.
