`
phinecos
  • 浏览: 342949 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

基于JMF RTP的音视频传输

 
阅读更多
<!--<br><br>Code highlighting produced by Actipro CodeHighlighter (freeware)<br>http://www.CodeHighlighter.com/<br><br>-->
importjava.io.
*;
importjava.awt.
*;
importjava.net.
*;
importjava.awt.
event.*;
importjava.util.Vector;

importjavax.media.
*;
importjavax.media.rtp.
*;
importjavax.media.rtp.
event.*;
importjavax.media.rtp.rtcp.
*;
importjavax.media.protocol.
*;
importjavax.media.protocol.DataSource;
importjavax.media.format.AudioFormat;
importjavax.media.format.VideoFormat;
importjavax.media.Format;
importjavax.media.format.FormatChangeEvent;
importjavax.media.control.BufferControl;


/**
*AVReceive3toreceiveRTPtransmissionusingtheRTPConnector.
*/
publicclassAVReceive3implementsReceiveStreamListener,SessionListener,
ControllerListener
{
Stringsessions[]
=null;
RTPManagermgrs[]
=null;
VectorplayerWindows
=null;

booleandataReceived
=false;
ObjectdataSync
=newObject();


/**
 * Creates a receiver for the given session descriptors.
 *
 * @param sessions session strings; initialize() parses each entry with
 *                 SessionLabel
 */
public AVReceive3(String sessions[]) {
    this.sessions = sessions;
}

protectedbooleaninitialize(){

try{
mgrs
=newRTPManager[sessions.length];
playerWindows
=newVector();

SessionLabelsession;

//OpentheRTPsessions.
for(inti=0;i<sessions.length;i++){

//Parsethesessionaddresses.
try{
session
=newSessionLabel(sessions[i]);
}
catch(IllegalArgumentExceptione){
System.err.println(
"Failedtoparsethesessionaddressgiven:"+sessions[i]);
returnfalse;
}

System.err.println(
"-OpenRTPsessionfor:addr:"+session.addr+"port:"+session.port+"ttl:"+session.ttl);

mgrs[i]
=(RTPManager)RTPManager.newInstance();
mgrs[i].addSessionListener(
this);
mgrs[i].addReceiveStreamListener(
this);

//InitializetheRTPManagerwiththeRTPSocketAdapter
mgrs[i].initialize(newRTPSocketAdapter(
InetAddress.getByName(session.addr),
session.port,session.ttl));

//Youcantryoutsomeotherbuffersizetosee
//ifyoucangetbettersmoothness.
BufferControlbc=(BufferControl)mgrs[i].getControl("javax.media.control.BufferControl");
if(bc!=null)
bc.setBufferLength(
350);
}

}
catch(Exceptione){
System.err.println(
"CannotcreatetheRTPSession:"+e.getMessage());
returnfalse;
}

//Waitfordatatoarrivebeforemovingon.

longthen=System.currentTimeMillis();
longwaitingPeriod=30000;//waitforamaximumof30secs.

try{
synchronized(dataSync){
while(!dataReceived&&
System.currentTimeMillis()
-then<waitingPeriod){
if(!dataReceived)
System.err.println(
"-WaitingforRTPdatatoarrive");
dataSync.wait(
1000);
}
}
}
catch(Exceptione){}

if(!dataReceived){
System.err.println(
"NoRTPdatawasreceived.");
close();
returnfalse;
}

returntrue;
}


publicbooleanisDone(){
returnplayerWindows.size()==0;
}


/**
*Closetheplayersandthesessionmanagers.
*/
protectedvoidclose(){

for(inti=0;i<playerWindows.size();i++){
try{
((PlayerWindow)playerWindows.elementAt(i)).close();
}
catch(Exceptione){}
}

playerWindows.removeAllElements();

//closetheRTPsession.
for(inti=0;i<mgrs.length;i++){
if(mgrs[i]!=null){
mgrs[i].removeTargets(
"ClosingsessionfromAVReceive3");
mgrs[i].dispose();
mgrs[i]
=null;
}
}
}


PlayerWindowfind(Playerp){
for(inti=0;i<playerWindows.size();i++){
PlayerWindowpw
=(PlayerWindow)playerWindows.elementAt(i);
if(pw.player==p)
returnpw;
}
returnnull;
}


PlayerWindowfind(ReceiveStreamstrm){
for(inti=0;i<playerWindows.size();i++){
PlayerWindowpw
=(PlayerWindow)playerWindows.elementAt(i);
if(pw.stream==strm)
returnpw;
}
returnnull;
}


/**
*SessionListener.
*/
publicsynchronizedvoidupdate(SessionEventevt){
if(evtinstanceofNewParticipantEvent){
Participantp
=((NewParticipantEvent)evt).getParticipant();
System.err.println(
"-Anewparticipanthadjustjoined:"+p.getCNAME());
}
}


/**
*ReceiveStreamListener
*/
publicsynchronizedvoidupdate(ReceiveStreamEventevt){

RTPManagermgr
=(RTPManager)evt.getSource();
Participantparticipant
=evt.getParticipant();//couldbenull.
ReceiveStreamstream=evt.getReceiveStream();//couldbenull.

if(evtinstanceofRemotePayloadChangeEvent){

System.err.println(
"-ReceivedanRTPPayloadChangeEvent.");
System.err.println(
"Sorry,cannothandlepayloadchange.");
System.exit(
0);

}

elseif(evtinstanceofNewReceiveStreamEvent){

try{
stream
=((NewReceiveStreamEvent)evt).getReceiveStream();
DataSourceds
=stream.getDataSource();

//Findouttheformats.
RTPControlctl=(RTPControl)ds.getControl("javax.media.rtp.RTPControl");
if(ctl!=null){
System.err.println(
"-ReceviednewRTPstream:"+ctl.getFormat());
}
else
System.err.println(
"-ReceviednewRTPstream");

if(participant==null)
System.err.println(
"Thesenderofthisstreamhadyettobeidentified.");
else{
System.err.println(
"Thestreamcomesfrom:"+participant.getCNAME());
}

//createaplayerbypassingdatasourcetotheMediaManager
Playerp=javax.media.Manager.createPlayer(ds);
if(p==null)
return;

p.addControllerListener(
this);
p.realize();
PlayerWindowpw
=newPlayerWindow(p,stream);
playerWindows.addElement(pw);

//Notifyintialize()thatanewstreamhadarrived.
synchronized(dataSync){
dataReceived
=true;
dataSync.notifyAll();
}

}
catch(Exceptione){
System.err.println(
"NewReceiveStreamEventexception"+e.getMessage());
return;
}

}

elseif(evtinstanceofStreamMappedEvent){

if(stream!=null&&stream.getDataSource()!=null){
DataSourceds
=stream.getDataSource();
//Findouttheformats.
RTPControlctl=(RTPControl)ds.getControl("javax.media.rtp.RTPControl");
System.err.println(
"-Thepreviouslyunidentifiedstream");
if(ctl!=null)
System.err.println(
""+ctl.getFormat());
System.err.println(
"hadnowbeenidentifiedassentby:"+participant.getCNAME());
}
}

elseif(evtinstanceofByeEvent){

System.err.println(
"-Got/"bye/"from:"+participant.getCNAME());
PlayerWindowpw
=find(stream);
if(pw!=null){
pw.close();
playerWindows.removeElement(pw);
}
}

}


/**
*ControllerListenerforthePlayers.
*/
publicsynchronizedvoidcontrollerUpdate(ControllerEventce){

Playerp
=(Player)ce.getSourceController();

if(p==null)
return;

//Getthiswhentheinternalplayersarerealized.
if(ceinstanceofRealizeCompleteEvent){
PlayerWindowpw
=find(p);
if(pw==null){
//Somestrangehappened.
System.err.println("Internalerror!");
System.exit(
-1);
}
pw.initialize();
pw.setVisible(
true);
p.start();
}

if(ceinstanceofControllerErrorEvent){
p.removeControllerListener(
this);
PlayerWindowpw
=find(p);
if(pw!=null){
pw.close();
playerWindows.removeElement(pw);
}
System.err.println(
"AVReceive3internalerror:"+ce);
}

}


/**
 * Parses a session descriptor of the form {@code <address>/<port>[/<ttl>]}.
 * Leading slashes are ignored and the TTL defaults to 1 when it is omitted.
 */
class SessionLabel {

    public String addr = null;
    public int port;
    public int ttl = 1;

    /**
     * @param session descriptor to parse
     * @throws IllegalArgumentException if the address or the port is missing,
     *         a number is malformed, the port is outside 0-65535 or the TTL
     *         is outside 0-255
     */
    SessionLabel(String session) throws IllegalArgumentException {

        // fields[0] = address, fields[1] = port, fields[2] = ttl
        String[] fields = new String[3];

        if (session != null && session.length() > 0) {
            String rest = session;
            while (rest.length() > 1 && rest.charAt(0) == '/') {
                rest = rest.substring(1);
            }

            for (int i = 0; i < fields.length; i++) {
                int off = rest.indexOf('/');
                if (off == -1) {
                    // Last component; an empty remainder means "not given".
                    if (rest.length() > 0) {
                        fields[i] = rest;
                    }
                    break;
                }
                fields[i] = rest.substring(0, off);
                rest = rest.substring(off + 1);
            }
        }

        if (fields[0] == null) {
            throw new IllegalArgumentException("Missing address in session: " + session);
        }
        if (fields[1] == null) {
            throw new IllegalArgumentException("Missing port in session: " + session);
        }

        addr = fields[0];
        port = parseInRange(fields[1], "port", 0, 65535);
        if (fields[2] != null) {
            ttl = parseInRange(fields[2], "ttl", 0, 255);
        }
    }

    /** Parses a decimal integer and checks that it lies in [min, max]. */
    private int parseInRange(String text, String what, int min, int max) {
        int value;
        try {
            value = Integer.parseInt(text);
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("Invalid " + what + ": " + text);
        }
        if (value < min || value > max) {
            throw new IllegalArgumentException(
                    what + " out of range " + min + "-" + max + ": " + value);
        }
        return value;
    }
}


/**
*GUIclassesforthePlayer.
*/
classPlayerWindowextendsFrame{

Playerplayer;
ReceiveStreamstream;

PlayerWindow(Playerp,ReceiveStreamstrm){
player
=p;
stream
=strm;
}

publicvoidinitialize(){
add(
newPlayerPanel(player));
}

publicvoidclose(){
player.close();
setVisible(
false);
dispose();
}

publicvoidaddNotify(){
super.addNotify();
pack();
}
}


/**
*GUIclassesforthePlayer.
*/
classPlayerPanelextendsPanel{

Componentvc,cc;

PlayerPanel(Playerp){
setLayout(
newBorderLayout());
if((vc=p.getVisualComponent())!=null)
add(
"Center",vc);
if((cc=p.getControlPanelComponent())!=null)
add(
"South",cc);
}

publicDimensiongetPreferredSize(){
intw=0,h=0;
if(vc!=null){
Dimensionsize
=vc.getPreferredSize();
w
=size.width;
h
=size.height;
}
if(cc!=null){
Dimensionsize
=cc.getPreferredSize();
if(w==0)
w
=size.width;
h
+=size.height;
}
if(w<160)
w
=160;
returnnewDimension(w,h);
}
}


publicstaticvoidmain(Stringargv[]){
if(argv.length==0)
prUsage();

AVReceive3avReceive
=newAVReceive3(argv);
if(!avReceive.initialize()){
System.err.println(
"Failedtoinitializethesessions.");
System.exit(
-1);
}

//ChecktoseeifAVReceive3isdone.
try{
while(!avReceive.isDone())
Thread.sleep(
1000);
}
catch(Exceptione){}

System.err.println(
"ExitingAVReceive3");
}


staticvoidprUsage(){
System.err.println(
"Usage:AVReceive3<session><session>");
System.err.println(
"<session>:<address>/<port>/<ttl>");
System.exit(
0);
}

}
//endofAVReceive3

发送端代码:

<!--<br><br>Code highlighting produced by Actipro CodeHighlighter (freeware)<br>http://www.CodeHighlighter.com/<br><br>-->importjava.awt.*;
importjava.io.
*;
importjava.net.InetAddress;
importjavax.media.
*;
importjavax.media.protocol.
*;
importjavax.media.protocol.DataSource;
importjavax.media.format.
*;
importjavax.media.control.TrackControl;
importjavax.media.control.QualityControl;
importjavax.media.rtp.
*;
importjavax.media.rtp.rtcp.
*;
importcom.sun.media.rtp.
*;

publicclassAVTransmit3{

//InputMediaLocator
//Canbeafileorhttporcapturesource
privateMediaLocatorlocator;
privateStringipAddress;
privateintportBase;

privateProcessorprocessor=null;
privateRTPManagerrtpMgrs[];
privateDataSourcedataOutput=null;

/**
 * Creates a transmitter for the given media source and destination.
 *
 * @param locator   media source to transmit
 * @param ipAddress destination address
 * @param pb        base port as a decimal string; parsed with the standard
 *                  integer parser, so a malformed value raises
 *                  NumberFormatException
 * @param format    not used by this constructor
 */
public AVTransmit3(MediaLocator locator,
                   String ipAddress,
                   String pb,
                   Format format) {

    this.locator = locator;
    this.ipAddress = ipAddress;
    this.portBase = Integer.parseInt(pb);
}

/**
*Startsthetransmission.Returnsnulliftransmissionstartedok.
*Otherwiseitreturnsastringwiththereasonwhythesetupfailed.
*/
publicsynchronizedStringstart(){
Stringresult;

//Createaprocessorforthespecifiedmedialocator
//andprogramittooutputJPEG/RTP
result=createProcessor();
if(result!=null)
returnresult;

//CreateanRTPsessiontotransmittheoutputofthe
//processortothespecifiedIPaddressandportno.
result=createTransmitter();
if(result!=null){
processor.close();
processor
=null;
returnresult;
}

//Startthetransmission
processor.start();

returnnull;
}

/**
*Stopsthetransmissionifalreadystarted
*/
publicvoidstop(){
synchronized(
this){
if(processor!=null){
processor.stop();
processor.close();
processor
=null;
for(inti=0;i<rtpMgrs.length;i++){
rtpMgrs[i].removeTargets(
"Sessionended.");
rtpMgrs[i].dispose();
}
}
}
}

privateStringcreateProcessor(){
if(locator==null)
return"Locatorisnull";

DataSourceds;
DataSourceclone;

try{
ds
=javax.media.Manager.createDataSource(locator);
}
catch(Exceptione){
return"Couldn'tcreateDataSource";
}

//Trytocreateaprocessortohandletheinputmedialocator
try{
processor
=javax.media.Manager.createProcessor(ds);
}
catch(NoProcessorExceptionnpe){
return"Couldn'tcreateprocessor";
}
catch(IOExceptionioe){
return"IOExceptioncreatingprocessor";
}

//Waitforittoconfigure
booleanresult=waitForState(processor,Processor.Configured);
if(result==false)
return"Couldn'tconfigureprocessor";

//Getthetracksfromtheprocessor
TrackControl[]tracks=processor.getTrackControls();

//Dowehaveatleastonetrack?
if(tracks==null||tracks.length<1)
return"Couldn'tfindtracksinprocessor";

//SettheoutputcontentdescriptortoRAW_RTP
//Thiswilllimitthesupportedformatsreportedfrom
//Track.getSupportedFormatstoonlyvalidRTPformats.
ContentDescriptorcd=newContentDescriptor(ContentDescriptor.RAW_RTP);
processor.setContentDescriptor(cd);

Formatsupported[];
Formatchosen;
booleanatLeastOneTrack
=false;

//Programthetracks.
for(inti=0;i<tracks.length;i++){
Formatformat
=tracks[i].getFormat();
if(tracks[i].isEnabled()){

supported
=tracks[i].getSupportedFormats();

//We'vesettheoutputcontenttotheRAW_RTP.
//SoallthesupportedformatsshouldworkwithRTP.
//We'lljustpickthefirstone.

if(supported.length>0){
if(supported[0]instanceofVideoFormat){
//Forvideoformats,weshoulddoublecheckthe
//sizessincenotallformatsworkinallsizes.
chosen=checkForVideoSizes(tracks[i].getFormat(),
supported[
0]);
}
else
chosen
=supported[0];
tracks[i].setFormat(chosen);
System.err.println(
"Track"+i+"issettotransmitas:");
System.err.println(
""+chosen);
atLeastOneTrack
=true;
}
else
tracks[i].setEnabled(
false);
}
else
tracks[i].setEnabled(
false);
}

if(!atLeastOneTrack)
return"Couldn'tsetanyofthetrackstoavalidRTPformat";

//Realizetheprocessor.Thiswillinternallycreateaflow
//graphandattempttocreateanoutputdatasourceforJPEG/RTP
//audioframes.
result=waitForState(processor,Controller.Realized);
if(result==false)
return"Couldn'trealizeprocessor";

//SettheJPEGqualityto.5.
setJPEGQuality(processor,0.5f);

//Gettheoutputdatasourceoftheprocessor
dataOutput=processor.getDataOutput();

returnnull;
}


/**
*UsetheRTPManagerAPItocreatesessionsforeachmedia
*trackoftheprocessor.
*/
privateStringcreateTransmitter(){

//Cheated.Shouldhavecheckedthetype.
PushBufferDataSourcepbds=(PushBufferDataSource)dataOutput;
PushBufferStreampbss[]
=pbds.getStreams();

rtpMgrs
=newRTPManager[pbss.length];
SendStreamsendStream;
intport;
SourceDescriptionsrcDesList[];

for(inti=0;i<pbss.length;i++){
try{
rtpMgrs[i]
=RTPManager.newInstance();

port
=portBase+2*i;

//InitializetheRTPManagerwiththeRTPSocketAdapter
rtpMgrs[i].initialize(newRTPSocketAdapter(
InetAddress.getByName(ipAddress),
port));

System.err.println(
"CreatedRTPsession:"+ipAddress+""+port);

sendStream
=rtpMgrs[i].createSendStream(dataOutput,i);
sendStream.start();
}
catch(Exceptione){
returne.getMessage();
}
}

returnnull;
}


/**
*ForJPEGandH263,weknowthattheyonlyworkforparticular
*sizes.Sowe'llperformextracheckingheretomakesurethey
*areoftherightsizes.
*/
FormatcheckForVideoSizes(Formatoriginal,Formatsupported){

intwidth,height;
Dimensionsize
=((VideoFormat)original).getSize();
FormatjpegFmt
=newFormat(VideoFormat.JPEG_RTP);
Formath263Fmt
=newFormat(VideoFormat.H263_RTP);

if(supported.matches(jpegFmt)){
//ForJPEG,makesurewidthandheightaredivisibleby8.
width=(size.width%8==0?size.width:
(
int)(size.width/8)*8);
height
=(size.height%8==0?size.height:
(
int)(size.height/8)*8);
}
elseif(supported.matches(h263Fmt)){
//ForH.263,weonlysupportsomespecificsizes.
if(size.width<128){
width
=128;
height
=96;
}
elseif(size.width<176){
width
=176;
height
=144;
}
else{
width
=352;
height
=288;
}
}
else{
//Wedon'tknowthisparticularformat.We'lljust
//leaveitalonethen.
returnsupported;
}

return(newVideoFormat(null,
newDimension(width,height),
Format.NOT_SPECIFIED,
null,
Format.NOT_SPECIFIED)).intersects(supported);
}


/**
*SettingtheencodingqualitytothespecifiedvalueontheJPEGencoder.
*0.5isagooddefault.
*/
voidsetJPEGQuality(Playerp,floatval){

Controlcs[]
=p.getControls();
QualityControlqc
=null;
VideoFormatjpegFmt
=newVideoFormat(VideoFormat.JPEG);

//LoopthroughthecontrolstofindtheQualitycontrolfor
//theJPEGencoder.
for(inti=0;i<cs.length;i++){

if(cs[i]instanceofQualityControl&&
cs[i]instanceofOwned){
Objectowner
=((Owned)cs[i]).getOwner();

//ChecktoseeiftheownerisaCodec.
//Thencheckfortheoutputformat.
if(ownerinstanceofCodec){
Formatfmts[]
=((Codec)owner).getSupportedOutputFormats(null);
for(intj=0;j<fmts.length;j++){
if(fmts[j].matches(jpegFmt)){
qc
=(QualityControl)cs[i];
qc.setQuality(val);
System.err.println(
"-Settingqualityto"+
val
+"on"+qc);
break;
}
}
}
if(qc!=null)
break;
}
}
}


/****************************************************************
*Conveniencemethodstohandleprocessor'sstatechanges.
***************************************************************
*/

privateIntegerstateLock=newInteger(0);
privatebooleanfailed=false;

IntegergetStateLock(){
returnstateLock;
}

voidsetFailed(){
failed
=true;
}

privatesynchronizedbooleanwaitForState(Processorp,intstate){
p.addControllerListener(
newStateListener());
failed
=false;

//Calltherequiredmethodontheprocessor
if(state==Processor.Configured){
p.configure();
}
elseif(state==Processor.Realized){
p.realize();
}

//Waituntilwegetaneventthatconfirmsthe
//successofthemethod,orafailureevent.
//SeeStateListenerinnerclass
while(p.getState()<state&&!failed){
synchronized(getStateLock()){
try{
getStateLock().wait();
}
catch(InterruptedExceptionie){
returnfalse;
}
}
}

if(failed)
returnfalse;
else
returntrue;
}

/****************************************************************
*InnerClasses
***************************************************************
*/

classStateListenerimplementsControllerListener{

publicvoidcontrollerUpdate(ControllerEventce){

//Iftherewasanerrorduringconfigureor
//realize,theprocessorwillbeclosed
if(ceinstanceofControllerClosedEvent)
setFailed();

//Allcontrollerevents,sendanotification
//tothewaitingthreadinwaitForStatemethod.
if(ceinstanceofControllerEvent){
synchronized(getStateLock()){
getStateLock().notifyAll();
}
}
}
}


/****************************************************************
*SampleUsageforAVTransmit3class
***************************************************************
*/

publicstaticvoidmain(String[]args){
//Weneedthreeparameterstodothetransmission
//Forexample,
//javaAVTransmit3file:/C:/media/test.mov129.130.131.13242050

if(args.length<3){
prUsage();
}

Formatfmt
=null;
inti=0;

//Createaaudiotransmitobjectwiththespecifiedparams.
AVTransmit3at=newAVTransmit3(newMediaLocator(args[i]),
args[i
+1],args[i+2],fmt);
//Startthetransmission
Stringresult=at.start();

//resultwillbenon-nulliftherewasanerror.Thereturn
//valueisaStringdescribingthepossibleerror.Printit.
if(result!=null){
System.err.println(
"Error:"+result);
System.exit(
0);
}

System.err.println(
"Starttransmissionfor60seconds");

//Transmitfor60secondsandthenclosetheprocessor
//Thisisasafeguardwhenusingacapturedatasource
//sothatthecapturedevicewillbeproperlyreleased
//beforequitting.
//TherightthingtodowouldbetohaveaGUIwitha
//"Stop"buttonthatwouldcallstoponAVTransmit3
try{
Thread.currentThread().sleep(
60000);
}
catch(InterruptedExceptionie){
}

//Stopthetransmission
at.stop();

System.err.println(
"transmissionended.");

System.exit(
0);
}


staticvoidprUsage(){
System.err.println(
"Usage:AVTransmit3<sourceURL><destIP><destPortBase>");
System.err.println(
"<sourceURL>:inputURLorfilename");
System.err.println(
"<destIP>:multicast,broadcastorunicastIPaddressforthetransmission");
System.err.println(
"<destPortBase>:networkportnumbersforthetransmission.");
System.err.println(
"ThefirsttrackwillusethedestPortBase.");
System.err.println(
"ThenexttrackwillusedestPortBase+2andsoon./n");
System.exit(
0);
}
}

底层传输部分代码:

<!--<br><br>Code highlighting produced by Actipro CodeHighlighter (freeware)<br>http://www.CodeHighlighter.com/<br><br>-->importjava.io.IOException;
importjava.net.InetAddress;
importjava.net.DatagramSocket;
importjava.net.MulticastSocket;
importjava.net.DatagramPacket;
importjava.net.SocketException;

importjavax.media.protocol.DataSource;
importjavax.media.protocol.PushSourceStream;
importjavax.media.protocol.ContentDescriptor;
importjavax.media.protocol.SourceTransferHandler;
importjavax.media.rtp.RTPConnector;
importjavax.media.rtp.OutputDataStream;


/**
*AnimplementationofRTPConnectorbasedonUDPsockets.
*/
publicclassRTPSocketAdapterimplementsRTPConnector{

DatagramSocketdataSock;
DatagramSocketctrlSock;

InetAddressaddr;
intport;

SockInputStreamdataInStrm
=null,ctrlInStrm=null;
SockOutputStreamdataOutStrm
=null,ctrlOutStrm=null;


/**
 * Creates an adapter with a time-to-live of 1.
 *
 * @param addr remote or multicast address
 * @param port RTP data port; the control channel uses {@code port + 1}
 * @throws IOException if the sockets cannot be created
 */
public RTPSocketAdapter(InetAddress addr, int port) throws IOException {
    this(addr, port, 1);
}

publicRTPSocketAdapter(InetAddressaddr,intport,intttl)throwsIOException{

try{

if(addr.isMulticastAddress()){
dataSock
=newMulticastSocket(port);
ctrlSock
=newMulticastSocket(port+1);
((MulticastSocket)dataSock).joinGroup(addr);
((MulticastSocket)dataSock).setTimeToLive(ttl);
((MulticastSocket)ctrlSock).joinGroup(addr);
((MulticastSocket)ctrlSock).setTimeToLive(ttl);
}
else{
dataSock
=newDatagramSocket(port,InetAddress.getLocalHost());
ctrlSock
=newDatagramSocket(port+1,InetAddress.getLocalHost());
}


}
catch(SocketExceptione){
thrownewIOException(e.getMessage());
}

this.addr=addr;
this.port=port;
}

/**
*ReturnsaninputstreamtoreceivetheRTPdata.
*/
publicPushSourceStreamgetDataInputStream()throwsIOException{
if(dataInStrm==null){
dataInStrm
=newSockInputStream(dataSock,addr,port);
dataInStrm.start();
}
returndataInStrm;
}

/**
*ReturnsanoutputstreamtosendtheRTPdata.
*/
publicOutputDataStreamgetDataOutputStream()throwsIOException{
if(dataOutStrm==null)
dataOutStrm
=newSockOutputStream(dataSock,addr,port);
returndataOutStrm;
}

/**
*ReturnsaninputstreamtoreceivetheRTCPdata.
*/
publicPushSourceStreamgetControlInputStream()throwsIOException{
if(ctrlInStrm==null){
ctrlInStrm
=newSockInputStream(ctrlSock,addr,port+1);
ctrlInStrm.start();
}
returnctrlInStrm;
}

/**
*ReturnsanoutputstreamtosendtheRTCPdata.
*/
publicOutputDataStreamgetControlOutputStream()throwsIOException{
if(ctrlOutStrm==null)
ctrlOutStrm
=newSockOutputStream(ctrlSock,addr,port+1);
returnctrlOutStrm;
}

/**
*ClosealltheRTP,RTCPstreams.
*/
publicvoidclose(){
if(dataInStrm!=null)
dataInStrm.kill();
if(ctrlInStrm!=null)
ctrlInStrm.kill();
dataSock.close();
ctrlSock.close();
}

/**
*SetthereceivebuffersizeoftheRTPdatachannel.
*Thisisonlyahinttotheimplementation.Theactualimplementation
*maynotbeabletodoanythingtothis.
*/
publicvoidsetReceiveBufferSize(intsize)throwsIOException{
dataSock.setReceiveBufferSize(size);
}

/**
*GetthereceivebuffersizesetontheRTPdatachannel.
*Return-1ifthereceivebuffersizeisnotapplicablefor
*theimplementation.
*/
publicintgetReceiveBufferSize(){
try{
returndataSock.getReceiveBufferSize();
}
catch(Exceptione){
return-1;
}
}

/**
*SetthesendbuffersizeoftheRTPdatachannel.
*Thisisonlyahinttotheimplementation.Theactualimplementation
*maynotbeabletodoanythingtothis.
*/
publicvoidsetSendBufferSize(intsize)throwsIOException{
dataSock.setSendBufferSize(size);
}

/**
*GetthesendbuffersizesetontheRTPdatachannel.
*Return-1ifthesendbuffersizeisnotapplicablefor
*theimplementation.
*/
publicintgetSendBufferSize(){
try{
returndataSock.getSendBufferSize();
}
catch(Exceptione){
return-1;
}
}

/**
*ReturntheRTCPbandwidthfraction.Thisvalueisusedto
*initializetheRTPManager.CheckRTPManagerformoredetauls.
*Return-1tousethedefaultvalues.
*/
publicdoublegetRTCPBandwidthFraction(){
return-1;
}

/**
*ReturntheRTCPsenderbandwidthfraction.Thisvalueisusedto
*initializetheRTPManager.CheckRTPManagerformoredetauls.
*Return-1tousethedefaultvalues.
*/
publicdoublegetRTCPSenderBandwidthFraction(){
return-1;
}


/**
*AninnerclasstoimplementanOutputDataStreambasedonUDPsockets.
*/
classSockOutputStreamimplementsOutputDataStream{

DatagramSocketsock;
InetAddressaddr;
intport;

publicSockOutputStream(DatagramSocketsock,InetAddressaddr,intport){
this.sock=sock;
this.addr=addr;
this.port=port;
}

publicintwrite(bytedata[],intoffset,intlen){
try{
sock.send(
newDatagramPacket(data,offset,len,addr,port));
}
catch(Exceptione){
return-1;
}
returnlen;
}
}


/**
*AninnerclasstoimplementanPushSourceStreambasedonUDPsockets.
*/
classSockInputStreamextendsThreadimplementsPushSourceStream{

DatagramSocketsock;
InetAddressaddr;
intport;
booleandone
=false;
booleandataRead
=false;

SourceTransferHandlersth
=null;

publicSockInputStream(DatagramSocketsock,InetAddressaddr,intport){
this.sock=sock;
this.addr=addr;
this.port=port;
}

publicintread(bytebuffer[],intoffset,intlength){
DatagramPacketp
=newDatagramPacket(buffer,offset,length,addr,port);
try{
sock.receive(p);
}
catch(IOExceptione){
return-1;
}
synchronized(
this){
dataRead
=true;
notify();
}
returnp.getLength();
}

publicsynchronizedvoidstart(){
super.start();
if(sth!=null){
dataRead
=true;
notify();
}
}

publicsynchronizedvoidkill(){
done
=true;
notify();
}

publicintgetMinimumTransferSize(){
return2*1024;//twicetheMTUsize,justtobesafe.
}

publicsynchronizedvoidsetTransferHandler(SourceTransferHandlersth){
this.sth=sth;
dataRead
=true;
notify();
}

//Notapplicable.
publicContentDescriptorgetContentDescriptor(){
returnnull;
}

//Notapplicable.
publiclonggetContentLength(){
returnLENGTH_UNKNOWN;
}

//Notapplicable.
publicbooleanendOfStream(){
returnfalse;
}

//Notapplicable.
publicObject[]getControls(){
returnnewObject[0];
}

//Notapplicable.
publicObjectgetControl(Stringtype){
returnnull;
}

/**
*Loopandnotifythetransferhandlerofnewdata.
*/
publicvoidrun(){
while(!done){

synchronized(
this){
while(!dataRead&&!done){
try{
wait();
}
catch(InterruptedExceptione){}
}
dataRead
=false;
}

if(sth!=null&&!done){
sth.transferData(
this);
}
}
}
}
}


分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics