Sometimes you need to calculate the bitrate (in kbps) of a live stream to restrict usage or to manage the user's bill. As of today, there is no direct way to get the bitrate of a live stream in Red5 open source or Red5 Pro. Instead, you can use this old-school sampling technique to estimate the bitrate of a live stream; it can be coded in any language for any platform.
The steps involved in evaluating the bitrate can roughly be described as follows:
- On application startup, start a timer that runs continuously (every 2 seconds) and drives the bitrate evaluation of all candidate streams taken from a list.
- Detect publish start for a new stream
- On publish start add the stream to a list as a candidate for evaluation
- For each evaluation cycle inside the timer calculate the current estimated bitrate using bytes-transferred-inwards data
- Store the evaluated bitrate sample in a temporary data structure. In the same way, capture about 4 to 6 data samples.
- Take the average of all the data samples collected to get an average bitrate
- Finally use this average bitrate data to calculate bitrate per second.
- On unpublish, remove the stream from the list of candidates so that we don't get a NullPointerException
The code snippet below demonstrates how to calculate and log the average bitrate of a live stream in Red5.
NOTE: For Red5 open source, the stream stats can be obtained by casting the stream to IClientBroadcastStream; for Red5 Pro you need to cast it to IProStream (which can be obtained from red5pro-commons).
/**
 * Red5 application adapter that estimates the inbound byte rate of every
 * live stream by periodically sampling its bytes-received counter.
 *
 * <p>A daemon {@link Timer} started in {@link #appStart(IScope)} runs
 * {@link EvaluateStream} every {@link #SAMPLING_RATE} seconds. Streams are
 * registered in {@link #streamBroadcastStart(IBroadcastStream)} and removed
 * in {@link #streamBroadcastClose(IBroadcastStream)}.
 *
 * <p>The two lookup tables are read on the timer thread and modified from the
 * publish/unpublish callbacks, so they are concurrent maps.
 */
public class Application extends MultiThreadedApplicationAdapter implements IStreamListener {

    private static final Logger logger = LoggerFactory.getLogger(Application.class);

    /** Interval between two evaluation cycles, in seconds. */
    private static final int SAMPLING_RATE = 2;

    /** Streams currently being evaluated, keyed by {@link #streamKey(IBroadcastStream)}. */
    private final Map<String, IProStream> streamTable =
            new java.util.concurrent.ConcurrentHashMap<String, IProStream>();

    /** Per-stream sample history, keyed identically to {@link #streamTable}. */
    private final Map<String, StreamStats> streamStatsTable =
            new java.util.concurrent.ConcurrentHashMap<String, StreamStats>();

    private Timer evaluationTimer;

    /**
     * Rolling window of bytes-per-second samples for a single stream.
     *
     * <p>Instances are not synchronized. In this class a sample is added once
     * before the instance is stored in {@code streamStatsTable}; afterwards
     * samples are added and averaged only from {@link EvaluateStream#run()}.
     */
    class StreamStats {

        /** Capacity of the rolling sample window. */
        private final int maxBitrateSamples = 4;

        private long lastSampleTimestamp;
        private long lastSampleBytesReceived;

        /** False until the first call to {@link #addSample(long, long)} has stored a reference point. */
        private boolean baselineRecorded;

        private long lastPacketReceievedTime;

        /** Holds {@code Long} bytes-per-second values; the oldest is evicted when full. */
        private final CircularFifoBuffer bytesPerSecond;

        /** Nominal sampling interval in milliseconds; 0 means "not configured". */
        private final long timeSampleRate;

        /** Creates stats with no nominal sampling interval. */
        public StreamStats()
        {
            this(0);
        }

        /**
         * @param timeSampleRate nominal sampling interval in milliseconds, used
         *        only when the measured interval between two samples is not positive
         */
        public StreamStats(long timeSampleRate)
        {
            this.bytesPerSecond = new CircularFifoBuffer(maxBitrateSamples);
            this.timeSampleRate = timeSampleRate;
        }

        /**
         * @param lastPacketReceievedTime time at which the most recent packet was seen
         */
        public void setLastPacketReceievedTime(long lastPacketReceievedTime) {
            this.lastPacketReceievedTime = lastPacketReceievedTime;
        }

        /**
         * Records a reading of the stream's bytes-received counter.
         *
         * <p>The first reading only establishes the reference point. Every
         * later reading stores one bytes-per-second value computed from the
         * difference to the previous reading.
         *
         * @param timeStampSample     time of the reading, in milliseconds
         * @param bytesReceivedSample cumulative bytes received at that time
         */
        public void addSample(long timeStampSample, long bytesReceivedSample)
        {
            logger.info("Storing bitrate sample {}", bytesReceivedSample);
            if (baselineRecorded) {
                // Prefer the measured interval; fall back to the nominal one
                // when the clock did not advance between two readings.
                long elapsedMillis = timeStampSample - this.lastSampleTimestamp;
                if (elapsedMillis <= 0) {
                    elapsedMillis = this.timeSampleRate;
                }
                // Still non-positive: no usable interval, skip instead of dividing by zero.
                if (elapsedMillis > 0) {
                    long bytesReceivedDifference = bytesReceivedSample - this.lastSampleBytesReceived;
                    // Multiply before dividing so sub-interval byte counts are not truncated to 0.
                    long bytesDifferencePerSecond = (bytesReceivedDifference * 1000) / elapsedMillis;
                    bytesPerSecond.add(Long.valueOf(bytesDifferencePerSecond));
                }
            }
            this.lastSampleTimestamp = timeStampSample;
            this.lastSampleBytesReceived = bytesReceivedSample;
            this.baselineRecorded = true;
        }

