Twitter Streaming API from Processing

I wrote a Processing class (TwitterStreamingTrack.pde), which helps programmers who want to use Twitter Streaming API from Processing.

Sample Program

Through the Twitter Streaming API, this Processing program receives tweets that match "#nowplaying" or "suztomo".

/*
  TwitterStreamingTrackTest.pde
*/
TwitterStreamingTrack tst;

void setup() {
    size(1200, 800);
    PFont f = createFont("FFScala", 100);
    textFont(f);
    /*
      TwitterStreamingTrack(PApplet parent, String keywords,
                            String username, String password);

    */
    tst = new TwitterStreamingTrack(this, "#nowplaying,suztomo",
                                    "account", "password");
    background(0xFF);
}

void draw() {
    /*
      getNewTweets() returns a new Tweet object (defined below), if exists.
    */
    Tweet t = tst.getNewTweet();
    if (t != null) {
        fill(0x0);
        String msg = t.msg;
        text(msg,250 - int(random(500)), int(random(1000)) - 100);
    }
}

In the directory that contains this program (TwitterStreamingTrackTest.pde), are two Processing files (Tweet.pde, TwitterStreamingTrack.pde).

Tweet.pde

/*
  Tweet.pde
*/
public class Tweet{
    public String msg;
    public String scname;
    public String username;
    public String time;

    public Tweet(){
        msg = "";
        scname = "";
        username = "";
        time = "";
    }

    public Tweet(String _msg, String _scname, String _username, String _time) {
        msg = _msg;
        scname = _scname;
        username = _username;
        time = _time;
    }
}

TwitterStreamingTrack.pde

This class requires these two Java libraries (.jar).

/*
  TwitterStreamingTrack.pde
*/
import org.json.*;
import biz.source_code.*;
import processing.net.*;

public class TwitterStreamingTrack {
    private PApplet parent;
    private Client client;
    private int byteBuffersNum = 10;
    private byte[][] byteBuffers = new byte[byteBuffersNum][4096];
    private byte[] currentByteBuffer;
    private int byteBuffersIndex = 0;
    private int byteBuffersStart = 0;
    private int[] byteBuffersSizes = new int[byteBuffersNum];
    private String encoding = "UTF-8";
    private final String twitterStreamingServer="stream.twitter.com";
    private final String twitterStreamingMethod="POST";
    private final int twitterStreamingPort=80;
    private final String twitterStreamingPath="/1/statuses/filter.json";
    private String twitterStreamingAuthUser;
    private String twitterStreamingAuthPass;
    private String authKey;
    private Tweet recentTweet = null;
    private Pattern HTTPResponsePattern = Pattern.compile("(HTTP/1.1 (\\d+).+)");
    private Pattern HTTPBodyPattern1 = Pattern.compile("\r\n\r\n[^{]+\r\n(.*)", Pattern.DOTALL);
    private Pattern HTTPBodyPattern2 = Pattern.compile("[0-9A-Z]\r\n(.*)", Pattern.DOTALL);


    /*
      parent: usually ``this''
      keywords: keyword list, comma separated
    */
    public TwitterStreamingTrack(PApplet parent, String keywords,
                                 String username, String password) {
        twitterStreamingAuthUser = username;
        twitterStreamingAuthPass = password;
        this.parent = parent;
        initClient();
        prepareAuthKey();

        String encoded_keywords = "logcafe";
        try {
            encoded_keywords= java.net.URLEncoder.encode(keywords, encoding);
        } catch (UnsupportedEncodingException e) {
            println(e.toString());
        }
        sendHTTPRequest(encoded_keywords);
        currentByteBuffer = byteBuffers[byteBuffersIndex];
    }

    /*
      This method should be called each frame.
    */
    public Tweet getNewTweet() {
        recentTweet = null;
        checkBytes();
        return recentTweet;
    }

    private void initClient() {
        client = new Client(parent, twitterStreamingServer,
                            twitterStreamingPort);
    }

    private void prepareAuthKey() {
        /*
          http://www.source-code.biz/base64coder/java/
        */
        authKey = Base64Coder.encodeString(twitterStreamingAuthUser + ":" +
                                           twitterStreamingAuthPass);

    }

    private void sendHTTPRequest(String keywords) {
        sendPostRequest("track="+keywords);
    }
    /*
      requestBody is assumed to be already url-encoded (?).
    */
    private void sendPostRequest(String requestBody) {
        String requestLine = twitterStreamingMethod + " "
            + twitterStreamingPath + " HTTP/1.1";
        String hostParam = "Host: " + twitterStreamingServer + ":" +
            twitterStreamingPort;
        String userAgentParam = "User-Agent: curl/7.19.4 (universal-apple-darwin10.0) libcurl/7.19.4 OpenSSL/0.9.8l zlib/1.2.3";
        String authParam = "Authorization: Basic " + authKey;
        String acceptParam = "Accept: " + "*"+ "/"+"*";
        String contentLengthParam = "Content-Length: " + requestBody.length();
        String contentTypeParam = "Content-Type: application/x-www-form-urlencoded";
        sendln(requestLine);
        sendln(authParam);
        sendln(userAgentParam);
        sendln(hostParam);
        sendln(acceptParam);
        sendln(contentLengthParam);
        sendln(contentTypeParam);
        sendln("");// empty line before request body
        sendln(requestBody);
        sendln("");
        sendln(""); // end of request, that is two new line
    }

    private void sendln(String line) {
        client.write(line + "\r\n");
    }

    private void clearByteBuffer(byte[] buf) {
        for (int i = 0; i<buf.length; ++i) {
            buf[i] = 0;
        }
    }


    private void checkBytes() {
        if (client.available() > 0) {
            int byteCount = client.readBytes(currentByteBuffer);
            if (byteCount > 0 ) {
                String response;
                byteBuffersSizes[byteBuffersIndex] = byteCount;
                //                println("received: " + byteCount + " to buffer#" + byteBuffersIndex);

                int r =  processByteBuffers();
                if (r == 0) {
                    byteBuffersStart = (byteBuffersIndex+1)%byteBuffersNum;
                }
                byteBuffersIndex = (byteBuffersIndex+1)%byteBuffersNum;
                currentByteBuffer = byteBuffers[byteBuffersIndex];
                clearByteBuffer(currentByteBuffer);
            }
        }
    }

    private int processByteBuffers() {
        int ssize = 1<<13;
        byte[] s = new byte[ssize];
        int i;
        int k = byteBuffersStart;
        String response;
        /*
          append butters to the first.
          buteBuffersStart = 7, byteBuffersIndex = 0.
          - byteBuffers[7] = {a, b, c}  (s)
          - byteBuffers[8] = {k, l}
          - byteBuffers[9] = {j}
          - byteBuffers[0] = {r, t, i}
          - byteBuffers[1] = {a, o}

          => s = {a, b, c, k, l, j, r, t, i, a, o}

        */
        for (i=0; i<ssize; ++i) {
            s[i] = 0;
        }
        for (i=0; i<byteBuffersSizes[byteBuffersStart]; ++i) {
            s[i] = byteBuffers[k][i];
        }
        k = (k+1) % byteBuffersNum;
        i = byteBuffersSizes[byteBuffersStart];
        //        println("startBuffer's size: " + i);
        if (byteBuffersIndex != byteBuffersStart) {
            while(true){
                byte[] b = byteBuffers[k];
                int bs = byteBuffersSizes[k];
                for (int j=0; j<bs; ++j) {
                    s[i] = b[j];
                    ++i;
                    if (i>=ssize) {
                        // reset the buffer, but doesn't update recentTweet
                        return 0;
                    }
                }
                //                println("appended size:" + bs);
                if (k == byteBuffersIndex) break;
                k = (k+1)%byteBuffersNum;
            }
        }
        try {
            response = new String(s, encoding);
        } catch (UnsupportedEncodingException e) {
            response = "encoding error";
        }
        return processHTTPResponse(response);
    }

    private int processHTTPResponse(String response) {
        Boolean ret;
        String responseBody;
        String statusLine, statusCode;
        Matcher matcher;
        matcher = HTTPResponsePattern.matcher(response);
        // matches the first occurance.
        if (matcher.find()) {
            statusLine = matcher.group(1);
            statusCode = matcher.group(2);
            if (!statusCode.equals("200")) {
                println("Error: status line is " + statusLine);
                return -1;
            }
        }
        matcher = HTTPBodyPattern1.matcher(response);
        if (!matcher.find()) {
            matcher = HTTPBodyPattern2.matcher(response);
            if (!matcher.find()) {
                return -1;
            }
        }
        responseBody = matcher.group(1);
        return processHTTPResponseBody(responseBody);
    }

    private int processHTTPResponseBody(String responseBody) {
        String twitScreenName, twitRealName;
        JSONObject j, user;
        String twitPostedTime;
        String twitText;

        try {
            //            println(responseBody);
            j = new JSONObject(responseBody);
            // if an exception occurs, go to checkBytes.

            twitText = j.getString("text");
            user = j.getJSONObject("user");
            twitScreenName = user.getString("screen_name");
            twitRealName = user.getString("name");
            twitPostedTime = j.getString("created_at");
            recentTweet = new Tweet(twitText, twitScreenName,
                                    twitRealName, twitPostedTime);
        } catch(JSONException e) {
            return -2;
        }
        return 0;
    }
}