How to detect the bitrate of a live stream in Red5 or Red5 Pro

Sometimes you need to calculate the bitrate (in kbps) of a live stream, for example to restrict usage or to manage a user's billing. As of today, there is no direct way to get the bitrate of a live stream in Red5 open source or Red5 Pro. Instead, you can use this old-school sampling technique to estimate the bitrate of a live stream; it can be coded in any language for any platform.

The steps involved in evaluating the bitrate can roughly be described as follows:

  1. On application startup, start a timer which will run continuously (every 2 seconds), aiding in the evaluation of the bitrates of all candidate streams taken from a list.
  2. Detect publish start for a new stream
  3. On publish start add the stream to a list as a candidate for evaluation
  4. For each evaluation cycle inside the timer calculate the current estimated bitrate using bytes-transferred-inwards data
  5. Store the evaluated bitrate sample in a temporary data structure. In the same way, capture about 4 to 6 data samples.
  6. Take the average of all the data samples collected to get an average bitrate
  7. Finally use this average bitrate data to calculate bitrate per second.
  8. On unpublish, remove the stream from the list of candidates so that we don't get a NullPointerException

The code snippet below demonstrates how to calculate and log the average bitrate of a live stream in Red5.

NOTE: For Red5 open source, the stream stats can be obtained by casting the stream to IClientBroadcastStream; for Red5 Pro, you need to cast the stream to IProStream (which can be obtained from red5pro-commons).


public class Application extends MultiThreadedApplicationAdapter implements IStreamListener {

    private static Logger logger = LoggerFactory.getLogger(Application.class);
   
    private Map<String ,IProStream> streamTable;
   
    private Map<String ,StreamStats> streamStatsTable;
   
    // every 2 seconds we check bytes per second
        private int SAMPLING_RATE = 2;

        private Timer evaluationTimer;

 
    class StreamStats {
       
        private int maxBitrateSamples = 4;
       
        private long lastSampleTimestamp;
       
        private long lastSampleBytesReceived;
       
        private long lastPacketReceievedTime;
       
        private CircularFifoBuffer bytesPerSecond;
       
        private long timeSampleRate;
       
       
       
        public StreamStats()
        {
            bytesPerSecond = new CircularFifoBuffer(maxBitrateSamples);
        }
       
        public StreamStats(long timeSampleRate)
        {
            bytesPerSecond = new CircularFifoBuffer(maxBitrateSamples);
            this.timeSampleRate = timeSampleRate;
        }
   
        public void setLastPacketReceievedTime(long lastPacketReceievedTime) {
            this.lastPacketReceievedTime = lastPacketReceievedTime;
        }
   
        public void addSample(long timeStampSample, long bytesReceivedSample)
        {
            logger.info("Storing bitrate sample {}", bytesReceivedSample);
        long estTimeDifference = (this.timeSampleRate == 0)?(timeStampSample - this.lastSampleTimestamp):this.timeSampleRate;
        long bytesReceivedDifference = bytesReceivedSample - this.lastSampleBytesReceived;
       
        long bytesDifferencePerSecond = (bytesReceivedDifference / estTimeDifference) * 1000;
        bytesPerSecond.add(bytesDifferencePerSecond);
       
        this.lastSampleTimestamp = timeStampSample;
        this.lastSampleBytesReceived = bytesReceivedSample;
    }
   
   
    public Double getAverageBytesPerSecond()
    {
        Iterator<Long> buf = bytesPerSecond.iterator();
        Double t = 0.0;
         while (buf.hasNext()) {
            long bytesDifferencePerSecond = buf.next();
            t += bytesDifferencePerSecond;
        }
         
        return t / maxBitrateSamples;
    }  
}


/**
 * Evaluation of stream data transfer
 *
 */
class EvaluateStream extends TimerTask
{
    @Override
    public void run()
    {
        Iterator<Map.Entry<String, IProStream>> iterator = streamTable.entrySet().iterator();
        while(iterator.hasNext())
        {
            Map.Entry<String, IProStream> entry = iterator.next();
            String uniqueIndexer = entry.getKey();
            IProStream stream = entry.getValue();
            StreamStats  stats = streamStatsTable.get(uniqueIndexer);
           
            try
            {
                long timeInstance = System.currentTimeMillis();
                long bytesReceievedAtTimeInstance = stream.getStatistics().getBytesReceived();
               
                stats.addSample(timeInstance, bytesReceievedAtTimeInstance);
                Double bps = stats.getAverageBytesPerSecond();
               
                logger.info("Live stream " + stream.getPublishedName() + " at " + stream.getScope().getName());
            logger.info("Avg bitrate = " + bps + " Bps");
        }
        catch(Exception e)
        {
            logger.error("Error evaluating bitrate of stream " + e.getCause());
            }
        }
       
    }
   
}




    @Override
    public void packetReceived(IBroadcastStream stream, IStreamPacket packet) {
       
        // here we check to see when the last packet has been receieved as per server time
    long timeInstance = System.currentTimeMillis();
    String uniqueIndexer = stream.getScope().getPath() + stream.getScope().getName() + stream.getName();
   
    StreamStats stats = null;
    if(streamStatsTable.containsKey(uniqueIndexer)){
        stats = streamStatsTable.get(uniqueIndexer);
        stats.setLastPacketReceievedTime(timeInstance);
    }
}



@Override
public boolean appStart(IScope arg0) {
    super.appStart(arg0);
   
    evaluationTimer = new Timer(true);
    EvaluateStream evaluationTask = new EvaluateStream();
    evaluationTimer.scheduleAtFixedRate(evaluationTask, 0, SAMPLING_RATE * 1000);
   
    return true;
}



@Override
public void appStop(IScope arg0) {
    super.appStop(arg0);
    evaluationTimer.cancel();
}



@Override
public void streamBroadcastStart(IBroadcastStream arg0) {
    log.info("Stream broadcast start {}", arg0);
super.streamBroadcastClose(arg0);

IProStream stream = (IProStream) arg0;
String uniqueIndexer = stream.getScope().getPath() + stream.getScope().getName() + stream.getName();

// if not in index add
    if(!streamTable.containsKey(uniqueIndexer))
    {
        long timeInstance = System.currentTimeMillis();
       
        StreamStats stats = new StreamStats(SAMPLING_RATE * 1000);
        stats.addSample(timeInstance, stream.getStatistics().getBytesReceived());
       
        streamStatsTable.put(uniqueIndexer, stats);
        streamTable.put(uniqueIndexer, stream);
        stream.addStreamListener(this);
    }
}



@Override
public void streamBroadcastClose(IBroadcastStream arg0) {
    log.info("Stream broadcast close {}", arg0);
        super.streamBroadcastClose(arg0);
       
        IProStream stream = (IProStream) arg0;
        String uniqueIndexer = stream.getScope().getPath() + stream.getScope().getName() + stream.getName();
       
        if(streamStatsTable.containsKey(uniqueIndexer))
        {            
            StreamStats stats = streamStatsTable.get(uniqueIndexer);
            if(stats != null)
            {
                streamStatsTable.remove(uniqueIndexer);
                streamTable.remove(uniqueIndexer);
                stream.removeStreamListener(this);
            }
        }
    }
}

If you have trouble getting emails from this domain, please check your email spam & mark messages from flashvisions.com as 'Not Spam'

Comments are closed