        /**
         * @return the mean of the samples currently in the window, in bytes per
         *         second, or {@code 0.0} when no sample has been stored yet
         */
        public Double getAverageBytesPerSecond()
        {
            int sampleCount = bytesPerSecond.size();
            if (sampleCount == 0) {
                return 0.0;
            }
            double total = 0.0;
            for (Object sample : bytesPerSecond) {
                total += ((Number) sample).doubleValue();
            }
            // Divide by the number of samples actually held, not the window capacity.
            return total / sampleCount;
        }
    }

    /**
     * Timer task that samples every registered stream once per run and logs
     * its average byte rate.
     */
    class EvaluateStream extends TimerTask
    {
        @Override
        public void run()
        {
            for (Map.Entry<String, IProStream> entry : streamTable.entrySet()) {
                String uniqueIndexer = entry.getKey();
                IProStream stream = entry.getValue();
                StreamStats stats = streamStatsTable.get(uniqueIndexer);
                if (stats == null) {
                    // Stream was unregistered while this cycle was running.
                    continue;
                }
                try
                {
                    long timeInstance = System.currentTimeMillis();
                    long bytesReceievedAtTimeInstance = stream.getStatistics().getBytesReceived();
                    stats.addSample(timeInstance, bytesReceievedAtTimeInstance);
                    Double bps = stats.getAverageBytesPerSecond();
                    logger.info("Live stream {} at {}", stream.getPublishedName(), stream.getScope().getName());
                    logger.info("Avg bitrate = {} Bps", bps);
                }
                catch (Exception e)
                {
                    // Broad catch is deliberate: an exception escaping run() would
                    // stop the timer and end evaluation for every stream.
                    logger.error("Error evaluating bitrate of stream {}", uniqueIndexer, e);
                }
            }
        }
    }

    /**
     * Builds the lookup key used by both tables: scope path, scope name and
     * stream name concatenated without separators.
     */
    private String streamKey(IBroadcastStream stream) {
        return stream.getScope().getPath() + stream.getScope().getName() + stream.getName();
    }

    /**
     * Stores the arrival time of the latest packet for a registered stream.
     * Packets of streams that are not registered are ignored.
     */
    @Override
    public void packetReceived(IBroadcastStream stream, IStreamPacket packet) {
        long timeInstance = System.currentTimeMillis();
        StreamStats stats = streamStatsTable.get(streamKey(stream));
        if (stats != null) {
            stats.setLastPacketReceievedTime(timeInstance);
        }
    }

    /**
     * Starts the periodic evaluation timer once the parent adapter has started.
     *
     * @param arg0 the application scope
     * @return {@code false} if the parent adapter refused to start, otherwise {@code true}
     */
    @Override
    public boolean appStart(IScope arg0) {
        if (!super.appStart(arg0)) {
            return false;
        }
        evaluationTimer = new Timer(true);
        evaluationTimer.scheduleAtFixedRate(new EvaluateStream(), 0, SAMPLING_RATE * 1000L);
        return true;
    }

    /**
     * Stops the evaluation timer, if one was started.
     *
     * @param arg0 the application scope
     */
    @Override
    public void appStop(IScope arg0) {
        super.appStop(arg0);
        if (evaluationTimer != null) {
            evaluationTimer.cancel();
            evaluationTimer = null;
        }
    }

    /**
     * Registers a newly published stream for evaluation and records its
     * first counter reading as the reference point.
     *
     * @param arg0 the stream that started broadcasting
     */
    @Override
    public void streamBroadcastStart(IBroadcastStream arg0) {
        logger.info("Stream broadcast start {}", arg0);
        super.streamBroadcastStart(arg0);
        if (!(arg0 instanceof IProStream)) {
            logger.warn("Stream {} is not an IProStream, bitrate will not be evaluated", arg0);
            return;
        }
        IProStream stream = (IProStream) arg0;
        String uniqueIndexer = streamKey(stream);
        if (!streamTable.containsKey(uniqueIndexer))
        {
            long timeInstance = System.currentTimeMillis();
            StreamStats stats = new StreamStats(SAMPLING_RATE * 1000L);
            stats.addSample(timeInstance, stream.getStatistics().getBytesReceived());
            // Stats go in first so the timer never sees a stream without stats.
            streamStatsTable.put(uniqueIndexer, stats);
            streamTable.put(uniqueIndexer, stream);
            stream.addStreamListener(this);
        }
    }

    /**
     * Unregisters a stream that stopped broadcasting.
     *
     * @param arg0 the stream that stopped broadcasting
     */
    @Override
    public void streamBroadcastClose(IBroadcastStream arg0) {
        logger.info("Stream broadcast close {}", arg0);
        super.streamBroadcastClose(arg0);
        String uniqueIndexer = streamKey(arg0);
        // Remove the stream first so the timer stops picking it up, then its stats.
        boolean hadStream = streamTable.remove(uniqueIndexer) != null;
        boolean hadStats = streamStatsTable.remove(uniqueIndexer) != null;
        if (hadStream || hadStats)
        {
            arg0.removeStreamListener(this);
        }
    }
}