Started fixing the video streaming code.

This commit is contained in:
2014-01-07 18:13:55 -04:30
parent 3fc5ce58c3
commit 0e51d0ea22
8 changed files with 572 additions and 219 deletions

View File

@@ -0,0 +1,33 @@
/*
* Copyright (C) 2013 Miguel Angel Astor Romero
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ve.ucv.ciens.ccg.networkdata;
import java.io.Serializable;
public final class VideoFrameDataMessage implements Serializable{
private static final long serialVersionUID = 9989L;
public static final int magicNumber = 0x10;
public int imageWidth;
public int imageHeight;
public byte[] data;
public VideoFrameDataMessage(){
imageWidth = -1;
imageHeight = -1;
data = null;
}
}

View File

@@ -0,0 +1,29 @@
/*
* Copyright (C) 2013 Miguel Angel Astor Romero
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ve.ucv.ciens.ccg.networkdata;
import java.io.Serializable;
public final class VideoStreamingControlMessage implements Serializable{
private static final long serialVersionUID = 8898L;
public static final int magicNumber = 0x20;
public byte message;
public VideoStreamingControlMessage(){
message = -1;
}
}

View File

@@ -20,14 +20,17 @@ import ve.ucv.ciens.ccg.nxtar.interfaces.NetworkConnectionListener;
import ve.ucv.ciens.ccg.nxtar.interfaces.Toaster;
import ve.ucv.ciens.ccg.nxtar.network.RobotControlThread;
import ve.ucv.ciens.ccg.nxtar.network.ServiceDiscoveryThread;
import ve.ucv.ciens.ccg.nxtar.network.VideoFrameMonitor;
import ve.ucv.ciens.ccg.nxtar.network.VideoStreamingThread;
import ve.ucv.ciens.ccg.nxtar.utils.ProjectConstants;
import ve.ucv.ciens.ccg.nxtar.utils.Size;
import com.badlogic.gdx.Application;
import com.badlogic.gdx.ApplicationListener;
import com.badlogic.gdx.Gdx;
import com.badlogic.gdx.graphics.GL10;
import com.badlogic.gdx.graphics.OrthographicCamera;
import com.badlogic.gdx.graphics.Pixmap;
import com.badlogic.gdx.graphics.Texture;
import com.badlogic.gdx.graphics.Texture.TextureFilter;
import com.badlogic.gdx.graphics.g2d.Sprite;
@@ -46,6 +49,7 @@ public class NxtARCore implements ApplicationListener, NetworkConnectionListener
private MulticastEnabler mcastEnabler;
private int connections;
private VideoFrameMonitor frameMonitor;
private ServiceDiscoveryThread udpThread;
private VideoStreamingThread videoThread;
private RobotControlThread robotThread;
@@ -83,14 +87,16 @@ public class NxtARCore implements ApplicationListener, NetworkConnectionListener
sprite.setPosition(-sprite.getWidth()/2, -sprite.getHeight()/2);
Gdx.app.debug(TAG, CLASS_NAME + ".create() :: Creating network threads");
frameMonitor = VideoFrameMonitor.getInstance();
mcastEnabler.enableMulticast();
udpThread = ServiceDiscoveryThread.getInstance();
videoThread = VideoStreamingThread.getInstance().setToaster(toaster);
robotThread = RobotControlThread.getInstance().setToaster(toaster);
//robotThread = RobotControlThread.getInstance().setToaster(toaster);
udpThread.start();
videoThread.start();
robotThread.start();
videoThread.startStreaming();
//robotThread.start();
}
@Override
@@ -101,13 +107,32 @@ public class NxtARCore implements ApplicationListener, NetworkConnectionListener
@Override
public void render(){
byte[] frame;
Size dimensions;
Gdx.gl.glClearColor(1, 1, 1, 1);
Gdx.gl.glClear(GL10.GL_COLOR_BUFFER_BIT);
batch.setProjectionMatrix(camera.combined);
batch.begin();
sprite.draw(batch);
batch.end();
frame = frameMonitor.getCurrentFrame();
if(frame != null){
texture.dispose();
dimensions = frameMonitor.getFrameDimensions();
texture = new Texture(new Pixmap(frame, 0, dimensions.getWidth() * dimensions.getHeight()));
texture.setFilter(TextureFilter.Linear, TextureFilter.Linear);
TextureRegion region = new TextureRegion(texture, 0, 0, dimensions.getWidth(), dimensions.getHeight());
sprite = new Sprite(region);
sprite.setSize(0.9f, 0.9f * sprite.getHeight() / sprite.getWidth());
sprite.setOrigin(sprite.getWidth()/2, sprite.getHeight()/2);
sprite.setPosition(-sprite.getWidth()/2, -sprite.getHeight()/2);
batch.setProjectionMatrix(camera.combined);
batch.begin();{
sprite.draw(batch);
}batch.end();
}
}
@Override

View File

@@ -0,0 +1,84 @@
/*
* Copyright (C) 2013 Miguel Angel Astor Romero
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ve.ucv.ciens.ccg.nxtar.network;
import ve.ucv.ciens.ccg.nxtar.utils.Size;
import com.badlogic.gdx.Gdx;
public class VideoFrameMonitor{
private final String TAG = "VIDEO_FRAME_MONITOR";
private final String CLASS_NAME = VideoFrameMonitor.class.getSimpleName();
private byte[] frameA;
private byte[] frameB;
private Object frameMonitor;
private Size frameDimensions;
private VideoFrameMonitor(){
frameA = null;
frameB = null;
frameMonitor = new Object();
frameDimensions = new Size();
}
private static class SingletonHolder{
public static final VideoFrameMonitor INSTANCE = new VideoFrameMonitor();
}
public static VideoFrameMonitor getInstance(){
return SingletonHolder.INSTANCE;
}
public void setFrameDimensions(int width, int height){
try{
frameDimensions.setWidth(width);
frameDimensions.setHeight(height);
}catch(IllegalArgumentException ia){
Gdx.app.debug(TAG, CLASS_NAME + ".setFrameDimensions() :: Bad argument to Size: " + ia.getMessage());
frameDimensions.setWidth(0);
frameDimensions.setHeight(0);
}
}
public Size getFrameDimensions(){
return frameDimensions;
}
public void setNewFrame(byte[] frame){
byte[] temp;
Gdx.app.debug(TAG, CLASS_NAME + ".setNewFrame() :: Loading new frame in frameA.");
frameA = frame;
synchronized(frameMonitor){
Gdx.app.debug(TAG, CLASS_NAME + ".setNewFrame() :: Swapping frameA and frameB.");
temp = frameA;
frameA = frameB;
frameB = temp;
Gdx.app.debug(TAG, CLASS_NAME + ".setNewFrame() :: Swapping done.");
}
}
public byte[] getCurrentFrame(){
byte[] frame;
synchronized(frameMonitor){
//Gdx.app.debug(TAG, CLASS_NAME + ".getCurrentFrame() :: Fetching frameB.");
frame = frameB;
}
return frame;
}
}

View File

@@ -16,11 +16,16 @@
package ve.ucv.ciens.ccg.nxtar.network;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import ve.ucv.ciens.ccg.networkdata.VideoFrameDataMessage;
import ve.ucv.ciens.ccg.networkdata.VideoStreamingControlMessage;
import ve.ucv.ciens.ccg.nxtar.interfaces.NetworkConnectionListener;
import ve.ucv.ciens.ccg.nxtar.interfaces.Toaster;
import ve.ucv.ciens.ccg.nxtar.network.protocols.VideoStreamingProtocol;
import ve.ucv.ciens.ccg.nxtar.utils.ProjectConstants;
import com.badlogic.gdx.Gdx;
@@ -30,15 +35,35 @@ public class VideoStreamingThread extends Thread {
private static final String TAG = "NXTAR_CORE_VIDEOTHREAD";
private static final String CLASS_NAME = VideoStreamingThread.class.getSimpleName();
private enum ProtocolState_t {WAIT_FOR_START, SEND_CONTINUE, RECEIVE_DATA, SEND_ACK_NEXT, SEND_ACK_WAIT, PAUSED, END_STREAM};
private NetworkConnectionListener netListener;
private ServerSocket server;
private Socket client;
private Toaster toaster;
private ProtocolState_t protocolState;
private boolean protocolStarted;
private boolean pauseProtocol;
private boolean endProtocol;
private boolean done;
private Object protocolPauseMonitor;
private Socket client;
private ObjectInputStream reader;
private ObjectOutputStream writer;
private VideoFrameMonitor frameMonitor;
private VideoStreamingThread(){
super(THREAD_NAME);
netListener = null;
toaster = null;
protocolStarted = false;
endProtocol = false;
pauseProtocol = false;
done = false;
protocolState = ProtocolState_t.WAIT_FOR_START;
protocolPauseMonitor = new Object();
frameMonitor = VideoFrameMonitor.getInstance();
try{
server = new ServerSocket(ProjectConstants.SERVER_TCP_PORT_1);
}catch(IOException io){
@@ -63,16 +88,278 @@ public class VideoStreamingThread extends Thread {
netListener = listener;
}
@Override
public void run(){
try{
client = server.accept();
if(netListener != null)
netListener.networkStreamConnected(THREAD_NAME);
toaster.showShortToast("Client connected to VideoStreamingThread");
client.close();
}catch(IOException io){
Gdx.app.error(TAG, CLASS_NAME + ".run() :: Error accepting client: " + io.getMessage(), io);
private void toast(String message){
if(toaster != null)
toaster.showShortToast(message);
}
public void startStreaming(){
if(!protocolStarted){
Gdx.app.debug(TAG, CLASS_NAME + ".startStreaming() :: Requesting protocol start.");
synchronized(protocolPauseMonitor){
protocolStarted = true;
protocolState = ProtocolState_t.SEND_CONTINUE;
protocolPauseMonitor.notifyAll();
}
}
}
public void pauseStreaming(){
if(protocolStarted){
Gdx.app.debug(TAG, CLASS_NAME + ".pauseStreaming() :: Requesting protocol pause.");
pauseProtocol = true;
}else
return;
}
public void resumeStreaming(){
if(protocolStarted){
Gdx.app.debug(TAG, CLASS_NAME + ".resumeStreaming() :: Requesting protocol resume.");
synchronized(protocolPauseMonitor){
pauseProtocol = false;
protocolPauseMonitor.notifyAll();
}
}else
return;
}
public void finishStreaming(){
if(protocolStarted){
Gdx.app.debug(TAG, CLASS_NAME + ".finishStreaming() :: Requesting protocol end.");
endProtocol = true;
}else
return;
}
public void finish(){
done = true;
}
/*@Override
public void run(){
Object tmpMessage;
VideoStreamingControlMessage controlMessage;
VideoFrameDataMessage dataMessage;
// Listen on the server socket until a client successfully connects.
do{
try{
Gdx.app.debug(TAG, CLASS_NAME + ".run() :: Listening for client.");
client = server.accept();
if(netListener != null)
netListener.networkStreamConnected(THREAD_NAME);
writer = new ObjectOutputStream(client.getOutputStream());
reader = new ObjectInputStream(client.getInputStream());
toast("Client connected");
}catch(IOException io){
Gdx.app.error(TAG, CLASS_NAME + ".run() :: Error accepting client: " + io.getMessage(), io);
client = null;
}
}while(client != null && !client.isConnected());
while(!done){
switch(protocolState){
case WAIT_FOR_START:
Gdx.app.debug(TAG, CLASS_NAME + ".run() :: State is WAIT_FOR_START.");
// If the app has not started the protocol then wait.
synchronized(protocolPauseMonitor){
while(!protocolStarted){
try{
Gdx.app.debug(TAG, CLASS_NAME + ".run() :: Protocol has not started, waiting.");
protocolPauseMonitor.wait();
}catch(InterruptedException ie){ }
}
}
break;
case SEND_CONTINUE:
Gdx.app.debug(TAG, CLASS_NAME + ".run() :: State is SEND_CONTINUE.");
// Prepare the message.
controlMessage = new VideoStreamingControlMessage();
if(!endProtocol){
Gdx.app.debug(TAG, CLASS_NAME + ".run() :: Preparing STREAM_CONTROL_END message.");
controlMessage.message = VideoStreamingProtocol.STREAM_CONTROL_END;
}else{
Gdx.app.debug(TAG, CLASS_NAME + ".run() :: Preparing FLOW_CONTROL_CONTINUE message.");
controlMessage.message = VideoStreamingProtocol.FLOW_CONTROL_CONTINUE;
}
// Send it!
try{
writer.writeObject(controlMessage);
}catch(IOException io){
Gdx.app.error(TAG, CLASS_NAME + ".run() :: Error sending message: " + io.getMessage(), io);
}finally{
protocolState = ProtocolState_t.RECEIVE_DATA;
}
break;
case RECEIVE_DATA:
Gdx.app.debug(TAG, CLASS_NAME + ".run() :: State is RECEIVE_DATA.");
try{
tmpMessage = reader.readObject();
}catch(IOException io){
Gdx.app.error(TAG, CLASS_NAME + ".run() :: IOException while receiving message: " + io.getMessage(), io);
break;
}catch(ClassNotFoundException cn){
Gdx.app.error(TAG, CLASS_NAME + ".run() :: ClassNotFoundException while receiving message: " + cn.getMessage(), cn);
break;
}
if(tmpMessage instanceof VideoStreamingControlMessage){
Gdx.app.debug(TAG, CLASS_NAME + ".run() :: Received a control message.");
controlMessage = (VideoStreamingControlMessage) tmpMessage;
// TODO: handle this case correctly.
}else if(tmpMessage instanceof VideoFrameDataMessage){
Gdx.app.debug(TAG, CLASS_NAME + ".run() :: Received a data message.");
dataMessage = (VideoFrameDataMessage) tmpMessage;
Gdx.app.debug(TAG, CLASS_NAME + ".run() :: Received frame dimensions are: " +
Integer.toString(dataMessage.imageWidth) + "x" + Integer.toString(dataMessage.imageHeight));
frameMonitor.setFrameDimensions(dataMessage.imageWidth, dataMessage.imageHeight);
frameMonitor.setNewFrame(dataMessage.data);
if(pauseProtocol)
protocolState = ProtocolState_t.SEND_ACK_WAIT;
else
protocolState = ProtocolState_t.SEND_ACK_NEXT;
}else{
Gdx.app.error(TAG, CLASS_NAME + ".run() :: Unrecognized message received!.");
// TODO: handle this case correctly.
System.exit(ProjectConstants.EXIT_FAILURE);
}
break;
case SEND_ACK_NEXT:
Gdx.app.debug(TAG, CLASS_NAME + ".run() :: State is SEND_ACK_NEXT.");
// Prepare the message.
controlMessage = new VideoStreamingControlMessage();
if(!endProtocol){
Gdx.app.debug(TAG, CLASS_NAME + ".run() :: Preparing STREAM_CONTROL_END message.");
controlMessage.message = VideoStreamingProtocol.STREAM_CONTROL_END;
}else{
Gdx.app.debug(TAG, CLASS_NAME + ".run() :: Preparing ACK_SEND_NEXT message.");
controlMessage.message = VideoStreamingProtocol.ACK_SEND_NEXT;
}
// Send it!
try{
writer.writeObject(controlMessage);
}catch(IOException io){
Gdx.app.error(TAG, CLASS_NAME + ".run() :: Error sending message: " + io.getMessage(), io);
}finally{
if(!endProtocol)
protocolState = ProtocolState_t.RECEIVE_DATA;
else
protocolState = ProtocolState_t.END_STREAM;
}
break;
case SEND_ACK_WAIT:
Gdx.app.debug(TAG, CLASS_NAME + ".run() :: State is SEND_ACK_WAIT.");
// Prepare the message.
controlMessage = new VideoStreamingControlMessage();
controlMessage.message = VideoStreamingProtocol.ACK_WAIT;
// Send it!
try{
writer.writeObject(controlMessage);
}catch(IOException io){
Gdx.app.error(TAG, CLASS_NAME + ".run() :: Error sending message: " + io.getMessage(), io);
}finally{
protocolState = ProtocolState_t.PAUSED;
}
break;
case PAUSED:
Gdx.app.debug(TAG, CLASS_NAME + ".run() :: State is PAUSED.");
// The app requested to stop the protocol temporarily.
synchronized(protocolPauseMonitor){
while(pauseProtocol){
try{
Gdx.app.debug(TAG, CLASS_NAME + ".run() :: Protocol pause requested, waiting.");
protocolPauseMonitor.wait();
}catch(InterruptedException ie){ }
}
}
protocolState = ProtocolState_t.SEND_CONTINUE;
break;
case END_STREAM:
Gdx.app.debug(TAG, CLASS_NAME + ".run() :: State is END_STREAM.");
// Simply disconnect from the client and end the thread.
try{
client.close();
}catch(IOException io){
Gdx.app.error(TAG, CLASS_NAME + ".run() :: Error closing client: " + io.getMessage(), io);
}
done = true;
break;
}
}
Gdx.app.debug(TAG, CLASS_NAME + ".run() :: Thread finished.");
}*/
private void receiveImage(){
Object tmpMessage;
VideoFrameDataMessage dataMessage;
Gdx.app.debug(TAG, CLASS_NAME + ".run() :: Receiving data.");
try{
tmpMessage = (VideoFrameDataMessage)reader.readObject();
}catch(IOException io){
Gdx.app.error(TAG, CLASS_NAME + ".run() :: IOException while receiving message: " + io.getMessage());
return;
}catch(ClassNotFoundException cn){
Gdx.app.error(TAG, CLASS_NAME + ".run() :: ClassNotFoundException while receiving message: " + cn.getMessage());
return;
}
if(tmpMessage instanceof VideoFrameDataMessage){
Gdx.app.debug(TAG, CLASS_NAME + ".run() :: Received a data message.");
dataMessage = (VideoFrameDataMessage) tmpMessage;
frameMonitor.setFrameDimensions(dataMessage.imageWidth, dataMessage.imageHeight);
frameMonitor.setNewFrame(dataMessage.data);
}else{
Gdx.app.debug(TAG, CLASS_NAME + ".run() :: Received something unknown.");
}
}
@Override
public void run(){
// Listen on the server socket until a client successfully connects.
do{
try{
Gdx.app.debug(TAG, CLASS_NAME + ".run() :: Listening for client.");
client = server.accept();
if(netListener != null)
netListener.networkStreamConnected(THREAD_NAME);
writer = new ObjectOutputStream(client.getOutputStream());
reader = new ObjectInputStream(client.getInputStream());
toast("Client connected");
}catch(IOException io){
Gdx.app.error(TAG, CLASS_NAME + ".run() :: Error accepting client: " + io.getMessage(), io);
client = null;
}
}while(client != null && !client.isConnected());
while(!done){
receiveImage();
}
try{
client.close();
}catch(IOException io){
Gdx.app.error(TAG, CLASS_NAME + ".run() :: Error closing client socket.", io);
}
Gdx.app.debug(TAG, CLASS_NAME + ".run() :: Thread finished.");
}
}

