Posts Tagged ‘Go’

A threaded Go language application with semaphores

No Comments »

After hearing about Go and its love of parallelism, I wanted to give it a try. I couldn’t think of any small projects that needed to be threaded until recently.  My employer uses an RPM system, but I’m using Ubuntu, which requires deb files.  Searching through the RPMs in a browser is a pain, so I wanted a small script that would search for me.  Here’s the script with server names changed.

package main

import (
  "bufio"
  "fmt"
  "http"
  "os"
  "regexp"
  "strings"
)

var servers = []string {
  "http://www.google.com/search?client=ubuntu&channel=fs&q=go+language&ie=utf-8&oe=utf-8",
  "http://golang.org/",
  "http://golang.org/doc/go_tutorial.html"}

var maxOpenRequests int = 3

var rpmRegexp = regexp.MustCompile("href=\"[^\"]+\"")

func main() {
  search := getSearchTerm()
  out := make(chan string)
  done := make(chan int)
  go printer(out)
  for _, url := range servers {
    go searchURL(search, url, out, done)
  }
  for i := 0; i < len(servers); i++ {
    <- done
  }
}

func getSearchTerm() string {
  args := os.Args
  if len(args) != 2 {
    die("Please enter one search term")
  }
  return args[1]
}

func die(message string) {
  fmt.Println(message)
  os.Exit(1)
}

func printer(out chan string) {
  for {
    fmt.Println(<- out)
  }
}

var requestSemaphore = make(chan int, maxOpenRequests) // Integer chanel with a maximum queue size

func searchURL(search string, url string, out chan string, done chan int) {
  requestSemaphore <- 1 // Block until put in the semaphore queue
  response, realURL, err := http.Get(url)
  if err == nil {
    bufferedReader := bufio.NewReader(response.Body)
    err = searchAll(search, bufferedReader, url, out)
    response.Body.Close()
  }
  if err != nil {
    die("Could not read from " + realURL + ":" + err.String())
  }
  <- requestSemaphore // Dequeue from the semaphore
  done <- 1 // Signal that function is done
}

func searchAll(search string, reader *bufio.Reader, httpRoot string, out chan string) (os.Error) {
  var error os.Error = nil;
  for {
    var line, err = reader.ReadString('\n')
    if err != nil {
      if err == os.EOF {
        break
      }
      return error
    }
    var hrefs = rpmRegexp.AllMatchesStringIter(line, 0)
    for href := range hrefs {
      start := strings.Index(href, "\"") + 1
      end := len(href) - 1
      packageFile := href[start : end]
      if strings.Index(packageFile, search) != -1 {
        out <- httpRoot + packageFile
      }
    }
  }
  return error
}

A quick overview

Go uses “channels” for communication among threads.

  1. The main thread creates two channels, one to receive search output and one to receive “done” responses when a child thread finishes.
  2. The main thread attaches the output channel to a simple function that prints to standard out.
  3. The main thread spawns one child thread per server, giving each the output and done channels.
  4. The main thread loops over the done channel once for each server, waiting until all requests are complete.
  5. Each child blocks immediately until space is available in a semaphore.
  6. The child performs the HTTP get, then searches the response for anything matching a regular expression.  Any matches are signaled back to the output channel.
  7. The child opens a space in the semaphore and signals to the main thread that it is done.

There isn’t much more to say.  The Go web site has great tutorials, so I don’t yet feel the need to write one.

By the way, if you’re searching for Go resources, be sure to search for “go language” and not “go”.  But then, that’s probably how you found this page 😉

package main

import (
“bufio”
“fmt”
“http”
“os”
“regexp”
“strings”
)

var servers = []string {
“http://apt.duncllc.com/dist/WS4.0/RedHat/RPMS/”,
“http://apt.duncllc.com/dist/WS4.0/RedHat/RPMS.extras/”,
“http://apt.duncllc.com/dist/WS4.0/RedHat/RPMS.java/”}

var maxOpenRequests int = 3

var rpmRegexp = regexp.MustCompile(“href=\”[^\”]+\””)

func main() {
search := getSearchTerm()
out := make(chan string)
done := make(chan int)
go printer(out)
for _, url := range servers {
go searchURL(search, url, out, done)
}
for i := 0; i < len(servers); i++ {
<- done
}
}

func getSearchTerm() string {
args := os.Args
if len(args) != 2 {
die(“Please enter one search term”)
}
return args[1]
}

func die(message string) {
fmt.Println(message)
os.Exit(1)
}

func printer(out chan string) {
for {
fmt.Println(<- out)
}
}

var requestSemaphore = make(chan int, maxOpenRequests) // Integer chanel with a maximum queue size

func searchURL(search string, url string, out chan string, done chan int) {
requestSemaphore <- 1 // Block until put in the semaphore queue
response, realURL, err := http.Get(url)
if err == nil {
bufferedReader := bufio.NewReader(response.Body)
err = searchAll(search, bufferedReader, url, out)
response.Body.Close()
}
if err != nil {
die(“Could not read from ” + realURL + “:” + err.String())
}
<- requestSemaphore // Dequeue from the semaphore
done <- 1 // Signal that function is done
}

func searchAll(search string, reader *bufio.Reader, httpRoot string, out chan string) (os.Error) {
var error os.Error = nil;
for {
var line, err = reader.ReadString(‘\n’)
if err != nil {
if err == os.EOF {
break
}
return error
}
var hrefs = rpmRegexp.AllMatchesStringIter(line, 0)
for href := range hrefs {
start := strings.Index(href, “\””) + 1
end := len(href) – 1
packageFile := href[start : end]
if strings.Index(packageFile, search) != -1 {
out <- httpRoot + packageFile
}
}
}
return error
}