View File

@@ -0,0 +1,46 @@
/*
* Copyright (C) 2013 Miguel Angel Astor Romero
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ve.ucv.ciens.ccg.nxtar.network.protocols;
public final class VideoStreamingProtocol{
public static final byte STREAM_CONTROL_END = 0x10;
public static final byte ACK_SEND_NEXT = 0x20;
public static final byte ACK_WAIT = 0x30;
public static final byte FLOW_CONTROL_WAIT = 0x40;
public static final byte FLOW_CONTROL_CONTINUE = 0x50;
public static final byte IMAGE_DATA = 0x60;
public static final byte UNRECOGNIZED = (byte)0xFF;
public static boolean checkValidityOfMessage(byte message){
boolean validity;
switch(message){
case STREAM_CONTROL_END:
case ACK_SEND_NEXT:
case ACK_WAIT:
case FLOW_CONTROL_WAIT:
case FLOW_CONTROL_CONTINUE:
case IMAGE_DATA:
case UNRECOGNIZED:
validity = true;
break;
default:
validity = false;
}
return validity;
}
}

View File

@@ -0,0 +1,51 @@
/*
* Copyright (C) 2013 Miguel Angel Astor Romero
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ve.ucv.ciens.ccg.nxtar.utils;
public class Size {
private int width;
private int height;
public Size(){
width = 0;
height = 0;
}
public Size(int width, int height){
this.width = (width >= 0) ? width : -1 * width;
this.height = (height >= 0) ? height : -1 * height;
}
public int getWidth(){
return width;
}
public int getHeight(){
return height;
}
public void setWidth(int width) throws IllegalArgumentException{
if(width < 0)
throw new IllegalArgumentException("Width must not be less than cero.");
this.width = width;
}
public void setHeight(int height) throws IllegalArgumentException{
if(height < 0)
throw new IllegalArgumentException("Height must not be less than cero.");
this.height = height;
}
}